Skip to content

Commit ddf0492

Browse files
authored
Fix: Prevent snapshots with shared versions in dev environments from expanding restatement intervals for prod snapshots (#5025)
1 parent c747ca5 commit ddf0492

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
SnapshotInfoLike,
3636
SnapshotTableInfo,
3737
SnapshotCreationFailedError,
38+
SnapshotNameVersion,
3839
)
3940
from sqlmesh.utils import to_snake_case
4041
from sqlmesh.core.state_sync import StateSync
@@ -427,6 +428,10 @@ def _restatement_intervals_across_all_environments(
427428
if not prod_restatements:
428429
return set()
429430

431+
prod_name_versions: t.Set[SnapshotNameVersion] = {
432+
s.name_version for s in loaded_snapshots.values()
433+
}
434+
430435
snapshots_to_restate: t.Dict[SnapshotId, t.Tuple[SnapshotTableInfo, Interval]] = {}
431436

432437
for env_summary in self.state_sync.get_environments_summary():
@@ -454,6 +459,8 @@ def _restatement_intervals_across_all_environments(
454459
{
455460
keyed_snapshots[a].snapshot_id: (keyed_snapshots[a], intervals)
456461
for a in affected_snapshot_names
462+
# Don't restate a snapshot if it shares the version with a snapshot in prod
463+
if keyed_snapshots[a].name_version not in prod_name_versions
457464
}
458465
)
459466

tests/core/test_integration.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
FullKind,
4949
IncrementalByTimeRangeKind,
5050
IncrementalByUniqueKeyKind,
51+
IncrementalUnmanagedKind,
5152
Model,
5253
ModelKind,
5354
ModelKindName,
@@ -2485,6 +2486,54 @@ def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
24852486
context.apply(plan)
24862487

24872488

2489+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2490+
def test_restatement_plan_across_environments_snapshot_with_shared_version(
2491+
init_and_plan_context: t.Callable,
2492+
):
2493+
context, _ = init_and_plan_context("examples/sushi")
2494+
2495+
# Change kind to incremental unmanaged
2496+
model = context.get_model("sushi.waiter_revenue_by_day")
2497+
previous_kind = model.kind.copy(update={"forward_only": True})
2498+
assert isinstance(previous_kind, IncrementalByTimeRangeKind)
2499+
2500+
model = model.copy(
2501+
update={"kind": IncrementalUnmanagedKind(), "physical_version": "pinned_version_12345"}
2502+
)
2503+
context.upsert_model(model)
2504+
context.plan("prod", auto_apply=True, no_prompts=True)
2505+
2506+
# Make some change and deploy it to both dev and prod environments
2507+
model = add_projection_to_model(t.cast(SqlModel, model))
2508+
context.upsert_model(model)
2509+
context.plan("dev_a", auto_apply=True, no_prompts=True)
2510+
context.plan("prod", auto_apply=True, no_prompts=True)
2511+
2512+
# Change the kind back to incremental by time range and deploy to prod
2513+
model = model.copy(update={"kind": previous_kind})
2514+
context.upsert_model(model)
2515+
context.plan("prod", auto_apply=True, no_prompts=True)
2516+
2517+
# Restate the model and verify that the interval hasn't been expanded because of the old snapshot
2518+
# with the same version
2519+
context.plan(
2520+
restate_models=["sushi.waiter_revenue_by_day"],
2521+
start="2023-01-06",
2522+
end="2023-01-08",
2523+
auto_apply=True,
2524+
no_prompts=True,
2525+
)
2526+
2527+
assert (
2528+
context.fetchdf(
2529+
"SELECT COUNT(*) AS cnt FROM sushi.waiter_revenue_by_day WHERE one IS NOT NULL AND event_date < '2023-01-06'"
2530+
)["cnt"][0]
2531+
== 0
2532+
)
2533+
plan = context.plan_builder("prod").build()
2534+
assert not plan.missing_intervals
2535+
2536+
24882537
def test_restatement_plan_hourly_with_downstream_daily_restates_correct_intervals(tmp_path: Path):
24892538
model_a = """
24902539
MODEL (

0 commit comments

Comments
 (0)