Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from sqlmesh.core.snapshot.definition import (
DeployabilityIndex,
Snapshot,
SnapshotChangeCategory,
SnapshotTableInfo,
SnapshotId,
Interval,
Expand Down Expand Up @@ -251,18 +250,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
deployability_index = DeployabilityIndex.all_deployable()

snapshots_with_schema_migration = [
s
for s in snapshots.values()
if s.is_paused
and s.is_model
and not s.is_symbolic
and (
not deployability_index_for_creation.is_representative(s)
or (
s.is_view
and s.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
)
)
s for s in snapshots.values() if s.requires_schema_migration_in_prod
]

snapshots_to_intervals = self._missing_intervals(
Expand Down
15 changes: 15 additions & 0 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,21 @@ def expiration_ts(self) -> int:
check_categorical_relative_expression=False,
)

@property
def requires_schema_migration_in_prod(self) -> bool:
"""Returns whether or not this snapshot requires a schema migration when deployed to production."""
return (
self.is_paused
and self.is_model
and not self.is_symbolic
and (
(self.previous_version and self.previous_version.version == self.version)
or self.model.forward_only
or bool(self.model.physical_version)
or self.is_view
)
)

@property
def ttl_ms(self) -> int:
return self.expiration_ts - self.updated_ts
Expand Down
6 changes: 1 addition & 5 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,11 +918,7 @@ def _migrate_snapshot(
adapter: EngineAdapter,
deployability_index: DeployabilityIndex,
) -> None:
if (
not snapshot.is_paused
or not snapshot.is_model
or (deployability_index.is_representative(snapshot) and not snapshot.is_view)
):
if not snapshot.requires_schema_migration_in_prod:
return

deployability_index = DeployabilityIndex.all_deployable()
Expand Down
29 changes: 29 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,35 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co
)


@time_machine.travel("2023-01-08 15:00:00 UTC", tick=True)
def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
context.apply(plan)

# Make a forward-only change
model = context.get_model("sushi.waiter_revenue_by_day")
model = model.copy(update={"kind": model.kind.copy(update={"forward_only": True})})
model = add_projection_to_model(t.cast(SqlModel, model))
context.upsert_model(model)
plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True)
assert len(plan.new_snapshots) == 2
assert all(s.change_category == SnapshotChangeCategory.FORWARD_ONLY for s in plan.new_snapshots)

# Follow-up with a metadata change in the same environment
model = model.copy(update={"owner": "new_owner"})
context.upsert_model(model)
plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True)
assert len(plan.new_snapshots) == 2
assert all(s.change_category == SnapshotChangeCategory.METADATA for s in plan.new_snapshots)

# Deploy the latest change to prod
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)

# Check that the new column was added in prod
columns = context.engine_adapter.columns("sushi.waiter_revenue_by_day")
assert "one" in columns


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down
87 changes: 0 additions & 87 deletions tests/core/test_plan_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,93 +1211,6 @@ def test_build_plan_stages_environment_suffix_target_changed(
)


def test_build_plan_stages_indirect_non_breaking_no_migration(
snapshot_a: Snapshot, snapshot_b: Snapshot, make_snapshot, mocker: MockerFixture
) -> None:
# Categorize snapshot_a as forward-only
new_snapshot_a = make_snapshot(
snapshot_a.model.copy(update={"stamp": "new_version"}),
)
new_snapshot_a.previous_versions = snapshot_a.all_versions
new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING)

new_snapshot_b = make_snapshot(
snapshot_b.model.copy(),
nodes={'"a"': new_snapshot_a.model},
)
new_snapshot_b.previous_versions = snapshot_b.all_versions
new_snapshot_b.change_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
new_snapshot_b.version = new_snapshot_b.previous_version.data_version.version

state_reader = mocker.Mock(spec=StateReader)
state_reader.get_snapshots.return_value = {}
existing_environment = Environment(
name="prod",
snapshots=[snapshot_a.table_info, snapshot_b.table_info],
start_at="2023-01-01",
end_at="2023-01-02",
plan_id="previous_plan",
previous_plan_id=None,
promoted_snapshot_ids=[snapshot_a.snapshot_id, snapshot_b.snapshot_id],
finalized_ts=to_timestamp("2023-01-02"),
)
state_reader.get_environment.return_value = existing_environment

