2626 snapshots_to_dag ,
2727 Intervals ,
2828)
29+ from sqlmesh .core .snapshot .definition import check_ready_intervals
2930from sqlmesh .core .snapshot .definition import (
3031 Interval ,
31- SnapshotEvaluationTriggers ,
3232 SnapshotIntervals ,
33- check_ready_intervals ,
3433 expand_range ,
3534 parent_snapshots_by_name ,
3635)
@@ -133,9 +132,6 @@ def merged_missing_intervals(
133132 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
134133 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
135134 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
136-
137- Returns:
138- A dict containing all snapshots needing to be run with their associated interval params.
139135 """
140136 snapshots_to_intervals = merged_missing_intervals (
141137 snapshots = self .snapshot_per_version .values (),
@@ -226,7 +222,6 @@ def run(
226222 ignore_cron : bool = False ,
227223 end_bounded : bool = False ,
228224 selected_snapshots : t .Optional [t .Set [str ]] = None ,
229- selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
230225 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
231226 deployability_index : t .Optional [DeployabilityIndex ] = None ,
232227 auto_restatement_enabled : bool = False ,
@@ -243,7 +238,6 @@ def run(
243238 ignore_cron = ignore_cron ,
244239 end_bounded = end_bounded ,
245240 selected_snapshots = selected_snapshots ,
246- selected_snapshots_auto_upstream = selected_snapshots_auto_upstream ,
247241 circuit_breaker = circuit_breaker ,
248242 deployability_index = deployability_index ,
249243 auto_restatement_enabled = auto_restatement_enabled ,
@@ -378,6 +372,7 @@ def run_merged_intervals(
378372 end : t .Optional [TimeLike ] = None ,
379373 run_environment_statements : bool = False ,
380374 audit_only : bool = False ,
375+ restatements : t .Optional [t .Dict [SnapshotId , Interval ]] = None ,
381376 auto_restatement_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {},
382377 ) -> t .Tuple [t .List [NodeExecutionFailedError [SchedulingUnit ]], t .List [SchedulingUnit ]]:
383378 """Runs precomputed batches of missing intervals.
@@ -476,9 +471,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
476471 evaluation_duration_ms ,
477472 num_audits - num_audits_failed ,
478473 num_audits_failed ,
479- snapshot_evaluation_triggers = snapshot_evaluation_triggers .get (
480- snapshot .snapshot_id
481- ),
474+ auto_restatement_triggers = auto_restatement_triggers .get (snapshot .snapshot_id ),
482475 )
483476
484477 try :
@@ -589,7 +582,6 @@ def _run_or_audit(
589582 ignore_cron : bool = False ,
590583 end_bounded : bool = False ,
591584 selected_snapshots : t .Optional [t .Set [str ]] = None ,
592- selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
593585 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
594586 deployability_index : t .Optional [DeployabilityIndex ] = None ,
595587 auto_restatement_enabled : bool = False ,
@@ -613,7 +605,6 @@ def _run_or_audit(
613605 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
614606 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
615607 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
616- selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
617608 circuit_breaker: An optional handler which checks if the run should be aborted.
618609 deployability_index: Determines snapshots that are deployable in the context of this render.
619610 auto_restatement_enabled: Whether to enable auto restatements.
@@ -672,38 +663,9 @@ def _run_or_audit(
672663 return CompletionStatus .NOTHING_TO_DO
673664
674665 merged_intervals_snapshots = {snapshot .snapshot_id for snapshot in merged_intervals }
675- select_snapshot_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {}
676- if selected_snapshots and selected_snapshots_auto_upstream :
677- # actually selected snapshots are their own triggers
678- selected_snapshots_no_auto_upstream = (
679- selected_snapshots - selected_snapshots_auto_upstream
680- )
681- select_snapshot_triggers = {
682- s_id : [s_id ]
683- for s_id in [
684- snapshot_id
685- for snapshot_id in merged_intervals_snapshots
686- if snapshot_id .name in selected_snapshots_no_auto_upstream
687- ]
688- }
689666
690- # trace upstream by walking downstream on reversed dag
691- reversed_dag = snapshots_to_dag (self .snapshots .values ()).reversed
692- for s_id in reversed_dag :
693- if s_id in merged_intervals_snapshots :
694- triggers = select_snapshot_triggers .get (s_id , [])
695- for parent_s_id in reversed_dag .graph .get (s_id , set ()):
696- triggers .extend (select_snapshot_triggers .get (parent_s_id , []))
697- select_snapshot_triggers [s_id ] = list (dict .fromkeys (triggers ))
698-
699- all_snapshot_triggers : t .Dict [SnapshotId , SnapshotEvaluationTriggers ] = {
700- s_id : SnapshotEvaluationTriggers (
701- ignore_cron_flag = ignore_cron ,
702- cron_ready = s_id not in auto_restated_snapshots ,
703- auto_restatement_triggers = auto_restatement_triggers .get (s_id , []),
704- select_snapshot_triggers = select_snapshot_triggers .get (s_id , []),
705- )
706- for s_id in merged_intervals_snapshots
667+ auto_restatement_triggers_dict : t .Dict [SnapshotId , t .List [SnapshotId ]] = {
668+ s_id : auto_restatement_triggers .get (s_id , []) for s_id in merged_intervals_snapshots
707669 }
708670
709671 errors , _ = self .run_merged_intervals (
0 commit comments