Skip to content

Commit d3fee71

Browse files
committed
Infer cron ready from auto-restatement intervals
1 parent c4fcfdb commit d3fee71

File tree

4 files changed, with 20 additions and 23 deletions.

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ def _missing_intervals(
513513
snapshots_by_name: t.Dict[str, Snapshot],
514514
deployability_index: DeployabilityIndex,
515515
) -> SnapshotToIntervals:
516-
missing_intervals, _ = merged_missing_intervals(
516+
missing_intervals = merged_missing_intervals(
517517
snapshots=snapshots_by_name.values(),
518518
start=plan.start,
519519
end=plan.end,

sqlmesh/core/scheduler.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@
2626
snapshots_to_dag,
2727
Intervals,
2828
)
29-
from sqlmesh.core.snapshot.definition import check_ready_intervals
3029
from sqlmesh.core.snapshot.definition import (
3130
Interval,
3231
SnapshotEvaluationTriggers,
32+
SnapshotIntervals,
33+
check_ready_intervals,
3334
expand_range,
3435
parent_snapshots_by_name,
3536
)
@@ -112,7 +113,7 @@ def merged_missing_intervals(
112113
ignore_cron: bool = False,
113114
end_bounded: bool = False,
114115
selected_snapshots: t.Optional[t.Set[str]] = None,
115-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
116+
) -> SnapshotToIntervals:
116117
"""Find the largest contiguous date interval parameters based only on what is missing.
117118
118119
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -134,9 +135,9 @@ def merged_missing_intervals(
134135
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
135136
136137
Returns:
137-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
138+
A dict containing all snapshots needing to be run with their associated interval params.
138139
"""
139-
snapshots_to_intervals, snapshots_naive_cron_ready = merged_missing_intervals(
140+
snapshots_to_intervals = merged_missing_intervals(
140141
snapshots=self.snapshot_per_version.values(),
141142
start=start,
142143
end=end,
@@ -154,7 +155,7 @@ def merged_missing_intervals(
154155
snapshots_to_intervals = {
155156
s: i for s, i in snapshots_to_intervals.items() if s.name in selected_snapshots
156157
}
157-
return snapshots_to_intervals, snapshots_naive_cron_ready
158+
return snapshots_to_intervals
158159

159160
def evaluate(
160161
self,
@@ -643,6 +644,7 @@ def _run_or_audit(
643644
for s_id, interval in (remove_intervals or {}).items():
644645
self.snapshots[s_id].remove_interval(interval)
645646

647+
auto_restated_intervals: t.List[SnapshotIntervals] = []
646648
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
647649
if auto_restatement_enabled:
648650
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
@@ -652,8 +654,9 @@ def _run_or_audit(
652654
self.state_sync.update_auto_restatements(
653655
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
654656
)
657+
auto_restated_snapshots = {snapshot.snapshot_id for snapshot in auto_restated_intervals}
655658

656-
merged_intervals, snapshots_naive_cron_ready = self.merged_missing_intervals(
659+
merged_intervals = self.merged_missing_intervals(
657660
start,
658661
end,
659662
execution_time,
@@ -696,7 +699,7 @@ def _run_or_audit(
696699
all_snapshot_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {
697700
s_id: SnapshotEvaluationTriggers(
698701
ignore_cron_flag=ignore_cron,
699-
cron_ready=s_id in snapshots_naive_cron_ready,
702+
cron_ready=s_id not in auto_restated_snapshots,
700703
auto_restatement_triggers=auto_restatement_triggers.get(s_id, []),
701704
select_snapshot_triggers=select_snapshot_triggers.get(s_id, []),
702705
)
@@ -864,7 +867,7 @@ def merged_missing_intervals(
864867
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
865868
ignore_cron: bool = False,
866869
end_bounded: bool = False,
867-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
870+
) -> SnapshotToIntervals:
868871
"""Find the largest contiguous date interval parameters based only on what is missing.
869872
870873
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -914,7 +917,7 @@ def compute_interval_params(
914917
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
915918
ignore_cron: bool = False,
916919
end_bounded: bool = False,
917-
) -> t.Tuple[SnapshotToIntervals, t.List[SnapshotId]]:
920+
) -> SnapshotToIntervals:
918921
"""Find the largest contiguous date interval parameters based only on what is missing.
919922
920923
For each node name, find all dependencies and look for a stored snapshot from the metastore. If a snapshot is found,
@@ -936,7 +939,7 @@ def compute_interval_params(
936939
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
937940
938941
Returns:
939-
A tuple containing a dict containing all snapshots needing to be run with their associated interval params and a list of snapshots that are ready to run based on their naive cron schedule (ignoring plan/run context and other attributes).
942+
A dict containing all snapshots needing to be run with their associated interval params.
940943
"""
941944
snapshot_merged_intervals = {}
942945

@@ -964,11 +967,7 @@ def compute_interval_params(
964967
contiguous_batch.append((next_batch[0][0], next_batch[-1][-1]))
965968
snapshot_merged_intervals[snapshot] = contiguous_batch
966969

967-
snapshots_naive_cron_ready = [
968-
snap.snapshot_id for snap in missing_intervals(snapshots, execution_time=execution_time)
969-
]
970-
971-
return snapshot_merged_intervals, snapshots_naive_cron_ready
970+
return snapshot_merged_intervals
972971

973972

974973
def interval_diff(

tests/core/test_scheduler.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ def test_interval_params(scheduler: Scheduler, sushi_context_fixed_date: Context
5656
start_ds = "2022-01-01"
5757
end_ds = "2022-02-05"
5858

59-
interval_params, _ = compute_interval_params(
60-
[orders, waiter_revenue], start=start_ds, end=end_ds
61-
)
59+
interval_params = compute_interval_params([orders, waiter_revenue], start=start_ds, end=end_ds)
6260
assert interval_params == {
6361
orders: [
6462
(to_timestamp(start_ds), to_timestamp("2022-02-06")),
@@ -79,7 +77,7 @@ def _get_batched_missing_intervals(
7977
end: TimeLike,
8078
execution_time: t.Optional[TimeLike] = None,
8179
) -> SnapshotToIntervals:
82-
merged_intervals, _ = scheduler.merged_missing_intervals(start, end, execution_time)
80+
merged_intervals = scheduler.merged_missing_intervals(start, end, execution_time)
8381
return scheduler.batch_intervals(merged_intervals, mocker.Mock(), mocker.Mock())
8482

8583
return _get_batched_missing_intervals
@@ -91,7 +89,7 @@ def test_interval_params_nonconsecutive(scheduler: Scheduler, orders: Snapshot):
9189

9290
orders.add_interval("2022-01-10", "2022-01-15")
9391

94-
interval_params, _ = compute_interval_params([orders], start=start_ds, end=end_ds)
92+
interval_params = compute_interval_params([orders], start=start_ds, end=end_ds)
9593
assert interval_params == {
9694
orders: [
9795
(to_timestamp(start_ds), to_timestamp("2022-01-10")),
@@ -108,7 +106,7 @@ def test_interval_params_missing(scheduler: Scheduler, sushi_context_fixed_date:
108106

109107
start_ds = "2022-01-01"
110108
end_ds = "2022-03-01"
111-
interval_params, _ = compute_interval_params(
109+
interval_params = compute_interval_params(
112110
sushi_context_fixed_date.snapshots.values(), start=start_ds, end=end_ds
113111
)
114112
assert interval_params[waiters] == [

web/server/api/endpoints/plan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def _get_plan_changes(context: Context, plan: Plan) -> models.PlanChanges:
132132

133133
def _get_plan_backfills(context: Context, plan: Plan) -> t.Dict[str, t.Any]:
134134
"""Get plan backfills"""
135-
merged_intervals, _ = context.scheduler().merged_missing_intervals()
135+
merged_intervals = context.scheduler().merged_missing_intervals()
136136
batches = context.scheduler().batch_intervals(merged_intervals, None, EnvironmentNamingInfo())
137137
tasks = {snapshot.name: len(intervals) for snapshot, intervals in batches.items()}
138138
snapshots = plan.context_diff.snapshots

0 commit comments

Comments
 (0)