@@ -2158,7 +2158,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21582158
21592159def apply_auto_restatements (
21602160 snapshots : t .Dict [SnapshotId , Snapshot ], execution_time : TimeLike
2161- ) -> t .List [SnapshotIntervals ]:
2161+ ) -> t .Tuple [ t . List [SnapshotIntervals ], t . Dict [ SnapshotId , SnapshotId ] ]:
21622162 """Applies auto restatements to the snapshots.
21632163
21642164 This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2173,6 +2173,8 @@ def apply_auto_restatements(
21732173 A list of SnapshotIntervals with **new** intervals that need to be restated.
21742174 """
21752175 dag = snapshots_to_dag (snapshots .values ())
2176+ snapshots_with_auto_restatements : t .List [SnapshotId ] = []
2177+ auto_restatement_triggers : t .Dict [SnapshotId , SnapshotId ] = {}
21762178 auto_restated_intervals_per_snapshot : t .Dict [SnapshotId , Interval ] = {}
21772179 for s_id in dag :
21782180 if s_id not in snapshots :
@@ -2196,6 +2198,23 @@ def apply_auto_restatements(
21962198 )
21972199 auto_restated_intervals .append (next_auto_restated_interval )
21982200
2201+ # auto-restated snapshot is its own trigger
2202+ snapshots_with_auto_restatements .append (s_id )
2203+ auto_restatement_triggers [s_id ] = s_id
2204+ else :
2205+ for parent_s_id in snapshot .parents :
2206+ # first auto-restated parent is the trigger
2207+ if parent_s_id in snapshots_with_auto_restatements :
2208+ auto_restatement_triggers [s_id ] = parent_s_id
2209+ break
2210+ # if no trigger yet and parent has trigger, inherit their trigger
2211+ # - will be overwritten if a different parent is auto-restated
2212+ if (
2213+ parent_s_id in auto_restatement_triggers
2214+ and s_id not in auto_restatement_triggers
2215+ ):
2216+ auto_restatement_triggers [s_id ] = auto_restatement_triggers [parent_s_id ]
2217+
21992218 if auto_restated_intervals :
22002219 auto_restated_interval_start = sys .maxsize
22012220 auto_restated_interval_end = - sys .maxsize
@@ -2225,20 +2244,22 @@ def apply_auto_restatements(
22252244
22262245 snapshot .apply_pending_restatement_intervals ()
22272246 snapshot .update_next_auto_restatement_ts (execution_time )
2228-
2229- return [
2230- SnapshotIntervals (
2231- name = snapshots [s_id ].name ,
2232- identifier = None ,
2233- version = snapshots [s_id ].version ,
2234- dev_version = None ,
2235- intervals = [],
2236- dev_intervals = [],
2237- pending_restatement_intervals = [interval ],
2238- )
2239- for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2240- if s_id in snapshots
2241- ]
2247+ return (
2248+ [
2249+ SnapshotIntervals (
2250+ name = snapshots [s_id ].name ,
2251+ identifier = None ,
2252+ version = snapshots [s_id ].version ,
2253+ dev_version = None ,
2254+ intervals = [],
2255+ dev_intervals = [],
2256+ pending_restatement_intervals = [interval ],
2257+ )
2258+ for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2259+ if s_id in snapshots
2260+ ],
2261+ auto_restatement_triggers ,
2262+ )
22422263
22432264
22442265def parent_snapshots_by_name (
0 commit comments