From be39c8d80631fab92f936a26a9a942785e43c451 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 25 Jul 2025 09:23:22 -0700 Subject: [PATCH] Fix: Prevent snapshots with shared versions in dev environments from expanding restatement intervals for prod snapshots --- sqlmesh/core/plan/evaluator.py | 7 +++++ tests/core/test_integration.py | 49 ++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index bb779fffe9..39a5d69355 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -35,6 +35,7 @@ SnapshotInfoLike, SnapshotTableInfo, SnapshotCreationFailedError, + SnapshotNameVersion, ) from sqlmesh.utils import to_snake_case from sqlmesh.core.state_sync import StateSync @@ -427,6 +428,10 @@ def _restatement_intervals_across_all_environments( if not prod_restatements: return set() + prod_name_versions: t.Set[SnapshotNameVersion] = { + s.name_version for s in loaded_snapshots.values() + } + snapshots_to_restate: t.Dict[SnapshotId, t.Tuple[SnapshotTableInfo, Interval]] = {} for env_summary in self.state_sync.get_environments_summary(): @@ -454,6 +459,8 @@ def _restatement_intervals_across_all_environments( { keyed_snapshots[a].snapshot_id: (keyed_snapshots[a], intervals) for a in affected_snapshot_names + # Don't restate a snapshot if it shares the version with a snapshot in prod + if keyed_snapshots[a].name_version not in prod_name_versions } ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 0717ba11aa..9580072d92 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -48,6 +48,7 @@ FullKind, IncrementalByTimeRangeKind, IncrementalByUniqueKeyKind, + IncrementalUnmanagedKind, Model, ModelKind, ModelKindName, @@ -2485,6 +2486,54 @@ def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable): context.apply(plan) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_restatement_plan_across_environments_snapshot_with_shared_version( + init_and_plan_context: t.Callable, +): + context, _ = init_and_plan_context("examples/sushi") + + # Change kind to incremental unmanaged + model = context.get_model("sushi.waiter_revenue_by_day") + previous_kind = model.kind.copy(update={"forward_only": True}) + assert isinstance(previous_kind, IncrementalByTimeRangeKind) + + model = model.copy( + update={"kind": IncrementalUnmanagedKind(), "physical_version": "pinned_version_12345"} + ) + context.upsert_model(model) + context.plan("prod", auto_apply=True, no_prompts=True) + + # Make some change and deploy it to both dev and prod environments + model = add_projection_to_model(t.cast(SqlModel, model)) + context.upsert_model(model) + context.plan("dev_a", auto_apply=True, no_prompts=True) + context.plan("prod", auto_apply=True, no_prompts=True) + + # Change the kind back to incremental by time range and deploy to prod + model = model.copy(update={"kind": previous_kind}) + context.upsert_model(model) + context.plan("prod", auto_apply=True, no_prompts=True) + + # Restate the model and verify that the interval hasn't been expanded because of the old snapshot + # with the same version + context.plan( + restate_models=["sushi.waiter_revenue_by_day"], + start="2023-01-06", + end="2023-01-08", + auto_apply=True, + no_prompts=True, + ) + + assert ( + context.fetchdf( + "SELECT COUNT(*) AS cnt FROM sushi.waiter_revenue_by_day WHERE one IS NOT NULL AND event_date < '2023-01-06'" + )["cnt"][0] + == 0 + ) + plan = context.plan_builder("prod").build() + assert not plan.missing_intervals + + def test_restatement_plan_hourly_with_downstream_daily_restates_correct_intervals(tmp_path: Path): model_a = """ MODEL (