Skip to content

Commit 4ef90ce

Browse files
committed
Fix: Unexpected backfill of a parent of a changed forward-only child when the child runs before the parent (#3871)
1 parent 959b606 commit 4ef90ce

File tree

3 files changed

+80
-10
lines changed

3 files changed

+80
-10
lines changed

sqlmesh/core/snapshot/definition.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,16 +1712,20 @@ def missing_intervals(
17121712
snapshot_start_date = start_dt
17131713
snapshot_end_date: TimeLike = end_date
17141714

1715-
existing_interval_end = interval_end_per_model.get(snapshot.name)
1716-
if existing_interval_end and existing_interval_end > to_timestamp(snapshot_start_date):
1717-
snapshot_end_date = existing_interval_end
1718-
1719-
interval = restatements.get(snapshot.snapshot_id)
1720-
if interval:
1721-
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in interval)
1715+
restated_interval = restatements.get(snapshot.snapshot_id)
1716+
if restated_interval:
1717+
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in restated_interval)
17221718
snapshot = snapshot.copy()
17231719
snapshot.intervals = snapshot.intervals.copy()
1724-
snapshot.remove_interval(interval)
1720+
snapshot.remove_interval(restated_interval)
1721+
else:
1722+
existing_interval_end = interval_end_per_model.get(snapshot.name)
1723+
if existing_interval_end:
1724+
if to_timestamp(snapshot_start_date) >= existing_interval_end:
1725+
# The start exceeds the provided interval end, so we can skip this snapshot
1726+
# since it doesn't have missing intervals by definition
1727+
continue
1728+
snapshot_end_date = existing_interval_end
17251729

17261730
missing_interval_end_date = snapshot_end_date
17271731
node_end_date = snapshot.node.end

tests/core/test_integration.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,73 @@ def test_cron_not_aligned_with_day_boundary_new_model(init_and_plan_context: t.C
767767
]
768768

769769

770+
@time_machine.travel("2023-01-08 00:00:00 UTC")
771+
def test_forward_only_preview_child_that_runs_before_parent(init_and_plan_context: t.Callable):
772+
context, _ = init_and_plan_context("examples/sushi")
773+
774+
# This model runs at minute 30 of every hour
775+
upstream_model = load_sql_based_model(
776+
d.parse(
777+
"""
778+
MODEL (
779+
name memory.sushi.upstream_model,
780+
kind FULL,
781+
cron '30 * * * *',
782+
start '2023-01-01',
783+
);
784+
785+
SELECT 1 AS a;
786+
"""
787+
)
788+
)
789+
context.upsert_model(upstream_model)
790+
791+
# This model runs at minute 0 of every hour, so it runs before the upstream model
792+
downstream_model = load_sql_based_model(
793+
d.parse(
794+
"""
795+
MODEL (
796+
name memory.sushi.downstream_model,
797+
kind INCREMENTAL_BY_TIME_RANGE(
798+
time_column event_date,
799+
forward_only True,
800+
),
801+
cron '0 * * * *',
802+
start '2023-01-01',
803+
);
804+
805+
SELECT a, '2023-01-06' AS event_date FROM memory.sushi.upstream_model;
806+
"""
807+
)
808+
)
809+
context.upsert_model(downstream_model)
810+
811+
context.plan("prod", skip_tests=True, auto_apply=True)
812+
813+
with time_machine.travel("2023-01-08 00:05:00 UTC"):
814+
# The downstream model runs but not the upstream model
815+
context.run("prod")
816+
817+
# Now it's time for the upstream model to run but it hasn't run yet
818+
with time_machine.travel("2023-01-08 00:35:00 UTC"):
819+
# Make a change to the downstream model.
820+
downstream_model = add_projection_to_model(t.cast(SqlModel, downstream_model), literal=True)
821+
context.upsert_model(downstream_model)
822+
823+
# The plan should only backfill the downstream model despite upstream missing intervals
824+
plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build()
825+
assert plan.missing_intervals == [
826+
SnapshotIntervals(
827+
snapshot_id=context.get_snapshot(
828+
downstream_model.name, raise_if_missing=True
829+
).snapshot_id,
830+
intervals=[
831+
(to_timestamp("2023-01-07 23:00:00"), to_timestamp("2023-01-08 00:00:00"))
832+
],
833+
),
834+
]
835+
836+
770837
@time_machine.travel("2023-01-08 00:00:00 UTC")
771838
def test_forward_only_monthly_model(init_and_plan_context: t.Callable):
772839
context, _ = init_and_plan_context("examples/sushi")

tests/core/test_snapshot.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2197,11 +2197,10 @@ def test_missing_intervals_interval_end_per_model(make_snapshot):
21972197
snapshot_a.name: to_timestamp("2023-01-09"),
21982198
snapshot_b.name: to_timestamp(
21992199
"2023-01-06"
2200-
), # The interval end is before the start. This should be ignored.
2200+
), # The interval end is before the start. The snapshot will be skipped
22012201
},
22022202
) == {
22032203
snapshot_a: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
2204-
snapshot_b: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
22052204
}
22062205

22072206

0 commit comments

Comments
 (0)