2626 snapshots_to_dag ,
2727 Intervals ,
2828)
29+ from sqlmesh .core .snapshot .definition import check_ready_intervals
2930from sqlmesh .core .snapshot .definition import (
3031 Interval ,
31- SnapshotEvaluationTriggers ,
3232 SnapshotIntervals ,
33- check_ready_intervals ,
3433 expand_range ,
3534 parent_snapshots_by_name ,
3635)
@@ -133,9 +132,6 @@ def merged_missing_intervals(
133132 end_bounded: If set to true, the returned intervals will be bounded by the target end date, disregarding lookback,
134133 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
135134 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
136-
137- Returns:
138- A dict containing all snapshots needing to be run with their associated interval params.
139135 """
140136 snapshots_to_intervals = merged_missing_intervals (
141137 snapshots = self .snapshot_per_version .values (),
@@ -228,7 +224,6 @@ def run(
228224 ignore_cron : bool = False ,
229225 end_bounded : bool = False ,
230226 selected_snapshots : t .Optional [t .Set [str ]] = None ,
231- selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
232227 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
233228 deployability_index : t .Optional [DeployabilityIndex ] = None ,
234229 auto_restatement_enabled : bool = False ,
@@ -245,7 +240,6 @@ def run(
245240 ignore_cron = ignore_cron ,
246241 end_bounded = end_bounded ,
247242 selected_snapshots = selected_snapshots ,
248- selected_snapshots_auto_upstream = selected_snapshots_auto_upstream ,
249243 circuit_breaker = circuit_breaker ,
250244 deployability_index = deployability_index ,
251245 auto_restatement_enabled = auto_restatement_enabled ,
@@ -381,7 +375,7 @@ def run_merged_intervals(
381375 run_environment_statements : bool = False ,
382376 audit_only : bool = False ,
383377 restatements : t .Optional [t .Dict [SnapshotId , Interval ]] = None ,
384- snapshot_evaluation_triggers : t .Dict [SnapshotId , SnapshotEvaluationTriggers ] = {},
378+ auto_restatement_triggers : t .Dict [SnapshotId , t . List [ SnapshotId ] ] = {},
385379 ) -> t .Tuple [t .List [NodeExecutionFailedError [SchedulingUnit ]], t .List [SchedulingUnit ]]:
386380 """Runs precomputed batches of missing intervals.
387381
@@ -484,9 +478,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
484478 evaluation_duration_ms ,
485479 num_audits - num_audits_failed ,
486480 num_audits_failed ,
487- snapshot_evaluation_triggers = snapshot_evaluation_triggers .get (
488- snapshot .snapshot_id
489- ),
481+ auto_restatement_triggers = auto_restatement_triggers .get (snapshot .snapshot_id ),
490482 )
491483
492484 try :
@@ -597,7 +589,6 @@ def _run_or_audit(
597589 ignore_cron : bool = False ,
598590 end_bounded : bool = False ,
599591 selected_snapshots : t .Optional [t .Set [str ]] = None ,
600- selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
601592 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
602593 deployability_index : t .Optional [DeployabilityIndex ] = None ,
603594 auto_restatement_enabled : bool = False ,
@@ -621,7 +612,6 @@ def _run_or_audit(
621612 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
622613 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
623614 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
624- selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
625615 circuit_breaker: An optional handler which checks if the run should be aborted.
626616 deployability_index: Determines snapshots that are deployable in the context of this render.
627617 auto_restatement_enabled: Whether to enable auto restatements.
@@ -680,38 +670,9 @@ def _run_or_audit(
680670 return CompletionStatus .NOTHING_TO_DO
681671
682672 merged_intervals_snapshots = {snapshot .snapshot_id for snapshot in merged_intervals }
683- select_snapshot_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {}
684- if selected_snapshots and selected_snapshots_auto_upstream :
685- # actually selected snapshots are their own triggers
686- selected_snapshots_no_auto_upstream = (
687- selected_snapshots - selected_snapshots_auto_upstream
688- )
689- select_snapshot_triggers = {
690- s_id : [s_id ]
691- for s_id in [
692- snapshot_id
693- for snapshot_id in merged_intervals_snapshots
694- if snapshot_id .name in selected_snapshots_no_auto_upstream
695- ]
696- }
697673
698- # trace upstream by walking downstream on reversed dag
699- reversed_dag = snapshots_to_dag (self .snapshots .values ()).reversed
700- for s_id in reversed_dag :
701- if s_id in merged_intervals_snapshots :
702- triggers = select_snapshot_triggers .get (s_id , [])
703- for parent_s_id in reversed_dag .graph .get (s_id , set ()):
704- triggers .extend (select_snapshot_triggers .get (parent_s_id , []))
705- select_snapshot_triggers [s_id ] = list (dict .fromkeys (triggers ))
706-
707- all_snapshot_triggers : t .Dict [SnapshotId , SnapshotEvaluationTriggers ] = {
708- s_id : SnapshotEvaluationTriggers (
709- ignore_cron_flag = ignore_cron ,
710- cron_ready = s_id not in auto_restated_snapshots ,
711- auto_restatement_triggers = auto_restatement_triggers .get (s_id , []),
712- select_snapshot_triggers = select_snapshot_triggers .get (s_id , []),
713- )
714- for s_id in merged_intervals_snapshots
674+ auto_restatement_triggers_dict : t .Dict [SnapshotId , t .List [SnapshotId ]] = {
675+ s_id : auto_restatement_triggers .get (s_id , []) for s_id in merged_intervals_snapshots
715676 }
716677
717678 errors , _ = self .run_merged_intervals (
@@ -725,7 +686,7 @@ def _run_or_audit(
725686 run_environment_statements = run_environment_statements ,
726687 audit_only = audit_only ,
727688 restatements = remove_intervals ,
728- snapshot_evaluation_triggers = all_snapshot_triggers ,
689+ auto_restatement_triggers = auto_restatement_triggers_dict ,
729690 )
730691
731692 return CompletionStatus .FAILURE if errors else CompletionStatus .SUCCESS
0 commit comments