33import re
44import sys
55import typing as t
6+ import logging
67from collections import defaultdict
78from datetime import datetime
89from functools import cached_property
910
11+ from sqlmesh .core .console import Console , get_console
1012from sqlmesh .core .config import (
1113 AutoCategorizationMode ,
1214 CategorizerConfig ,
2830from sqlmesh .utils .errors import NoChangesPlanError , PlanError , SQLMeshError
2931
3032
33+ logger = logging .getLogger (__name__ )
34+
35+
3136class PlanBuilder :
3237 """Plan Builder constructs a Plan based on user choices for how they want to backfill, preview, etc. their changes.
3338
@@ -88,6 +93,7 @@ def __init__(
8893 enable_preview : bool = False ,
8994 end_bounded : bool = False ,
9095 ensure_finalized_snapshots : bool = False ,
96+ console : t .Optional [Console ] = None ,
9197 ):
9298 self ._context_diff = context_diff
9399 self ._no_gaps = no_gaps
@@ -101,12 +107,13 @@ def __init__(
101107 self ._categorizer_config = categorizer_config or CategorizerConfig ()
102108 self ._auto_categorization_enabled = auto_categorization_enabled
103109 self ._include_unmodified = include_unmodified
104- self ._restate_models = set (restate_models or [])
110+ self ._restate_models = set (restate_models ) if restate_models is not None else None
105111 self ._effective_from = effective_from
106112 self ._execution_time = execution_time
107113 self ._backfill_models = backfill_models
108114 self ._end = end or default_end
109115 self ._apply = apply
116+ self ._console = console or get_console ()
110117
111118 self ._start = start
112119 if not self ._start and self ._forward_only_preview_needed :
@@ -286,9 +293,15 @@ def is_restateable_snapshot(snapshot: Snapshot) -> bool:
286293 return False
287294 return not snapshot .is_symbolic and not snapshot .is_seed
288295
289- restatements : t .Dict [SnapshotId , Interval ] = {}
290- dummy_interval = (sys .maxsize , - sys .maxsize )
291296 restate_models = self ._restate_models
297+ if restate_models == set ():
298+ # This is a warning but we print this as error since the Console is lacking API for warnings.
299+ self ._console .log_error (
300+ "Provided restated models do not match any models. No models will be included in plan."
301+ )
302+ return {}
303+
304+ restatements : t .Dict [SnapshotId , Interval ] = {}
292305 forward_only_preview_needed = self ._forward_only_preview_needed
293306 if not restate_models and forward_only_preview_needed :
294307 # Add model names for new forward-only snapshots to the restatement list
@@ -298,18 +311,27 @@ def is_restateable_snapshot(snapshot: Snapshot) -> bool:
298311 for s in self ._context_diff .new_snapshots .values ()
299312 if s .is_materialized and (self ._forward_only or s .model .forward_only )
300313 }
314+
301315 if not restate_models :
302- return restatements
316+ return {}
303317
304318 # Add restate snapshots and their downstream snapshots
319+ dummy_interval = (sys .maxsize , - sys .maxsize )
305320 for model_fqn in restate_models :
306321 snapshot = self ._model_fqn_to_snapshot .get (model_fqn )
307- if not snapshot or (
308- not forward_only_preview_needed and not is_restateable_snapshot (snapshot )
309- ):
310- raise PlanError (
311- f"Cannot restate from '{ model_fqn } '. Either such model doesn't exist, no other materialized model references it, or restatement was disabled for this model."
312- )
322+ if not snapshot :
323+ raise PlanError (f"Cannot restate model '{ model_fqn } '. Model does not exist." )
324+ if not forward_only_preview_needed :
325+ if not self ._is_dev and snapshot .disable_restatement :
326+ # This is a warning but we print this as error since the Console is lacking API for warnings.
327+ self ._console .log_error (
328+ f"Cannot restate model '{ model_fqn } '. Restatement is disabled for this model."
329+ )
330+ continue
331+ elif snapshot .is_symbolic or snapshot .is_seed :
332+ logger .info ("Skipping restatement for model '%s'" , model_fqn )
333+ continue
334+
313335 restatements [snapshot .snapshot_id ] = dummy_interval
314336 for downstream_s_id in dag .downstream (snapshot .snapshot_id ):
315337 if is_restateable_snapshot (self ._context_diff .snapshots [downstream_s_id ]):
@@ -632,7 +654,9 @@ def _ensure_no_broken_references(self) -> None:
632654 )
633655
634656 def _ensure_no_new_snapshots_with_restatements (self ) -> None :
635- if self ._restate_models and self ._context_diff .new_snapshots :
657+ if self ._restate_models is not None and (
658+ self ._context_diff .new_snapshots or self ._context_diff .modified_snapshots
659+ ):
636660 raise PlanError (
637661 "Model changes and restatements can't be a part of the same plan. "
638662 "Revert or apply changes before proceeding with restatements."
0 commit comments