diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 194177b0cf..144e12c887 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -8,7 +8,6 @@ from sqlmesh.core.snapshot.definition import ( DeployabilityIndex, Snapshot, - SnapshotChangeCategory, SnapshotTableInfo, SnapshotId, Interval, @@ -251,18 +250,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: deployability_index = DeployabilityIndex.all_deployable() snapshots_with_schema_migration = [ - s - for s in snapshots.values() - if s.is_paused - and s.is_model - and not s.is_symbolic - and ( - not deployability_index_for_creation.is_representative(s) - or ( - s.is_view - and s.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING - ) - ) + s for s in snapshots.values() if s.requires_schema_migration_in_prod ] snapshots_to_intervals = self._missing_intervals( diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 941ef6aae7..266a974821 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1352,6 +1352,21 @@ def expiration_ts(self) -> int: check_categorical_relative_expression=False, ) + @property + def requires_schema_migration_in_prod(self) -> bool: + """Returns whether or not this snapshot requires a schema migration when deployed to production.""" + return ( + self.is_paused + and self.is_model + and not self.is_symbolic + and ( + (self.previous_version and self.previous_version.version == self.version) + or self.model.forward_only + or bool(self.model.physical_version) + or self.is_view + ) + ) + @property def ttl_ms(self) -> int: return self.expiration_ts - self.updated_ts diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index f8aa08a075..bdbf76250f 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -918,11 +918,7 @@ def _migrate_snapshot( adapter: EngineAdapter, deployability_index: DeployabilityIndex, ) -> None: - if ( - not snapshot.is_paused - or not snapshot.is_model - or (deployability_index.is_representative(snapshot) and not snapshot.is_view) - ): + if not snapshot.requires_schema_migration_in_prod: return deployability_index = DeployabilityIndex.all_deployable() diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index bad05d0c30..17f26aee2e 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1381,6 +1381,35 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co ) +@time_machine.travel("2023-01-08 15:00:00 UTC", tick=True) +def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_context: t.Callable): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + # Make a forward-only change + model = context.get_model("sushi.waiter_revenue_by_day") + model = model.copy(update={"kind": model.kind.copy(update={"forward_only": True})}) + model = add_projection_to_model(t.cast(SqlModel, model)) + context.upsert_model(model) + plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True) + assert len(plan.new_snapshots) == 2 + assert all(s.change_category == SnapshotChangeCategory.FORWARD_ONLY for s in plan.new_snapshots) + + # Follow-up with a metadata change in the same environment + model = model.copy(update={"owner": "new_owner"}) + context.upsert_model(model) + plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True) + assert len(plan.new_snapshots) == 2 + assert all(s.change_category == SnapshotChangeCategory.METADATA for s in plan.new_snapshots) + + # Deploy the latest change to prod + context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True) + + # Check that the new column was added in prod + columns = context.engine_adapter.columns("sushi.waiter_revenue_by_day") + assert "one" in columns + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi") diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index 6e5a1fe43f..d79be24262 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -1211,93 +1211,6 @@ def test_build_plan_stages_environment_suffix_target_changed( ) -def test_build_plan_stages_indirect_non_breaking_no_migration( - snapshot_a: Snapshot, snapshot_b: Snapshot, make_snapshot, mocker: MockerFixture -) -> None: - # Categorize snapshot_a as forward-only - new_snapshot_a = make_snapshot( - snapshot_a.model.copy(update={"stamp": "new_version"}), - ) - new_snapshot_a.previous_versions = snapshot_a.all_versions - new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING) - - new_snapshot_b = make_snapshot( - snapshot_b.model.copy(), - nodes={'"a"': new_snapshot_a.model}, - ) - new_snapshot_b.previous_versions = snapshot_b.all_versions - new_snapshot_b.change_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING - new_snapshot_b.version = new_snapshot_b.previous_version.data_version.version - - state_reader = mocker.Mock(spec=StateReader) - state_reader.get_snapshots.return_value = {} - existing_environment = Environment( - name="prod", - snapshots=[snapshot_a.table_info, snapshot_b.table_info], - start_at="2023-01-01", - end_at="2023-01-02", - plan_id="previous_plan", - previous_plan_id=None, - promoted_snapshot_ids=[snapshot_a.snapshot_id, snapshot_b.snapshot_id], - finalized_ts=to_timestamp("2023-01-02"), - ) - state_reader.get_environment.return_value = existing_environment - - # Create environment - environment = Environment( - name="prod", - snapshots=[new_snapshot_a.table_info, new_snapshot_b.table_info], - start_at="2023-01-01", - end_at="2023-01-02", - plan_id="test_plan", - previous_plan_id="previous_plan", - promoted_snapshot_ids=[new_snapshot_a.snapshot_id, new_snapshot_b.snapshot_id], - ) - - # Create evaluatable plan - plan = EvaluatablePlan( - start="2023-01-01", - end="2023-01-02", - new_snapshots=[new_snapshot_a, new_snapshot_b], - environment=environment, - no_gaps=False, - skip_backfill=False, - empty_backfill=False, - restatements={}, - is_dev=False, - allow_destructive_models=set(), - forward_only=False, - end_bounded=False, - ensure_finalized_snapshots=False, - directly_modified_snapshots=[new_snapshot_a.snapshot_id], - indirectly_modified_snapshots={ - new_snapshot_a.name: [new_snapshot_b.snapshot_id], - }, - metadata_updated_snapshots=[], - removed_snapshots=[], - requires_backfill=True, - models_to_backfill=None, - execution_time="2023-01-02", - disabled_restatement_models=set(), - environment_statements=None, - user_provided_flags=None, - ) - - # Build plan stages - stages = build_plan_stages(plan, state_reader, None) - - # Verify stages - assert len(stages) == 7 - - assert isinstance(stages[0], CreateSnapshotRecordsStage) - assert isinstance(stages[1], PhysicalLayerUpdateStage) - assert isinstance(stages[2], BackfillStage) - assert isinstance(stages[3], EnvironmentRecordUpdateStage) - assert isinstance(stages[4], UnpauseStage) - assert isinstance(stages[5], VirtualLayerUpdateStage) - assert isinstance(stages[6], FinalizeEnvironmentStage) - - def test_build_plan_stages_indirect_non_breaking_view_migration( snapshot_a: Snapshot, snapshot_c: Snapshot, make_snapshot, mocker: MockerFixture ) -> None: diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index c96ddf6e56..fc5df244b3 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1180,6 +1180,7 @@ def columns(table_name): ) snapshot = make_snapshot(model, version="1") snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.previous_versions = snapshot.all_versions evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1217,6 +1218,7 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot): ) snapshot = make_snapshot(model, version="1") snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.previous_versions = snapshot.all_versions evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1714,6 +1716,7 @@ def columns(table_name): ) snapshot = make_snapshot(model, version="1") snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.previous_versions = snapshot.all_versions with pytest.raises(NodeExecutionFailedError) as ex: evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1735,6 +1738,7 @@ def columns(table_name): ) snapshot = make_snapshot(model, version="1") snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.previous_versions = snapshot.all_versions logger = logging.getLogger("sqlmesh.core.snapshot.evaluator") with patch.object(logger, "warning") as mock_logger: @@ -3654,6 +3658,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc new_snapshot = make_snapshot(updated_model) new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.previous_versions = snapshot.all_versions new_snapshot.version = snapshot.version assert new_snapshot.table_name() == snapshot.table_name() @@ -3724,6 +3729,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): ) snapshot: Snapshot = make_snapshot(model) snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.previous_versions = snapshot.all_versions # no schema changes - no-op adapter_mock.get_alter_expressions.return_value = [] @@ -3925,6 +3931,7 @@ def columns(table_name): ) snapshot_1 = make_snapshot(model, version="1") snapshot_1.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot_1.previous_versions = snapshot_1.all_versions model_2 = SqlModel( name="test_schema.test_model_2", kind=IncrementalByTimeRangeKind( @@ -3935,6 +3942,7 @@ def columns(table_name): ) snapshot_2 = make_snapshot(model_2, version="1") snapshot_2.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot_2.previous_versions = snapshot_2.all_versions evaluator.migrate( [snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable() )