diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 0d829a6739..7e3275e4e0 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -270,7 +270,6 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: } after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots deployability_index = DeployabilityIndex.all_deployable() - snapshots_with_schema_migration = [ s for s in snapshots.values() if s.requires_schema_migration_in_prod ] diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 9522366721..8f85f26a58 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1483,7 +1483,7 @@ def requires_schema_migration_in_prod(self) -> bool: return ( self.is_paused and self.is_model - and self.is_materialized + and not self.is_symbolic and ( (self.previous_version and self.previous_version.version == self.version) or self.model.forward_only diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index 4ada7d458d..63527a2f78 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -1659,16 +1659,17 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( stages = build_plan_stages(plan, state_reader, None) # Verify stages - assert len(stages) == 8 + assert len(stages) == 9 assert isinstance(stages[0], CreateSnapshotRecordsStage) assert isinstance(stages[1], PhysicalLayerSchemaCreationStage) assert isinstance(stages[2], BackfillStage) assert isinstance(stages[3], EnvironmentRecordUpdateStage) - assert isinstance(stages[4], UnpauseStage) - assert isinstance(stages[5], BackfillStage) - assert isinstance(stages[6], VirtualLayerUpdateStage) - assert isinstance(stages[7], FinalizeEnvironmentStage) + assert isinstance(stages[4], MigrateSchemasStage) + assert isinstance(stages[5], UnpauseStage) + assert isinstance(stages[6], BackfillStage) + assert isinstance(stages[7], VirtualLayerUpdateStage) + assert isinstance(stages[8], FinalizeEnvironmentStage) def test_build_plan_stages_virtual_environment_mode_filtering( @@ -1936,6 +1937,76 @@ def test_build_plan_stages_virtual_environment_mode_no_updates( assert len(virtual_stages) == 0 +def test_build_plan_stages_virtual_environment_mode_metadata_only_changes( + mocker: MockerFixture, +) -> None: + snapshot_view: Snapshot = Snapshot.from_node( + SqlModel( + name="dev_only_view", + query=parse_one("select 1 as id"), + kind=dict(name=ModelKindName.VIEW), + virtual_environment_mode=VirtualEnvironmentMode.DEV_ONLY, + ), + nodes={}, + ttl="in 1 week", + ) + snapshot_view.categorize_as(SnapshotChangeCategory.METADATA) + + state_reader = mocker.Mock(spec=StateReader) + state_reader.get_snapshots.return_value = {} + state_reader.get_environment.return_value = None + + # Production environment with the view promoted + environment = Environment( + name="prod", + snapshots=[snapshot_view.table_info], + start_at="2023-01-01", + end_at="2023-01-02", + plan_id="test_plan", + previous_plan_id=None, + promoted_snapshot_ids=[snapshot_view.snapshot_id], + ) + + plan = EvaluatablePlan( + start="2023-01-01", + end="2023-01-02", + new_snapshots=[snapshot_view], + environment=environment, + no_gaps=False, + skip_backfill=True, + empty_backfill=False, + restatements={}, + is_dev=False, + allow_destructive_models=set(), + allow_additive_models=set(), + forward_only=False, + end_bounded=True, + ensure_finalized_snapshots=False, + ignore_cron=False, + directly_modified_snapshots=[snapshot_view.snapshot_id], + indirectly_modified_snapshots={}, + metadata_updated_snapshots=[snapshot_view.snapshot_id], + removed_snapshots=[], + requires_backfill=False, + models_to_backfill=None, + execution_time="2023-01-02", + disabled_restatement_models=set(), + environment_statements=None, + user_provided_flags=None, + ) + + stages = build_plan_stages(plan, state_reader, None) + + migrate_stages = [s for s in stages if isinstance(s, MigrateSchemasStage)] + assert migrate_stages, ( + "Expected a MigrateSchemasStage for dev_only VIEW with metadata-only change" + ) + migrate_stage = migrate_stages[0] + assert snapshot_view in migrate_stage.snapshots + + assert not any(isinstance(s, VirtualLayerUpdateStage) for s in stages) + + def test_adjust_intervals_new_forward_only_dev_intervals( make_snapshot, mocker: MockerFixture ) -> None: