Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions docs/concepts/plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,24 @@ This is a common choice in scenarios such as an addition of a new column, an act

If any downstream models contain a `select *` from the model, SQLMesh attempts to infer breaking status on a best-effort basis. We recommend explicitly specifying a query's columns to avoid unnecessary recomputation.

### Forward-only change
A modified (either directly or indirectly) model that is categorized as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place.

While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or a shallow clone of the production table if supported by the engine.

In either case the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details.

This category is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only.

### Summary

| Change Category | Change Type | Behaviour |
|--------------------------------------|--------------------------------------------------------------------------------------------|----------------------------------------------------|
| [Breaking](#breaking-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [Backfill](glossary.md#backfill) |
| [Non-breaking](#non-breaking-change) | [Direct](glossary.md#direct-modification) | [Backfill](glossary.md#backfill) |
| [Non-breaking](#non-breaking-change) | [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill) |
| [Forward-only](#forward-only-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill), schema change |

## Forward-only change
In addition to being categorized as breaking or non-breaking, a change can also be classified as forward-only.

A model change classified as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place.

While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or, if supported by the engine, a shallow clone of the production table.

In either case, the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details.

This classification is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only.

## Plan application
Once a plan has been created and reviewed, it is applied to the target [environment](environments.md) so that its changes take effect.
Expand Down
94 changes: 49 additions & 45 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
snapshot: The target snapshot.
choice: The user decision on how to version the target snapshot and its children.
"""
if self._forward_only:
raise PlanError("Choice setting is not supported by a forward-only plan.")
if not self._is_new_snapshot(snapshot):
raise PlanError(
f"A choice can't be changed for the existing version of {snapshot.name}."
Expand All @@ -250,8 +248,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan
and snapshot.snapshot_id not in self._context_diff.added
):
raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).")
if snapshot.is_model and snapshot.model.forward_only:
raise PlanError(f"Forward-only model {snapshot.name} cannot be categorized manually.")

self._choices[snapshot.snapshot_id] = choice
self._latest_plan = None
Expand Down Expand Up @@ -369,8 +365,10 @@ def _build_restatements(
restate_models = {
s.name
for s in self._context_diff.new_snapshots.values()
if s.is_materialized
and (self._forward_only or s.model.forward_only)
if s.is_model
and not s.is_symbolic
and (s.is_forward_only or s.model.forward_only)
and not s.is_no_preview
and (
# Metadata changes should not be previewed.
self._context_diff.directly_modified(s.name)
Expand All @@ -395,6 +393,9 @@ def _build_restatements(
for s_id in dag:
snapshot = self._context_diff.snapshots[s_id]

if is_preview and snapshot.is_no_preview:
continue

# Since we are traversing the graph in topological order and the largest interval range is pushed down
# the graph we just have to check our immediate parents in the graph and not the whole upstream graph.
restating_parents = [
Expand Down Expand Up @@ -526,6 +527,9 @@ def _adjust_new_snapshot_intervals(self) -> None:

def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None:
for s_id in sorted(directly_modified):
if s_id.name not in self._context_diff.modified_snapshots:
continue

snapshot = self._context_diff.snapshots[s_id]
# should we raise/warn if this snapshot has/inherits a destructive change?
should_raise_or_warn = (
Expand Down Expand Up @@ -583,38 +587,38 @@ def _categorize_snapshots(
if not snapshot or not self._is_new_snapshot(snapshot):
continue

forward_only = self._is_forward_only_change(s_id) or self._forward_only

if s_id in self._choices:
snapshot.categorize_as(self._choices[s_id])
snapshot.categorize_as(self._choices[s_id], forward_only)
continue

if s_id in self._context_diff.added:
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
elif self._is_forward_only_change(s_id) or self._forward_only:
# In case of the forward only plan any modifications result in reuse of the
# previous version for non-seed models.
# New snapshots of seed models are considered non-breaking ones.
category = (
SnapshotChangeCategory.NON_BREAKING
if snapshot.is_seed
else SnapshotChangeCategory.FORWARD_ONLY
)
# If the model kind changes mark as breaking
if snapshot.is_model and snapshot.name in self._context_diff.modified_snapshots:
_, old = self._context_diff.modified_snapshots[snapshot.name]
if _is_breaking_kind_change(old, snapshot):
category = SnapshotChangeCategory.BREAKING

snapshot.categorize_as(category)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
elif s_id.name in self._context_diff.modified_snapshots:
self._categorize_snapshot(snapshot, dag, indirectly_modified)
self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified)

def _categorize_snapshot(
self, snapshot: Snapshot, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping
self,
snapshot: Snapshot,
forward_only: bool,
dag: DAG[SnapshotId],
indirectly_modified: SnapshotMapping,
) -> None:
s_id = snapshot.snapshot_id

if self._context_diff.directly_modified(s_id.name):
new, old = self._context_diff.modified_snapshots[s_id.name]
is_breaking_kind_change = _is_breaking_kind_change(old, new)
if is_breaking_kind_change or snapshot.is_seed:
# Breaking kind changes and seed changes can't be forward-only.
forward_only = False

if self._auto_categorization_enabled:
if is_breaking_kind_change:
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
return

s_id_with_missing_columns: t.Optional[SnapshotId] = None
this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id}
for downstream_s_id in this_sid_with_downstream:
Expand All @@ -626,18 +630,18 @@ def _categorize_snapshot(
s_id_with_missing_columns = downstream_s_id
break

new, old = self._context_diff.modified_snapshots[s_id.name]
if s_id_with_missing_columns is None:
change_category = categorize_change(new, old, config=self._categorizer_config)
if change_category is not None:
snapshot.categorize_as(change_category)
snapshot.categorize_as(change_category, forward_only)
else:
mode = self._categorizer_config.dict().get(
new.model.source_type, AutoCategorizationMode.OFF
)
if mode == AutoCategorizationMode.FULL:
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
elif self._context_diff.indirectly_modified(snapshot.name):
all_upstream_forward_only = set()
all_upstream_categories = set()
direct_parent_categories = set()

Expand All @@ -646,27 +650,30 @@ def _categorize_snapshot(

if parent and self._is_new_snapshot(parent):
all_upstream_categories.add(parent.change_category)
all_upstream_forward_only.add(parent.is_forward_only)
if p_id in snapshot.parents:
direct_parent_categories.add(parent.change_category)

if snapshot.is_model and snapshot.model.forward_only:
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
elif direct_parent_categories.intersection(
if all_upstream_forward_only == {True} or (
snapshot.is_model and snapshot.model.forward_only
):
forward_only = True

if direct_parent_categories.intersection(
{SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING}
):
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING)
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
elif not direct_parent_categories:
snapshot.categorize_as(self._get_orphaned_indirect_change_category(snapshot))
elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories:
# FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
snapshot.categorize_as(
self._get_orphaned_indirect_change_category(snapshot), forward_only
)
elif all_upstream_categories == {SnapshotChangeCategory.METADATA}:
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)
else:
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING)
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only)
else:
# Metadata updated.
snapshot.categorize_as(SnapshotChangeCategory.METADATA)
snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only)

def _get_orphaned_indirect_change_category(
self, indirect_snapshot: Snapshot
Expand Down Expand Up @@ -769,10 +776,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool:
if snapshot.is_model and _is_breaking_kind_change(old, snapshot):
return False
return (
snapshot.is_model
and snapshot.model.forward_only
and not snapshot.change_category
and bool(snapshot.previous_versions)
snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions)
)

def _is_new_snapshot(self, snapshot: Snapshot) -> bool:
Expand Down Expand Up @@ -811,7 +815,7 @@ def _ensure_no_forward_only_revert(self) -> None:
and not candidate.model.forward_only
and promoted.is_forward_only
and not promoted.is_paused
and not candidate.reuses_previous_version
and not candidate.is_no_rebuild
and promoted.version == candidate.version
):
raise PlanError(
Expand Down
35 changes: 27 additions & 8 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class SnapshotChangeCategory(IntEnum):

BREAKING = 1
NON_BREAKING = 2
# FORWARD_ONLY category is deprecated and is kept for backwards compatibility.
FORWARD_ONLY = 3
INDIRECT_BREAKING = 4
INDIRECT_NON_BREAKING = 5
Expand Down Expand Up @@ -336,6 +337,7 @@ class SnapshotInfoMixin(ModelKindMixin):
base_table_name_override: t.Optional[str]
dev_table_suffix: str
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
forward_only: bool

@cached_property
def identifier(self) -> str:
Expand Down Expand Up @@ -383,7 +385,7 @@ def fully_qualified_table(self) -> t.Optional[exp.Table]:

@property
def is_forward_only(self) -> bool:
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY

@property
def is_metadata(self) -> bool:
Expand All @@ -394,9 +396,18 @@ def is_indirect_non_breaking(self) -> bool:
return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING

@property
def reuses_previous_version(self) -> bool:
return self.change_category in (
SnapshotChangeCategory.FORWARD_ONLY,
def is_no_rebuild(self) -> bool:
"""Returns true if this snapshot doesn't require a rebuild in production."""
return self.forward_only or self.change_category in (
SnapshotChangeCategory.FORWARD_ONLY, # Backwards compatibility
SnapshotChangeCategory.METADATA,
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
)

@property
def is_no_preview(self) -> bool:
"""Returns true if this snapshot doesn't require a preview in development."""
return self.forward_only and self.change_category in (
SnapshotChangeCategory.METADATA,
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
)
Expand Down Expand Up @@ -487,6 +498,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
custom_materialization: t.Optional[str] = None
dev_table_suffix: str
model_gateway: t.Optional[str] = None
forward_only: bool = False

def __lt__(self, other: SnapshotTableInfo) -> bool:
return self.name < other.name
Expand Down Expand Up @@ -614,6 +626,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
table_naming_convention_: TableNamingConvention = Field(
default=TableNamingConvention.default, alias="table_naming_convention"
)
forward_only: bool = False

@field_validator("ttl")
@classmethod
Expand Down Expand Up @@ -1006,22 +1019,26 @@ def check_ready_intervals(
)
return intervals

def categorize_as(self, category: SnapshotChangeCategory) -> None:
def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None:
"""Assigns the given category to this snapshot.

Args:
category: The change category to assign to this snapshot.
forward_only: Whether or not this snapshot is applied going forward in production.
"""
assert category != SnapshotChangeCategory.FORWARD_ONLY, (
"FORWARD_ONLY change category is deprecated"
)

self.dev_version_ = self.fingerprint.to_version()
reuse_previous_version = category in (
SnapshotChangeCategory.FORWARD_ONLY,
is_no_rebuild = forward_only or category in (
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
SnapshotChangeCategory.METADATA,
)
if self.is_model and self.model.physical_version:
# If the model has a pinned version then use that.
self.version = self.model.physical_version
elif reuse_previous_version and self.previous_version:
elif is_no_rebuild and self.previous_version:
previous_version = self.previous_version
self.version = previous_version.data_version.version
self.physical_schema_ = previous_version.physical_schema
Expand All @@ -1040,6 +1057,7 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
self.version = self.fingerprint.to_version()

self.change_category = category
self.forward_only = forward_only

@property
def categorized(self) -> bool:
Expand Down Expand Up @@ -1220,6 +1238,7 @@ def table_info(self) -> SnapshotTableInfo:
dev_table_suffix=self.dev_table_suffix,
model_gateway=self.model_gateway,
table_naming_convention=self.table_naming_convention, # type: ignore
forward_only=self.forward_only,
)

@property
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def create(
continue
deployability_flags = [True]
if (
snapshot.reuses_previous_version
snapshot.is_no_rebuild
or snapshot.is_managed
or (snapshot.is_model and snapshot.model.forward_only)
or (deployability_index and not deployability_index.is_deployable(snapshot))
Expand Down
4 changes: 3 additions & 1 deletion sqlmesh/core/state_sync/db/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,15 @@ class SharedVersionSnapshot(PydanticModel):
disable_restatement: bool
effective_from: t.Optional[TimeLike]
raw_snapshot: t.Dict[str, t.Any]
forward_only: bool

@property
def snapshot_id(self) -> SnapshotId:
return SnapshotId(name=self.name, identifier=self.identifier)

@property
def is_forward_only(self) -> bool:
return self.change_category == SnapshotChangeCategory.FORWARD_ONLY
return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY

@property
def normalized_effective_from_ts(self) -> t.Optional[int]:
Expand Down Expand Up @@ -803,4 +804,5 @@ def from_snapshot_record(
disable_restatement=raw_node.get("kind", {}).get("disable_restatement", False),
effective_from=raw_snapshot.get("effective_from"),
raw_snapshot=raw_snapshot,
forward_only=raw_snapshot.get("forward_only", False),
)
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def _make_function(
metadata_hash="test_metadata_hash",
),
version="test_version",
change_category=SnapshotChangeCategory.FORWARD_ONLY,
change_category=SnapshotChangeCategory.NON_BREAKING,
dev_table_suffix="dev",
),
)
Expand Down
Loading