Skip to content

Commit 6b3cf6d

Browse files
committed
Feat: Decouple forward-only from change categorization
1 parent 49b5574 commit 6b3cf6d

File tree

14 files changed

+328
-266
lines changed

14 files changed

+328
-266
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
239239
snapshot: The target snapshot.
240240
choice: The user decision on how to version the target snapshot and its children.
241241
"""
242-
if self._forward_only:
243-
raise PlanError("Choice setting is not supported by a forward-only plan.")
244242
if not self._is_new_snapshot(snapshot):
245243
raise PlanError(
246244
f"A choice can't be changed for the existing version of {snapshot.name}."
@@ -250,8 +248,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
250248
and snapshot.snapshot_id not in self._context_diff.added
251249
):
252250
raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
253-
if snapshot.is_model and snapshot.model.forward_only:
254-
raise PlanError(f"Forward-only model {snapshot.name} cannot be categorized manually.")
255251

256252
self._choices[snapshot.snapshot_id] = choice
257253
self._latest_plan = None
@@ -369,8 +365,10 @@ def _build_restatements(
369365
restate_models = {
370366
s.name
371367
for s in self._context_diff.new_snapshots.values()
372-
if s.is_materialized
373-
and (self._forward_only or s.model.forward_only)
368+
if s.is_model
369+
and not s.is_symbolic
370+
and (s.is_forward_only or s.model.forward_only)
371+
and not s.is_no_preview
374372
and (
375373
# Metadata changes should not be previewed.
376374
self._context_diff.directly_modified(s.name)
@@ -395,6 +393,9 @@ def _build_restatements(
395393
for s_id in dag:
396394
snapshot = self._context_diff.snapshots[s_id]
397395

396+
if is_preview and snapshot.is_no_preview:
397+
continue
398+
398399
# Since we are traversing the graph in topological order and the largest interval range is pushed down
399400
# the graph we just have to check our immediate parents in the graph and not the whole upstream graph.
400401
restating_parents = [
@@ -583,38 +584,35 @@ def _categorize_snapshots(
583584
if not snapshot or not self._is_new_snapshot(snapshot):
584585
continue
585586

587+
forward_only = self._is_forward_only_change(s_id) or self._forward_only
588+
586589
if s_id in self._choices:
587-
snapshot.categorize_as(self._choices[s_id])
590+
snapshot.categorize_as(self._choices[s_id], forward_only)
588591
continue
589592

590593
if s_id in self._context_diff.added:
591-
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
592-
elif self._is_forward_only_change(s_id) or self._forward_only:
593-
# In case of the forward only plan any modifications result in reuse of the
594-
# previous version for non-seed models.
595-
# New snapshots of seed models are considered non-breaking ones.
596-
category = (
597-
SnapshotChangeCategory.NON_BREAKING
598-
if snapshot.is_seed
599-
else SnapshotChangeCategory.FORWARD_ONLY
600-
)
601-
# If the model kind changes mark as breaking
602-
if snapshot.is_model and snapshot.name in self._context_diff.modified_snapshots:
603-
_, old = self._context_diff.modified_snapshots[snapshot.name]
604-
if _is_breaking_kind_change(old, snapshot):
605-
category = SnapshotChangeCategory.BREAKING
606-
607-
snapshot.categorize_as(category)
594+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
608595
elif s_id.name in self._context_diff.modified_snapshots:
609-
self._categorize_snapshot(snapshot, dag, indirectly_modified)
596+
self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified)
610597

611598
def _categorize_snapshot(
612-
self, snapshot: Snapshot, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
599+
self,
600+
snapshot: Snapshot,
601+
forward_only: bool,
602+
dag: DAG[SnapshotId],
603+
indirectly_modified: SnapshotMapping,
613604
) -> None:
614605
s_id = snapshot.snapshot_id
615606

616607
if self._context_diff.directly_modified(s_id.name):
617-
if self._auto_categorization_enabled:
608+
new, old = self._context_diff.modified_snapshots[s_id.name]
609+
if _is_breaking_kind_change(old, new):
610+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, False)
611+
elif self._auto_categorization_enabled:
612+
if snapshot.is_seed:
613+
# Seed changes can't be forward-only.
614+
forward_only = False
615+
618616
s_id_with_missing_columns: t.Optional[SnapshotId] = None
619617
this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id}
620618
for downstream_s_id in this_sid_with_downstream:
@@ -626,18 +624,18 @@ def _categorize_snapshot(
626624
s_id_with_missing_columns = downstream_s_id
627625
break
628626

629-
new, old = self._context_diff.modified_snapshots[s_id.name]
630627
if s_id_with_missing_columns is None:
631628
change_category = categorize_change(new, old, config=self._categorizer_config)
632629
if change_category is not None:
633-
snapshot.categorize_as(change_category)
630+
snapshot.categorize_as(change_category, forward_only)
634631
else:
635632
mode = self._categorizer_config.dict().get(
636633
new.model.source_type, AutoCategorizationMode.OFF
637634
)
638635
if mode == AutoCategorizationMode.FULL:
639-
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
636+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
640637
elif self._context_diff.indirectly_modified(snapshot.name):
638+
all_upstream_forward_only = set()
641639
all_upstream_categories = set()
642640
direct_parent_categories = set()
643641

@@ -646,27 +644,30 @@ def _categorize_snapshot(
646644

647645
if parent and self._is_new_snapshot(parent):
648646
all_upstream_categories.add(parent.change_category)
647+
all_upstream_forward_only.add(parent.is_forward_only)
649648
if p_id in snapshot.parents:
650649
direct_parent_categories.add(parent.change_category)
651650

652-
if snapshot.is_model and snapshot.model.forward_only:
653-
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
654-
elif direct_parent_categories.intersection(
651+
if all_upstream_forward_only == {True} or (
652+
snapshot.is_model and snapshot.model.forward_only
653+
):
654+
forward_only = True
655+
656+
if direct_parent_categories.intersection(
655657
{SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
656658
):
657-
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING)
659+
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
658660
elif not direct_parent_categories:
659-
snapshot.categorize_as(self._get_orphaned_indirect_change_category(snapshot))
660-
elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories:
661-
# FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING
662-
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
661+
snapshot.categorize_as(
662+
self._get_orphaned_indirect_change_category(snapshot), forward_only
663+
)
663664
elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
664-
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
665+
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
665666
else:
666-
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
667+
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only)
667668
else:
668669
# Metadata updated.
669-
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
670+
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
670671

671672
def _get_orphaned_indirect_change_category(
672673
self, indirect_snapshot: Snapshot
@@ -769,10 +770,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
769770
if snapshot.is_model and _is_breaking_kind_change(old, snapshot):
770771
return False
771772
return (
772-
snapshot.is_model
773-
and snapshot.model.forward_only
774-
and not snapshot.change_category
775-
and bool(snapshot.previous_versions)
773+
snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
776774
)
777775

778776
def _is_new_snapshot(self, snapshot: Snapshot) -> bool:
@@ -811,7 +809,7 @@ def _ensure_no_forward_only_revert(self) -> None:
811809
and not candidate.model.forward_only
812810
and promoted.is_forward_only
813811
and not promoted.is_paused
814-
and not candidate.reuses_previous_version
812+
and not candidate.is_no_rebuild
815813
and promoted.version == candidate.version
816814
):
817815
raise PlanError(

sqlmesh/core/snapshot/definition.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class SnapshotChangeCategory(IntEnum):
7676

7777
BREAKING = 1
7878
NON_BREAKING = 2
79+
# FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
7980
FORWARD_ONLY = 3
8081
INDIRECT_BREAKING = 4
8182
INDIRECT_NON_BREAKING = 5
@@ -336,6 +337,7 @@ class SnapshotInfoMixin(ModelKindMixin):
336337
base_table_name_override: t.Optional[str]
337338
dev_table_suffix: str
338339
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
340+
forward_only: bool
339341

340342
@cached_property
341343
def identifier(self) -> str:
@@ -383,7 +385,7 @@ def fully_qualified_table(self) -> t.Optional[exp.Table]:
383385

384386
@property
385387
def is_forward_only(self) -> bool:
386-
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
388+
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
387389

388390
@property
389391
def is_metadata(self) -> bool:
@@ -394,9 +396,18 @@ def is_indirect_non_breaking(self) -> bool:
394396
return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
395397

396398
@property
397-
def reuses_previous_version(self) -> bool:
398-
return self.change_category in (
399-
SnapshotChangeCategory.FORWARD_ONLY,
399+
def is_no_rebuild(self) -> bool:
400+
"""Returns true if this snapshot doesn't require a rebuild in production."""
401+
return self.forward_only or self.change_category in (
402+
SnapshotChangeCategory.FORWARD_ONLY, # Backwards compatibility
403+
SnapshotChangeCategory.METADATA,
404+
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
405+
)
406+
407+
@property
408+
def is_no_preview(self) -> bool:
409+
"""Returns true if this snapshot doesn't require a preview in development."""
410+
return self.forward_only and self.change_category in (
400411
SnapshotChangeCategory.METADATA,
401412
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
402413
)
@@ -487,6 +498,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
487498
custom_materialization: t.Optional[str] = None
488499
dev_table_suffix: str
489500
model_gateway: t.Optional[str] = None
501+
forward_only: bool = False
490502

