4141from sqlmesh .utils .concurrency import NodeExecutionFailedError
4242from sqlmesh .utils .errors import PlanError , SQLMeshError
4343from sqlmesh .utils .dag import DAG
44+ from sqlmesh .utils import CorrelationId
4445from sqlmesh .utils .date import now
4546
4647logger = logging .getLogger (__name__ )
@@ -72,7 +73,7 @@ class BuiltInPlanEvaluator(PlanEvaluator):
7273 def __init__ (
7374 self ,
7475 state_sync : StateSync ,
75- create_scheduler : t .Callable [[t .Iterable [Snapshot ]], Scheduler ],
76+ create_scheduler : t .Callable [[t .Iterable [Snapshot ], t . Optional [ CorrelationId ] ], Scheduler ],
7677 default_catalog : t .Optional [str ],
7778 console : t .Optional [Console ] = None ,
7879 ):
@@ -230,7 +231,9 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
230231 self .console .log_success ("SKIP: No model batches to execute" )
231232 return
232233
233- scheduler = self .create_scheduler (stage .all_snapshots .values ())
234+ scheduler = self .create_scheduler (
235+ stage .all_snapshots .values (), CorrelationId .from_plan_id (plan .plan_id )
236+ )
234237 errors , _ = scheduler .run_merged_intervals (
235238 merged_intervals = stage .snapshot_to_intervals ,
236239 deployability_index = stage .deployability_index ,
@@ -251,7 +254,7 @@ def visit_audit_only_run_stage(
251254 return
252255
253256 # If there are any snapshots to be audited, we'll reuse the scheduler's internals to audit them
254- scheduler = self .create_scheduler (audit_snapshots )
257+ scheduler = self .create_scheduler (audit_snapshots , CorrelationId . from_plan_id ( plan . plan_id ) )
255258 completion_status = scheduler .audit (
256259 plan .environment ,
257260 plan .start ,
0 commit comments