Skip to content

Commit 2319c80

Browse files
authored
Fix: Don't drop intervals in state when restating models in a dev environment (#3580)
1 parent 795fd2b commit 2319c80

File tree

3 files changed

+84
-11
lines changed

3 files changed

+84
-11
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -357,24 +357,23 @@ def _demote_snapshots(
357357
)
358358

359359
def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]) -> None:
360-
if not plan.restatements:
360+
if not plan.restatements or plan.is_dev:
361361
return
362362

363363
snapshot_intervals_to_restate = {
364364
(snapshots_by_name[name].table_info, intervals)
365365
for name, intervals in plan.restatements.items()
366366
}
367367

368-
if plan.is_prod:
369-
# Restating intervals on prod plans should mean that the intervals are cleared across
370-
# all environments, not just the version currently in prod
371-
# This ensures that work done in dev environments can still be promoted to prod
372-
# by forcing dev environments to re-run intervals that changed in prod
373-
#
374-
# Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
375-
snapshot_intervals_to_restate.update(
376-
self._restatement_intervals_across_all_environments(plan.restatements)
377-
)
368+
# Restating intervals on prod plans should mean that the intervals are cleared across
369+
# all environments, not just the version currently in prod
370+
# This ensures that work done in dev environments can still be promoted to prod
371+
# by forcing dev environments to re-run intervals that changed in prod
372+
#
373+
# Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
374+
snapshot_intervals_to_restate.update(
375+
self._restatement_intervals_across_all_environments(plan.restatements)
376+
)
378377

379378
self.state_sync.remove_intervals(
380379
snapshot_intervals=list(snapshot_intervals_to_restate),

sqlmesh/core/scheduler.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ def run(
293293
execution_time = execution_time or now()
294294

295295
self.state_sync.refresh_snapshot_intervals(self.snapshots.values())
296+
for s_id, interval in (restatements or {}).items():
297+
self.snapshots[s_id].remove_interval(interval)
298+
296299
if auto_restatement_enabled:
297300
auto_restated_intervals = apply_auto_restatements(self.snapshots, execution_time)
298301
self.state_sync.add_snapshots_intervals(auto_restated_intervals)

tests/core/test_integration.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,77 @@ def test_forward_only_model_regular_plan_preview_enabled(init_and_plan_context:
359359
assert dev_df["event_date"].tolist() == [pd.to_datetime("2023-01-07")]
360360

361361

362+
@time_machine.travel("2023-01-08 15:00:00 UTC")
363+
def test_forward_only_model_restate_full_history_in_dev(init_and_plan_context: t.Callable):
364+
context, _ = init_and_plan_context("examples/sushi")
365+
366+
model_name = "memory.sushi.customer_max_revenue"
367+
expressions = d.parse(
368+
f"""
369+
MODEL (
370+
name {model_name},
371+
kind INCREMENTAL_BY_UNIQUE_KEY (
372+
unique_key customer_id,
373+
forward_only true,
374+
),
375+
);
376+
377+
SELECT
378+
customer_id, MAX(revenue) AS max_revenue
379+
FROM memory.sushi.customer_revenue_lifetime
380+
GROUP BY 1;
381+
"""
382+
)
383+
384+
model = load_sql_based_model(expressions)
385+
assert model.forward_only
386+
assert model.kind.full_history_restatement_only
387+
context.upsert_model(model)
388+
389+
context.plan("prod", skip_tests=True, auto_apply=True)
390+
391+
model_kwargs = {
392+
**model.dict(),
393+
# Make a breaking change.
394+
"query": model.query.order_by("customer_id"), # type: ignore
395+
}
396+
context.upsert_model(SqlModel.parse_obj(model_kwargs))
397+
398+
# Apply the model change in dev
399+
plan = context.plan_builder("dev", skip_tests=True).build()
400+
assert not plan.missing_intervals
401+
context.apply(plan)
402+
403+
snapshot = context.get_snapshot(model, raise_if_missing=True)
404+
snapshot_table_name = snapshot.table_name(False)
405+
406+
# Manually insert a dummy value to check that the table is recreated during the restatement
407+
context.engine_adapter.insert_append(
408+
snapshot_table_name,
409+
pd.DataFrame({"customer_id": [-1], "max_revenue": [100]}),
410+
)
411+
df = context.engine_adapter.fetchdf(
412+
"SELECT COUNT(*) AS cnt FROM sushi__dev.customer_max_revenue WHERE customer_id = -1"
413+
)
414+
assert df["cnt"][0] == 1
415+
416+
# Apply a restatement plan in dev
417+
plan = context.plan("dev", restate_models=[model.name], auto_apply=True)
418+
assert len(plan.missing_intervals) == 1
419+
420+
# Check that the dummy value is not present
421+
df = context.engine_adapter.fetchdf(
422+
"SELECT COUNT(*) AS cnt FROM sushi__dev.customer_max_revenue WHERE customer_id = -1"
423+
)
424+
assert df["cnt"][0] == 0
425+
426+
# Check that the table is not empty
427+
df = context.engine_adapter.fetchdf(
428+
"SELECT COUNT(*) AS cnt FROM sushi__dev.customer_max_revenue"
429+
)
430+
assert df["cnt"][0] > 0
431+
432+
362433
@time_machine.travel("2023-01-08 15:00:00 UTC")
363434
def test_full_history_restatement_model_regular_plan_preview_enabled(
364435
init_and_plan_context: t.Callable,

0 commit comments

Comments
 (0)