491503
def __lt__(self, other: SnapshotTableInfo) -> bool:
492504
return self.name < other.name
@@ -614,6 +626,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
614626
table_naming_convention_: TableNamingConvention = Field(
615627
default=TableNamingConvention.default, alias="table_naming_convention"
616628
)
629+
forward_only: bool = False
617630

618631
@field_validator("ttl")
619632
@classmethod
@@ -1006,22 +1019,26 @@ def check_ready_intervals(
10061019
)
10071020
return intervals
10081021

1009-
def categorize_as(self, category: SnapshotChangeCategory) -> None:
1022+
def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
10101023
"""Assigns the given category to this snapshot.
10111024
10121025
Args:
10131026
category: The change category to assign to this snapshot.
1027+
forward_only: Whether or not this snapshot is applied going forward in production.
10141028
"""
1029+
assert category != SnapshotChangeCategory.FORWARD_ONLY, (
1030+
"FORWARD_ONLY change category is deprecated"
1031+
)
1032+
10151033
self.dev_version_ = self.fingerprint.to_version()
1016-
reuse_previous_version = category in (
1017-
SnapshotChangeCategory.FORWARD_ONLY,
1034+
is_no_rebuild = forward_only or category in (
10181035
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
10191036
SnapshotChangeCategory.METADATA,
10201037
)
10211038
if self.is_model and self.model.physical_version:
10221039
# If the model has a pinned version then use that.
10231040
self.version = self.model.physical_version
1024-
elif reuse_previous_version and self.previous_version:
1041+
elif is_no_rebuild and self.previous_version:
10251042
previous_version = self.previous_version
10261043
self.version = previous_version.data_version.version
10271044
self.physical_schema_ = previous_version.physical_schema
@@ -1040,6 +1057,7 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
10401057
self.version = self.fingerprint.to_version()
10411058

10421059
self.change_category = category
1060+
self.forward_only = forward_only
10431061

10441062
@property
10451063
def categorized(self) -> bool:
@@ -1220,6 +1238,7 @@ def table_info(self) -> SnapshotTableInfo:
12201238
dev_table_suffix=self.dev_table_suffix,
12211239
model_gateway=self.model_gateway,
12221240
table_naming_convention=self.table_naming_convention, # type: ignore
1241+
forward_only=self.forward_only,
12231242
)
12241243