# Create environment
environment = Environment(
name="prod",
snapshots=[new_snapshot_a.table_info, new_snapshot_b.table_info],
start_at="2023-01-01",
end_at="2023-01-02",
plan_id="test_plan",
previous_plan_id="previous_plan",
promoted_snapshot_ids=[new_snapshot_a.snapshot_id, new_snapshot_b.snapshot_id],
)

# Create evaluatable plan
plan = EvaluatablePlan(
start="2023-01-01",
end="2023-01-02",
new_snapshots=[new_snapshot_a, new_snapshot_b],
environment=environment,
no_gaps=False,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=False,
allow_destructive_models=set(),
forward_only=False,
end_bounded=False,
ensure_finalized_snapshots=False,
directly_modified_snapshots=[new_snapshot_a.snapshot_id],
indirectly_modified_snapshots={
new_snapshot_a.name: [new_snapshot_b.snapshot_id],
},
metadata_updated_snapshots=[],
removed_snapshots=[],
requires_backfill=True,
models_to_backfill=None,
execution_time="2023-01-02",
disabled_restatement_models=set(),
environment_statements=None,
user_provided_flags=None,
)

# Build plan stages
stages = build_plan_stages(plan, state_reader, None)

# Verify stages
assert len(stages) == 7

assert isinstance(stages[0], CreateSnapshotRecordsStage)
assert isinstance(stages[1], PhysicalLayerUpdateStage)
assert isinstance(stages[2], BackfillStage)
assert isinstance(stages[3], EnvironmentRecordUpdateStage)
assert isinstance(stages[4], UnpauseStage)
assert isinstance(stages[5], VirtualLayerUpdateStage)
assert isinstance(stages[6], FinalizeEnvironmentStage)


def test_build_plan_stages_indirect_non_breaking_view_migration(
snapshot_a: Snapshot, snapshot_c: Snapshot, make_snapshot, mocker: MockerFixture
) -> None:
Expand Down
8 changes: 8 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ def columns(table_name):
)
snapshot = make_snapshot(model, version="1")
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot.previous_versions = snapshot.all_versions

evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())

Expand Down Expand Up @@ -1217,6 +1218,7 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot):
)
snapshot = make_snapshot(model, version="1")
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot.previous_versions = snapshot.all_versions

evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())

Expand Down Expand Up @@ -1714,6 +1716,7 @@ def columns(table_name):
)
snapshot = make_snapshot(model, version="1")
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot.previous_versions = snapshot.all_versions

with pytest.raises(NodeExecutionFailedError) as ex:
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
Expand All @@ -1735,6 +1738,7 @@ def columns(table_name):
)
snapshot = make_snapshot(model, version="1")
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot.previous_versions = snapshot.all_versions

logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
with patch.object(logger, "warning") as mock_logger:
Expand Down Expand Up @@ -3654,6 +3658,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc

new_snapshot = make_snapshot(updated_model)
new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
new_snapshot.previous_versions = snapshot.all_versions
new_snapshot.version = snapshot.version

assert new_snapshot.table_name() == snapshot.table_name()
Expand Down Expand Up @@ -3724,6 +3729,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
)
snapshot: Snapshot = make_snapshot(model)
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
snapshot.previous_versions = snapshot.all_versions

# no schema changes - no-op
adapter_mock.get_alter_expressions.return_value = []
Expand Down Expand Up @@ -3925,6 +3931,7 @@ def columns(table_name):
)
snapshot_1 = make_snapshot(model, version="1")
snapshot_1.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot_1.previous_versions = snapshot_1.all_versions
model_2 = SqlModel(
name="test_schema.test_model_2",
kind=IncrementalByTimeRangeKind(
Expand All @@ -3935,6 +3942,7 @@ def columns(table_name):
)
snapshot_2 = make_snapshot(model_2, version="1")
snapshot_2.change_category = SnapshotChangeCategory.FORWARD_ONLY
snapshot_2.previous_versions = snapshot_2.all_versions
evaluator.migrate(
[snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable()
)
Expand Down