Skip to content

Commit 37d7fef

Browse files
committed
List all upstream auto-restated models
1 parent 555d0c9 commit 37d7fef

File tree

5 files changed

+128
-29
lines changed

5 files changed

+128
-29
lines changed

sqlmesh/core/console.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
431+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
432432
) -> None:
433433
"""Updates the snapshot evaluation progress."""
434434

@@ -576,7 +576,7 @@ def update_snapshot_evaluation_progress(
576576
num_audits_passed: int,
577577
num_audits_failed: int,
578578
audit_only: bool = False,
579-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
579+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
580580
) -> None:
581581
pass
582582

@@ -1058,7 +1058,7 @@ def update_snapshot_evaluation_progress(
10581058
num_audits_passed: int,
10591059
num_audits_failed: int,
10601060
audit_only: bool = False,
1061-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
1061+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10621062
) -> None:
10631063
"""Update the snapshot evaluation progress."""
10641064
if (
@@ -3642,7 +3642,7 @@ def update_snapshot_evaluation_progress(
36423642
num_audits_passed: int,
36433643
num_audits_failed: int,
36443644
audit_only: bool = False,
3645-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
3645+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36463646
) -> None:
36473647
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36483648

@@ -3812,12 +3812,12 @@ def update_snapshot_evaluation_progress(
38123812
num_audits_passed: int,
38133813
num_audits_failed: int,
38143814
audit_only: bool = False,
3815-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
3815+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38163816
) -> None:
38173817
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38183818

3819-
if auto_restatement_trigger:
3820-
message += f" | evaluation_triggered_by={auto_restatement_trigger.name}"
3819+
if auto_restatement_triggers:
3820+
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in auto_restatement_triggers)}"
38213821

38223822
if audit_only:
38233823
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"