12251244
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def create(
338338
continue
339339
deployability_flags = [True]
340340
if (
341-
snapshot.reuses_previous_version
341+
snapshot.is_no_rebuild
342342
or snapshot.is_managed
343343
or (snapshot.is_model and snapshot.model.forward_only)
344344
or (deployability_index and not deployability_index.is_deployable(snapshot))

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,14 +733,15 @@ class SharedVersionSnapshot(PydanticModel):
733733
disable_restatement: bool
734734
effective_from: t.Optional[TimeLike]
735735
raw_snapshot: t.Dict[str, t.Any]
736+
forward_only: bool
736737

737738
@property
738739
def snapshot_id(self) -> SnapshotId:
739740
return SnapshotId(name=self.name, identifier=self.identifier)
740741

741742
@property
742743
def is_forward_only(self) -> bool:
743-
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
744+
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY
744745

745746
@property
746747
def normalized_effective_from_ts(self) -> t.Optional[int]:
@@ -803,4 +804,5 @@ def from_snapshot_record(
803804
disable_restatement=raw_node.get("kind", {}).get("disable_restatement", False),
804805
effective_from=raw_snapshot.get("effective_from"),
805806
raw_snapshot=raw_snapshot,
807+
forward_only=raw_snapshot.get("forward_only", False),
806808
)

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def _make_function(
438438
metadata_hash="test_metadata_hash",
439439
),
440440
version="test_version",
441-
change_category=SnapshotChangeCategory.FORWARD_ONLY,
441+
change_category=SnapshotChangeCategory.NON_BREAKING,
442442
dev_table_suffix="dev",
443443
),
444444
)

