diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 2eb4c54aeb..7d753cc330 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -680,6 +680,14 @@ def _categorize_snapshot( if mode == AutoCategorizationMode.FULL: snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) elif self._context_diff.indirectly_modified(snapshot.name): + if snapshot.is_materialized_view and not forward_only: + # We categorize changes as breaking to allow for instantaneous switches in a virtual layer. + # Otherwise, there might be a potentially long downtime during MVs recreation. + # In the case of forward-only changes this optimization is not applicable because we want to continue + # using the same (existing) table version. + snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) + return + all_upstream_forward_only = set() all_upstream_categories = set() direct_parent_categories = set() diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 59bc91d1bf..96ad367842 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -26,7 +26,7 @@ SqlModel, ModelKindName, ) -from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange +from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange, ViewKind from sqlmesh.core.model.seed import Seed from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals from sqlmesh.core.snapshot import ( @@ -4161,3 +4161,143 @@ def test_plan_ignore_cron_flag(make_snapshot): ], ) ] + + +def test_indirect_change_to_materialized_view_is_breaking(make_snapshot): + snapshot_a_old = make_snapshot( + SqlModel( + name="a", + query=parse_one("select 1 as col_a"), + kind=ViewKind(materialized=True), + ) + ) + snapshot_a_old.categorize_as(SnapshotChangeCategory.BREAKING) + + snapshot_b_old = make_snapshot( + SqlModel( + name="b", + query=parse_one("select col_a from a"), + kind=ViewKind(materialized=True), + ), + nodes={'"a"': snapshot_a_old.model}, + ) + snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING) + + snapshot_a_new = make_snapshot( + SqlModel( + name="a", + query=parse_one("select 1 as col_a, 2 as col_b"), + kind=ViewKind(materialized=True), + ) + ) + + snapshot_a_new.previous_versions = snapshot_a_old.all_versions + + snapshot_b_new = make_snapshot( + snapshot_b_old.model, + nodes={'"a"': snapshot_a_new.model}, + ) + snapshot_b_new.previous_versions = snapshot_b_old.all_versions + + context_diff = ContextDiff( + environment="test_environment", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={ + snapshot_a_new.name: (snapshot_a_new, snapshot_a_old), + snapshot_b_new.name: (snapshot_b_new, snapshot_b_old), + }, + snapshots={ + snapshot_a_new.snapshot_id: snapshot_a_new, + snapshot_b_new.snapshot_id: snapshot_b_new, + }, + new_snapshots={ + snapshot_a_new.snapshot_id: snapshot_a_new, + snapshot_b_new.snapshot_id: snapshot_b_new, + }, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + PlanBuilder(context_diff, forward_only=False).build() + + assert snapshot_b_new.change_category == SnapshotChangeCategory.INDIRECT_BREAKING + + +def test_forward_only_indirect_change_to_materialized_view(make_snapshot): + snapshot_a_old = make_snapshot( + SqlModel( + name="a", + query=parse_one("select 1 as col_a"), + ) + ) + snapshot_a_old.categorize_as(SnapshotChangeCategory.BREAKING) + + snapshot_b_old = make_snapshot( + SqlModel( + name="b", + query=parse_one("select col_a from a"), + kind=ViewKind(materialized=True), + ), + nodes={'"a"': snapshot_a_old.model}, + ) + snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING) + + snapshot_a_new = make_snapshot( + SqlModel( + name="a", + query=parse_one("select 1 as col_a, 2 as col_b"), + ) + ) + + snapshot_a_new.previous_versions = snapshot_a_old.all_versions + + snapshot_b_new = make_snapshot( + snapshot_b_old.model, + nodes={'"a"': snapshot_a_new.model}, + ) + snapshot_b_new.previous_versions = snapshot_b_old.all_versions + + context_diff = ContextDiff( + environment="test_environment", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={ + snapshot_a_new.name: (snapshot_a_new, snapshot_a_old), + snapshot_b_new.name: (snapshot_b_new, snapshot_b_old), + }, + snapshots={ + snapshot_a_new.snapshot_id: snapshot_a_new, + snapshot_b_new.snapshot_id: snapshot_b_new, + }, + new_snapshots={ + snapshot_a_new.snapshot_id: snapshot_a_new, + snapshot_b_new.snapshot_id: snapshot_b_new, + }, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + PlanBuilder(context_diff, forward_only=True).build() + + # Forward-only indirect changes to MVs should not always be classified as indirect breaking. + # Instead, we want to preserve the standard categorization. + assert snapshot_b_new.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING