Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
SnapshotInfoLike,
SnapshotTableInfo,
SnapshotCreationFailedError,
SnapshotNameVersion,
)
from sqlmesh.utils import to_snake_case
from sqlmesh.core.state_sync import StateSync
Expand Down Expand Up @@ -427,6 +428,10 @@ def _restatement_intervals_across_all_environments(
if not prod_restatements:
return set()

prod_name_versions: t.Set[SnapshotNameVersion] = {
s.name_version for s in loaded_snapshots.values()
}

snapshots_to_restate: t.Dict[SnapshotId, t.Tuple[SnapshotTableInfo, Interval]] = {}

for env_summary in self.state_sync.get_environments_summary():
Expand Down Expand Up @@ -454,6 +459,8 @@ def _restatement_intervals_across_all_environments(
{
keyed_snapshots[a].snapshot_id: (keyed_snapshots[a], intervals)
for a in affected_snapshot_names
# Don't restate a snapshot if it shares the version with a snapshot in prod
if keyed_snapshots[a].name_version not in prod_name_versions
}
)

Expand Down
49 changes: 49 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
FullKind,
IncrementalByTimeRangeKind,
IncrementalByUniqueKeyKind,
IncrementalUnmanagedKind,
Model,
ModelKind,
ModelKindName,
Expand Down Expand Up @@ -2485,6 +2486,54 @@ def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
context.apply(plan)


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_restatement_plan_across_environments_snapshot_with_shared_version(
init_and_plan_context: t.Callable,
):
context, _ = init_and_plan_context("examples/sushi")

# Change kind to incremental unmanaged
model = context.get_model("sushi.waiter_revenue_by_day")
previous_kind = model.kind.copy(update={"forward_only": True})
assert isinstance(previous_kind, IncrementalByTimeRangeKind)

model = model.copy(
update={"kind": IncrementalUnmanagedKind(), "physical_version": "pinned_version_12345"}
)
context.upsert_model(model)
context.plan("prod", auto_apply=True, no_prompts=True)

# Make some change and deploy it to both dev and prod environments
model = add_projection_to_model(t.cast(SqlModel, model))
context.upsert_model(model)
context.plan("dev_a", auto_apply=True, no_prompts=True)
context.plan("prod", auto_apply=True, no_prompts=True)

# Change the kind back to incremental by time range and deploy to prod
model = model.copy(update={"kind": previous_kind})
context.upsert_model(model)
context.plan("prod", auto_apply=True, no_prompts=True)

# Restate the model and verify that the interval hasn't been expanded because of the old snapshot
# with the same version
context.plan(
restate_models=["sushi.waiter_revenue_by_day"],
start="2023-01-06",
end="2023-01-08",
auto_apply=True,
no_prompts=True,
)

assert (
context.fetchdf(
"SELECT COUNT(*) AS cnt FROM sushi.waiter_revenue_by_day WHERE one IS NOT NULL AND event_date < '2023-01-06'"
)["cnt"][0]
== 0
)
plan = context.plan_builder("prod").build()
assert not plan.missing_intervals


def test_restatement_plan_hourly_with_downstream_daily_restates_correct_intervals(tmp_path: Path):
model_a = """
MODEL (
Expand Down