tests/core/analytics/test_collector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def test_on_snapshots_created(
218218
context.get_snapshot("sushi.waiter_revenue_by_day"),
219219
context.get_snapshot("sushi.top_waiters"),
220220
]
221-
new_snapshots[0].categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
221+
new_snapshots[0].categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True)
222222
new_snapshots[0].effective_from = "2024-01-01"
223223
new_snapshots[0].version = "test_version"
224224

@@ -239,7 +239,7 @@ def test_on_snapshots_created(
239239
"node_type": "model",
240240
"model_kind": "incremental_by_time_range",
241241
"is_sql": False,
242-
"change_category": "forward_only",
242+
"change_category": "breaking",
243243
"dialect": "duckdb",
244244
"audits_count": 0,
245245
"effective_from_set": True,

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2087,7 +2087,9 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults:
20872087
plan_1 = _run_plan(context)
20882088

20892089
assert plan_1.snapshot_for(model_a).change_category == SnapshotChangeCategory.BREAKING
2090+
assert not plan_1.snapshot_for(model_a).is_forward_only
20902091
assert plan_1.snapshot_for(model_b).change_category == SnapshotChangeCategory.BREAKING
2092+
assert not plan_1.snapshot_for(model_b).is_forward_only
20912093

20922094
# so far so good, model_a should exist as a normal table, model b should be a managed table and the prod views should exist
20932095
assert len(plan_1.schema_metadata.views) == 2
@@ -2134,8 +2136,10 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults:
21342136

21352137
assert plan_2.plan.has_changes
21362138
assert len(plan_2.plan.modified_snapshots) == 2
2137-
assert plan_2.snapshot_for(new_model_a).change_category == SnapshotChangeCategory.FORWARD_ONLY
2139+
assert plan_2.snapshot_for(new_model_a).change_category == SnapshotChangeCategory.NON_BREAKING
2140+
assert plan_2.snapshot_for(new_model_a).is_forward_only
21382141
assert plan_2.snapshot_for(model_b).change_category == SnapshotChangeCategory.NON_BREAKING
2142+
assert not plan_2.snapshot_for(model_b).is_forward_only
21392143

21402144
# verify that the new snapshots were created correctly
21412145
# the forward-only change to model A should be in a new table separate from the one created in the first plan
@@ -2207,8 +2211,10 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults:
22072211
plan_4 = _run_plan(context)
22082212

22092213
assert plan_4.plan.has_changes
2210-
assert plan_4.snapshot_for(model_a).change_category == SnapshotChangeCategory.FORWARD_ONLY
2214+
assert plan_4.snapshot_for(model_a).change_category == SnapshotChangeCategory.NON_BREAKING
2215+
assert plan_4.snapshot_for(model_a).is_forward_only
22112216
assert plan_4.snapshot_for(model_b).change_category == SnapshotChangeCategory.NON_BREAKING
2217+
assert not plan_4.snapshot_for(model_b).is_forward_only
22122218

22132219
# verify the Model B table is created as a managed table in prod
22142220
assert plan_4.table_name_for(model_b) == plan_3.table_name_for(

0 commit comments

Comments
 (0)