Skip to content

Commit 5f72a94

Browse files
committed
Infer cron ready from auto-restatement intervals
1 parent 1978b16 commit 5f72a94

File tree

4 files changed

+20
-23
lines changed

4 files changed

+20
-23
lines changed

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -504,7 +504,7 @@ def _missing_intervals(
504504
snapshots_by_name: t.Dict[str, Snapshot],
505505
deployability_index: DeployabilityIndex,
506506
) -> SnapshotToIntervals:
507-
missing_intervals, _ = merged_missing_intervals(
507+
missing_intervals = merged_missing_intervals(
508508
snapshots=snapshots_by_name.values(),
509509
start=plan.start,
510510
end=plan.end,

sqlmesh/core/scheduler.py

Lines changed: 14 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -26,10 +26,11 @@
2626
snapshots_to_dag,
2727
Intervals,
2828
)
29-
from sqlmesh.core.snapshot.definition import check_ready_intervals
3029
from sqlmesh.core.snapshot.definition import (
3130
Interval,
3231
SnapshotEvaluationTriggers,
32+
SnapshotIntervals,
33+
check_ready_intervals,
3334
expand_range,
3435
parent_snapshots_by_name,
3536
)
@@ -112,7 +113,7 @@ def merged_missing_intervals(
112113
ignore_cron: bool = False,
113114
end_bounded: bool = False,
114115
selected_snapshots: t.Optional[t.Set[str]] = None,
115-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
116+
) -> SnapshotToIntervals:
116117
"""Find the largest contiguous date interval parameters based only on what is missing.
117118
118119
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -134,9 +135,9 @@ def merged_missing_intervals(
134135
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
135136
136137
Returns:
137-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
138+
A dict containing all snapshots needing to be run with their associated interval params.
138139
"""
139-
snapshots_to_intervals, snapshots_naive_cron_ready = merged_missing_intervals(
140+
snapshots_to_intervals = merged_missing_intervals(
140141
snapshots=self.snapshot_per_version.values(),
141142
start=start,
142143
end=end,
@@ -154,7 +155,7 @@ def merged_missing_intervals(
154155
snapshots_to_intervals = {
155156
s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
156157
}
157-
return snapshots_to_intervals, snapshots_naive_cron_ready
158+
return snapshots_to_intervals
158159

159160
def evaluate(
160161
self,
@@ -651,6 +652,7 @@ def _run_or_audit(
651652
for s_id, interval in (remove_intervals or {}).items():
652653
self.snapshots[s_id].remove_interval(interval)
653654

655+
auto_restated_intervals: t.List[SnapshotIntervals] = []
654656
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
655657
if auto_restatement_enabled:
656658
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
@@ -660,8 +662,9 @@ def _run_or_audit(
660662
self.state_sync.update_auto_restatements(
661663
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
662664
)
665+
auto_restated_snapshots = {snapshot.snapshot_id for snapshot in auto_restated_intervals}
663666

664-
merged_intervals, snapshots_naive_cron_ready = self.merged_missing_intervals(
667+
merged_intervals = self.merged_missing_intervals(
665668
start,
666669
end,
667670
execution_time,
@@ -704,7 +707,7 @@ def _run_or_audit(
704707
all_snapshot_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {
705708
s_id: SnapshotEvaluationTriggers(
706709
ignore_cron_flag=ignore_cron,
707-
cron_ready=s_id in snapshots_naive_cron_ready,
710+
cron_ready=s_id not in auto_restated_snapshots,
708711
auto_restatement_triggers=auto_restatement_triggers.get(s_id, []),
709712
select_snapshot_triggers=select_snapshot_triggers.get(s_id, []),
710713
)
@@ -872,7 +875,7 @@ def merged_missing_intervals(
872875
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
873876
ignore_cron: bool = False,
874877
end_bounded: bool = False,
875-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
878+
) -> SnapshotToIntervals:
876879
"""Find the largest contiguous date interval parameters based only on what is missing.
877880
878881
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -922,7 +925,7 @@ def compute_interval_params(
922925
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
923926
ignore_cron: bool = False,
924927
end_bounded: bool = False,
925-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
928+
) -> SnapshotToIntervals:
926929
"""Find the largest contiguous date interval parameters based only on what is missing.
927930
928931
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -944,7 +947,7 @@ def compute_interval_params(
944947
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
945948
946949
Returns:
947-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
950+
A dict containing all snapshots needing to be run with their associated interval params.
948951
"""
949952
snapshot_merged_intervals = {}
950953

@@ -972,11 +975,7 @@ def compute_interval_params(
972975
contiguous_batch.append((next_batch[0][0], next_batch[-1][-1]))
973976
snapshot_merged_intervals[snapshot] = contiguous_batch
974977

975-
snapshots_naive_cron_ready = [
976-
snap.snapshot_id for snap in missing_intervals(snapshots, execution_time=execution_time)
977-
]
978-
979-
return snapshot_merged_intervals, snapshots_naive_cron_ready
978+
return snapshot_merged_intervals
980979

981980

982981
def interval_diff(

tests/core/test_scheduler.py

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -56,9 +56,7 @@ def test_interval_params(scheduler: Scheduler, sushi_context_fixed_date: Context
5656
start_ds = "2022-01-01"
5757
end_ds = "2022-02-05"
5858

59-
interval_params, _ = compute_interval_params(
60-
[orders, waiter_revenue], start=start_ds, end=end_ds
61-
)
59+
interval_params = compute_interval_params([orders, waiter_revenue], start=start_ds, end=end_ds)
6260
assert interval_params == {
6361
orders: [
6462
(to_timestamp(start_ds), to_timestamp("2022-02-06")),
@@ -79,7 +77,7 @@ def _get_batched_missing_intervals(
7977
end: TimeLike,
8078
execution_time: t.Optional[TimeLike] = None,
8179
) -> SnapshotToIntervals:
82-
merged_intervals, _ = scheduler.merged_missing_intervals(start, end, execution_time)
80+
merged_intervals = scheduler.merged_missing_intervals(start, end, execution_time)
8381
return scheduler.batch_intervals(merged_intervals, mocker.Mock(), mocker.Mock())
8482

8583
return _get_batched_missing_intervals
@@ -91,7 +89,7 @@ def test_interval_params_nonconsecutive(scheduler: Scheduler, orders: Snapshot):
9189

9290
orders.add_interval("2022-01-10", "2022-01-15")
9391

94-
interval_params, _ = compute_interval_params([orders], start=start_ds, end=end_ds)
92+
interval_params = compute_interval_params([orders], start=start_ds, end=end_ds)
9593
assert interval_params == {
9694
orders: [
9795
(to_timestamp(start_ds), to_timestamp("2022-01-10")),
@@ -108,7 +106,7 @@ def test_interval_params_missing(scheduler: Scheduler, sushi_context_fixed_date:
108106

109107
start_ds = "2022-01-01"
110108
end_ds = "2022-03-01"
111-
interval_params, _ = compute_interval_params(
109+
interval_params = compute_interval_params(
112110
sushi_context_fixed_date.snapshots.values(), start=start_ds, end=end_ds
113111
)
114112
assert interval_params[waiters] == [

web/server/api/endpoints/plan.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -132,7 +132,7 @@ def _get_plan_changes(context: Context, plan: Plan) -> models.PlanChanges:
132132

133133
def _get_plan_backfills(context: Context, plan: Plan) -> t.Dict[str, t.Any]:
134134
"""Get plan backfills"""
135-
merged_intervals, _ = context.scheduler().merged_missing_intervals()
135+
merged_intervals = context.scheduler().merged_missing_intervals()
136136
batches = context.scheduler().batch_intervals(merged_intervals, None, EnvironmentNamingInfo())
137137
tasks = {snapshot.name: len(intervals) for snapshot, intervals in batches.items()}
138138
snapshots = plan.context_diff.snapshots

0 commit comments

Comments
 (0)