sqlmesh/core/scheduler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ def run_merged_intervals(
374374
run_environment_statements: bool = False,
375375
audit_only: bool = False,
376376
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
377-
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {},
377+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
378378
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
379379
"""Runs precomputed batches of missing intervals.
380380
@@ -477,7 +477,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
477477
evaluation_duration_ms,
478478
num_audits - num_audits_failed,
479479
num_audits_failed,
480-
auto_restatement_trigger=auto_restatement_triggers.get(snapshot.snapshot_id),
480+
auto_restatement_triggers=auto_restatement_triggers.get(snapshot.snapshot_id),
481481
)
482482

483483
try:
@@ -641,7 +641,7 @@ def _run_or_audit(
641641
for s_id, interval in (remove_intervals or {}).items():
642642
self.snapshots[s_id].remove_interval(interval)
643643

644-
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {}
644+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
645645
if auto_restatement_enabled:
646646
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
647647
self.snapshots, execution_time

sqlmesh/core/snapshot/definition.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21582158

21592159
def apply_auto_restatements(
21602160
snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
2161-
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, SnapshotId]]:
2161+
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]:
21622162
"""Applies auto restatements to the snapshots.
21632163
21642164
This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2173,8 +2173,7 @@ def apply_auto_restatements(
21732173
A list of SnapshotIntervals with **new** intervals that need to be restated.
21742174
"""
21752175
dag = snapshots_to_dag(snapshots.values())
2176-
snapshots_with_auto_restatements: t.List[SnapshotId] = []
2177-
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {}
2176+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
21782177
auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
21792178
for s_id in dag:
21802179
if s_id not in snapshots:
@@ -2189,6 +2188,7 @@ def apply_auto_restatements(
21892188
for parent_s_id in snapshot.parents
21902189
if parent_s_id in auto_restated_intervals_per_snapshot
21912190
]
2191+
upstream_triggers = []
21922192
if next_auto_restated_interval:
21932193
logger.info(
21942194
"Calculated the next auto restated interval (%s, %s) for snapshot %s",
@@ -2199,21 +2199,15 @@ def apply_auto_restatements(
21992199
auto_restated_intervals.append(next_auto_restated_interval)
22002200

22012201
# auto-restated snapshot is its own trigger
2202-
snapshots_with_auto_restatements.append(s_id)
2203-
auto_restatement_triggers[s_id] = s_id
2204-
else:
2205-
for parent_s_id in snapshot.parents:
2206-
# first auto-restated parent is the trigger
2207-
if parent_s_id in snapshots_with_auto_restatements:
2208-
auto_restatement_triggers[s_id] = parent_s_id
2209-
break
2210-
# if no trigger yet and parent has trigger, inherit their trigger
2211-
# - will be overwritten if a different parent is auto-restated
2212-
if (
2213-
parent_s_id in auto_restatement_triggers
2214-
and s_id not in auto_restatement_triggers
2215-
):
2216-
auto_restatement_triggers[s_id] = auto_restatement_triggers[parent_s_id]
2202+
upstream_triggers = [s_id]
2203+
2204+
for parent_s_id in snapshot.parents:
2205+
if parent_s_id in auto_restatement_triggers:
2206+
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
2207+
2208+
# remove duplicate triggers
2209+
if upstream_triggers:
2210+
auto_restatement_triggers[s_id] = list(dict.fromkeys(upstream_triggers))
22172211

22182212
if auto_restated_intervals:
22192213
auto_restated_interval_start = sys.maxsize

tests/core/test_snapshot.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3276,6 +3276,111 @@ def test_apply_auto_restatements_disable_restatement_downstream(make_snapshot):
32763276
]
32773277

32783278

3279+
def test_auto_restatement_triggers(make_snapshot):
3280+
model_a = SqlModel(
3281+
name="test_model_a",
3282+
kind=IncrementalByTimeRangeKind(
3283+
time_column=TimeColumn(column="ds"),
3284+
auto_restatement_cron="0 10 * * *",
3285+
auto_restatement_intervals=24,
3286+
),
3287+
start="2020-01-01",
3288+
cron="@daily",
3289+
query=parse_one("SELECT 1 as ds"),
3290+
)
3291+
snapshot_a = make_snapshot(model_a, version="1")
3292+
snapshot_a.add_interval("2020-01-01", "2020-01-05")
3293+
snapshot_a.next_auto_restatement_ts = to_timestamp("2020-01-06 10:00:00")
3294+
3295+
model_b = SqlModel(
3296+
name="test_model_b",
3297+
kind=IncrementalByTimeRangeKind(
3298+
time_column=TimeColumn(column="ds"),
3299+
),
3300+
start="2020-01-01",
3301+
cron="@daily",
3302+
query=parse_one("SELECT ds FROM test_model_a"),
3303+
)
3304+
snapshot_b = make_snapshot(model_b, nodes={model_a.fqn: model_a}, version="1")
3305+
snapshot_b.add_interval("2020-01-01", "2020-01-05")
3306+
3307+
model_c = SqlModel(
3308+
name="test_model_c",
3309+
kind=IncrementalByTimeRangeKind(
3310+
time_column=TimeColumn(column="ds"),
3311+
auto_restatement_cron="0 10 * * *",
3312+
auto_restatement_intervals=24,
3313+
),
3314+
start="2020-01-01",
3315+
cron="@daily",
3316+
query=parse_one("SELECT ds FROM test_model_a"),
3317+
)
3318+
snapshot_c = make_snapshot(model_c, nodes={model_a.fqn: model_a}, version="1")
3319+
snapshot_c.add_interval("2020-01-01", "2020-01-05")
3320+
snapshot_c.next_auto_restatement_ts = to_timestamp("2020-01-06 10:00:00")
3321+
3322+
model_d = SqlModel(
3323+
name="test_model_d",
3324+
kind=IncrementalByTimeRangeKind(
3325+
time_column=TimeColumn(column="ds"),
3326+
auto_restatement_cron="0 10 * * *",
3327+
auto_restatement_intervals=24,
3328+
),
3329+
start="2020-01-01",
3330+
cron="@daily",
3331+
query=parse_one("SELECT 1 as ds"),
3332+
)
3333+
snapshot_d = make_snapshot(model_d, version="1")
3334+
snapshot_d.add_interval("2020-01-01", "2020-01-05")
3335+
snapshot_d.next_auto_restatement_ts = to_timestamp("2020-01-06 10:00:00")
3336+
3337+
model_e = SqlModel(
3338+
name="test_model_e",
3339+
kind=IncrementalByTimeRangeKind(
3340+
time_column=TimeColumn(column="ds"),
3341+
),
3342+
start="2020-01-01",
3343+
cron="@daily",
3344+
query=parse_one(
3345+
"SELECT ds from test_model_b UNION ALL SELECT ds from test_model_c UNION ALL SELECT ds from test_model_d"
3346+
),
3347+
)
3348+
snapshot_e = make_snapshot(
3349+
model_e,
3350+
nodes={
3351+
model_a.fqn: model_a,
3352+
model_b.fqn: model_b,
3353+
model_c.fqn: model_c,
3354+
model_d.fqn: model_d,
3355+
},
3356+
version="1",
3357+
)
3358+
snapshot_e.add_interval("2020-01-01", "2020-01-05")
3359+
3360+
_, auto_restatement_triggers = apply_auto_restatements(
3361+
{
3362+
snapshot_a.snapshot_id: snapshot_a,
3363+
snapshot_b.snapshot_id: snapshot_b,
3364+
snapshot_c.snapshot_id: snapshot_c,
3365+
snapshot_d.snapshot_id: snapshot_d,
3366+
snapshot_e.snapshot_id: snapshot_e,
3367+
},
3368+
"2020-01-06 10:01:00",
3369+
)
3370+
3371+
assert auto_restatement_triggers == {
3372+
snapshot_a.snapshot_id: [snapshot_a.snapshot_id],
3373+
snapshot_d.snapshot_id: [snapshot_d.snapshot_id],
3374+
snapshot_b.snapshot_id: [snapshot_a.snapshot_id],
3375+
snapshot_c.snapshot_id: [snapshot_c.snapshot_id, snapshot_a.snapshot_id],
3376+
snapshot_e.snapshot_id: [
3377+
snapshot_d.snapshot_id,
3378+
snapshot_c.snapshot_id,
3379+
snapshot_a.snapshot_id,
3380+
],
3381+
}
3382+
3383+
32793384
def test_render_signal(make_snapshot, mocker):
32803385
@signal()
32813386
def check_types(batch, env: str, sql: list[SQL], table: exp.Table, default: int = 0):

web/server/console.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def update_snapshot_evaluation_progress(
142142
num_audits_passed: int,
143143
num_audits_failed: int,
144144
audit_only: bool = False,
145-
auto_restatement_trigger: t.Optional[SnapshotId] = None,
145+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
146146
) -> None:
147147
if audit_only:
148148
return

0 commit comments

Comments
 (0)