Skip to content

Commit 828ab26

Browse files
committed
Print auto-restated trigger of model evaluation in debug console
1 parent 18017c4 commit 828ab26

File tree

4 files changed

+52
-17
lines changed

4 files changed

+52
-17
lines changed

sqlmesh/core/console.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
431432
) -> None:
432433
"""Updates the snapshot evaluation progress."""
433434

@@ -575,6 +576,7 @@ def update_snapshot_evaluation_progress(
575576
num_audits_passed: int,
576577
num_audits_failed: int,
577578
audit_only: bool = False,
579+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
578580
) -> None:
579581
pass
580582

@@ -1056,6 +1058,7 @@ def update_snapshot_evaluation_progress(
10561058
num_audits_passed: int,
10571059
num_audits_failed: int,
10581060
audit_only: bool = False,
1061+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
10591062
) -> None:
10601063
"""Update the snapshot evaluation progress."""
10611064
if (
@@ -3635,6 +3638,7 @@ def update_snapshot_evaluation_progress(
36353638
num_audits_passed: int,
36363639
num_audits_failed: int,
36373640
audit_only: bool = False,
3641+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
36383642
) -> None:
36393643
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36403644

@@ -3804,9 +3808,13 @@ def update_snapshot_evaluation_progress(
38043808
num_audits_passed: int,
38053809
num_audits_failed: int,
38063810
audit_only: bool = False,
3811+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
38073812
) -> None:
38083813
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38093814

3815+
if auto_restatement_trigger:
3816+
message += f" | evaluation_triggered_by={auto_restatement_trigger.name}"
3817+
38103818
if audit_only:
38113819
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38123820

sqlmesh/core/scheduler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ def run_merged_intervals(
374374
run_environment_statements: bool = False,
375375
audit_only: bool = False,
376376
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
377+
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {},
377378
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
378379
"""Runs precomputed batches of missing intervals.
379380
@@ -476,6 +477,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
476477
evaluation_duration_ms,
477478
num_audits - num_audits_failed,
478479
num_audits_failed,
480+
auto_restatement_trigger=auto_restatement_triggers.get(snapshot.snapshot_id),
479481
)
480482

481483
try:
@@ -640,7 +642,9 @@ def _run_or_audit(
640642
self.snapshots[s_id].remove_interval(interval)
641643

642644
if auto_restatement_enabled:
643-
auto_restated_intervals = apply_auto_restatements(self.snapshots, execution_time)
645+
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
646+
self.snapshots, execution_time
647+
)
644648
self.state_sync.add_snapshots_intervals(auto_restated_intervals)
645649
self.state_sync.update_auto_restatements(
646650
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
@@ -672,6 +676,7 @@ def _run_or_audit(
672676
run_environment_statements=run_environment_statements,
673677
audit_only=audit_only,
674678
restatements=remove_intervals,
679+
auto_restatement_triggers=auto_restatement_triggers,
675680
)
676681

677682
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/core/snapshot/definition.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2082,7 +2082,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
20822082

20832083
def apply_auto_restatements(
20842084
snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
2085-
) -> t.List[SnapshotIntervals]:
2085+
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, SnapshotId]]:
20862086
"""Applies auto restatements to the snapshots.
20872087
20882088
This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2097,6 +2097,8 @@ def apply_auto_restatements(
20972097
A list of SnapshotIntervals with **new** intervals that need to be restated.
20982098
"""
20992099
dag = snapshots_to_dag(snapshots.values())
2100+
snapshots_with_auto_restatements: t.List[SnapshotId] = []
2101+
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {}
21002102
auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
21012103
for s_id in dag:
21022104
if s_id not in snapshots:
@@ -2120,6 +2122,23 @@ def apply_auto_restatements(
21202122
)
21212123
auto_restated_intervals.append(next_auto_restated_interval)
21222124

2125+
# auto-restated snapshot is its own trigger
2126+
snapshots_with_auto_restatements.append(s_id)
2127+
auto_restatement_triggers[s_id] = s_id
2128+
else:
2129+
for parent_s_id in snapshot.parents:
2130+
# first auto-restated parent is the trigger
2131+
if parent_s_id in snapshots_with_auto_restatements:
2132+
auto_restatement_triggers[s_id] = parent_s_id
2133+
break
2134+
# if no trigger yet and parent has trigger, inherit their trigger
2135+
# - will be overwritten if a different parent is auto-restated
2136+
if (
2137+
parent_s_id in auto_restatement_triggers
2138+
and s_id not in auto_restatement_triggers
2139+
):
2140+
auto_restatement_triggers[s_id] = auto_restatement_triggers[parent_s_id]
2141+
21232142
if auto_restated_intervals:
21242143
auto_restated_interval_start = sys.maxsize
21252144
auto_restated_interval_end = -sys.maxsize
@@ -2149,20 +2168,22 @@ def apply_auto_restatements(
21492168

21502169
snapshot.apply_pending_restatement_intervals()
21512170
snapshot.update_next_auto_restatement_ts(execution_time)
2152-
2153-
return [
2154-
SnapshotIntervals(
2155-
name=snapshots[s_id].name,
2156-
identifier=None,
2157-
version=snapshots[s_id].version,
2158-
dev_version=None,
2159-
intervals=[],
2160-
dev_intervals=[],
2161-
pending_restatement_intervals=[interval],
2162-
)
2163-
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2164-
if s_id in snapshots
2165-
]
2171+
return (
2172+
[
2173+
SnapshotIntervals(
2174+
name=snapshots[s_id].name,
2175+
identifier=None,
2176+
version=snapshots[s_id].version,
2177+
dev_version=None,
2178+
intervals=[],
2179+
dev_intervals=[],
2180+
pending_restatement_intervals=[interval],
2181+
)
2182+
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2183+
if s_id in snapshots
2184+
],
2185+
auto_restatement_triggers,
2186+
)
21662187

21672188

21682189
def parent_snapshots_by_name(

web/server/console.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlmesh.core.console import TerminalConsole
1010
from sqlmesh.core.environment import EnvironmentNamingInfo
1111
from sqlmesh.core.plan.definition import EvaluatablePlan
12-
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike, SnapshotTableInfo
12+
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike, SnapshotTableInfo, SnapshotId
1313
from sqlmesh.core.test import ModelTest
1414
from sqlmesh.core.test.result import ModelTextTestResult
1515
from sqlmesh.utils.date import now_timestamp
@@ -142,6 +142,7 @@ def update_snapshot_evaluation_progress(
142142
num_audits_passed: int,
143143
num_audits_failed: int,
144144
audit_only: bool = False,
145+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
145146
) -> None:
146147
if audit_only:
147148
return

0 commit comments

Comments
 (0)