diff --git a/docs/concepts/plans.md b/docs/concepts/plans.md index da3d3debb7..91616a6e7e 100644 --- a/docs/concepts/plans.md +++ b/docs/concepts/plans.md @@ -43,15 +43,6 @@ This is a common choice in scenarios such as an addition of a new column, an act If any downstream models contain a `select *` from the model, SQLMesh attempts to infer breaking status on a best-effort basis. We recommend explicitly specifying a query's columns to avoid unnecessary recomputation. -### Forward-only change -A modified (either directly or indirectly) model that is categorized as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place. - -While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or a shallow clone of the production table if supported by the engine. - -In either case the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details. - -This category is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only. - ### Summary | Change Category | Change Type | Behaviour | @@ -59,7 +50,17 @@ This category is assigned by SQLMesh automatically either when a user opts into | [Breaking](#breaking-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [Backfill](glossary.md#backfill) | | [Non-breaking](#non-breaking-change) | [Direct](glossary.md#direct-modification) | [Backfill](glossary.md#backfill) | | [Non-breaking](#non-breaking-change) | [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill) | -| [Forward-only](#forward-only-change) | [Direct](glossary.md#direct-modification) or [Indirect](glossary.md#indirect-modification) | [No Backfill](glossary.md#backfill), schema change | + +## Forward-only change +In addition to categorizing a change as breaking or non-breaking, it can also be classified as forward-only. + +A model change classified as forward-only will continue to use the existing physical table once the change is deployed to production (the `prod` environment). This means that no backfill will take place. + +While iterating on forward-only changes in the development environment, the model's output will be stored in either a temporary table or a shallow clone of the production table if supported by the engine. + +In either case the data produced this way in the development environment can only be used for preview and will **not** be reused once the change is deployed to production. See [Forward-only Plans](#forward-only-plans) for more details. + +This category is assigned by SQLMesh automatically either when a user opts into using a [forward-only plan](#forward-only-plans) or when a model is explicitly configured to be forward-only. ## Plan application Once a plan has been created and reviewed, it is then applied to the target [environment](environments.md) in order for its changes to take effect. diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 6f3c7f0805..178cd8d2e4 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -239,8 +239,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan snapshot: The target snapshot. choice: The user decision on how to version the target snapshot and its children. """ - if self._forward_only: - raise PlanError("Choice setting is not supported by a forward-only plan.") if not self._is_new_snapshot(snapshot): raise PlanError( f"A choice can't be changed for the existing version of {snapshot.name}." @@ -250,8 +248,6 @@ def set_choice(self, snapshot: Snapshot, choice: SnapshotChangeCategory) -> Plan and snapshot.snapshot_id not in self._context_diff.added ): raise PlanError(f"Only directly modified models can be categorized ({snapshot.name}).") - if snapshot.is_model and snapshot.model.forward_only: - raise PlanError(f"Forward-only model {snapshot.name} cannot be categorized manually.") self._choices[snapshot.snapshot_id] = choice self._latest_plan = None @@ -369,8 +365,10 @@ def _build_restatements( restate_models = { s.name for s in self._context_diff.new_snapshots.values() - if s.is_materialized - and (self._forward_only or s.model.forward_only) + if s.is_model + and not s.is_symbolic + and (s.is_forward_only or s.model.forward_only) + and not s.is_no_preview and ( # Metadata changes should not be previewed. self._context_diff.directly_modified(s.name) @@ -395,6 +393,9 @@ def _build_restatements( for s_id in dag: snapshot = self._context_diff.snapshots[s_id] + if is_preview and snapshot.is_no_preview: + continue + # Since we are traversing the graph in topological order and the largest interval range is pushed down # the graph we just have to check our immediate parents in the graph and not the whole upstream graph. restating_parents = [ @@ -526,6 +527,9 @@ def _adjust_new_snapshot_intervals(self) -> None: def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: for s_id in sorted(directly_modified): + if s_id.name not in self._context_diff.modified_snapshots: + continue + snapshot = self._context_diff.snapshots[s_id] # should we raise/warn if this snapshot has/inherits a destructive change? should_raise_or_warn = ( @@ -583,38 +587,38 @@ def _categorize_snapshots( if not snapshot or not self._is_new_snapshot(snapshot): continue + forward_only = self._is_forward_only_change(s_id) or self._forward_only + if s_id in self._choices: - snapshot.categorize_as(self._choices[s_id]) + snapshot.categorize_as(self._choices[s_id], forward_only) continue if s_id in self._context_diff.added: - snapshot.categorize_as(SnapshotChangeCategory.BREAKING) - elif self._is_forward_only_change(s_id) or self._forward_only: - # In case of the forward only plan any modifications result in reuse of the - # previous version for non-seed models. - # New snapshots of seed models are considered non-breaking ones. - category = ( - SnapshotChangeCategory.NON_BREAKING - if snapshot.is_seed - else SnapshotChangeCategory.FORWARD_ONLY - ) - # If the model kind changes mark as breaking - if snapshot.is_model and snapshot.name in self._context_diff.modified_snapshots: - _, old = self._context_diff.modified_snapshots[snapshot.name] - if _is_breaking_kind_change(old, snapshot): - category = SnapshotChangeCategory.BREAKING - - snapshot.categorize_as(category) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) elif s_id.name in self._context_diff.modified_snapshots: - self._categorize_snapshot(snapshot, dag, indirectly_modified) + self._categorize_snapshot(snapshot, forward_only, dag, indirectly_modified) def _categorize_snapshot( - self, snapshot: Snapshot, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping + self, + snapshot: Snapshot, + forward_only: bool, + dag: DAG[SnapshotId], + indirectly_modified: SnapshotMapping, ) -> None: s_id = snapshot.snapshot_id if self._context_diff.directly_modified(s_id.name): + new, old = self._context_diff.modified_snapshots[s_id.name] + is_breaking_kind_change = _is_breaking_kind_change(old, new) + if is_breaking_kind_change or snapshot.is_seed: + # Breaking kind changes and seed changes can't be forward-only. + forward_only = False + if self._auto_categorization_enabled: + if is_breaking_kind_change: + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) + return + s_id_with_missing_columns: t.Optional[SnapshotId] = None this_sid_with_downstream = indirectly_modified.get(s_id, set()) | {s_id} for downstream_s_id in this_sid_with_downstream: @@ -626,18 +630,18 @@ def _categorize_snapshot( s_id_with_missing_columns = downstream_s_id break - new, old = self._context_diff.modified_snapshots[s_id.name] if s_id_with_missing_columns is None: change_category = categorize_change(new, old, config=self._categorizer_config) if change_category is not None: - snapshot.categorize_as(change_category) + snapshot.categorize_as(change_category, forward_only) else: mode = self._categorizer_config.dict().get( new.model.source_type, AutoCategorizationMode.OFF ) if mode == AutoCategorizationMode.FULL: - snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only) elif self._context_diff.indirectly_modified(snapshot.name): + all_upstream_forward_only = set() all_upstream_categories = set() direct_parent_categories = set() @@ -646,27 +650,30 @@ def _categorize_snapshot( if parent and self._is_new_snapshot(parent): all_upstream_categories.add(parent.change_category) + all_upstream_forward_only.add(parent.is_forward_only) if p_id in snapshot.parents: direct_parent_categories.add(parent.change_category) - if snapshot.is_model and snapshot.model.forward_only: - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) - elif direct_parent_categories.intersection( + if all_upstream_forward_only == {True} or ( + snapshot.is_model and snapshot.model.forward_only + ): + forward_only = True + + if direct_parent_categories.intersection( {SnapshotChangeCategory.BREAKING, SnapshotChangeCategory.INDIRECT_BREAKING} ): - snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING) + snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only) elif not direct_parent_categories: - snapshot.categorize_as(self._get_orphaned_indirect_change_category(snapshot)) - elif SnapshotChangeCategory.FORWARD_ONLY in all_upstream_categories: - # FORWARD_ONLY must take precedence over INDIRECT_NON_BREAKING - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as( + self._get_orphaned_indirect_change_category(snapshot), forward_only + ) elif all_upstream_categories == {SnapshotChangeCategory.METADATA}: - snapshot.categorize_as(SnapshotChangeCategory.METADATA) + snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) else: - snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING) + snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only) else: # Metadata updated. - snapshot.categorize_as(SnapshotChangeCategory.METADATA) + snapshot.categorize_as(SnapshotChangeCategory.METADATA, forward_only) def _get_orphaned_indirect_change_category( self, indirect_snapshot: Snapshot @@ -769,10 +776,7 @@ def _is_forward_only_change(self, s_id: SnapshotId) -> bool: if snapshot.is_model and _is_breaking_kind_change(old, snapshot): return False return ( - snapshot.is_model - and snapshot.model.forward_only - and not snapshot.change_category - and bool(snapshot.previous_versions) + snapshot.is_model and snapshot.model.forward_only and bool(snapshot.previous_versions) ) def _is_new_snapshot(self, snapshot: Snapshot) -> bool: @@ -811,7 +815,7 @@ def _ensure_no_forward_only_revert(self) -> None: and not candidate.model.forward_only and promoted.is_forward_only and not promoted.is_paused - and not candidate.reuses_previous_version + and not candidate.is_no_rebuild and promoted.version == candidate.version ): raise PlanError( diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 266a974821..90cd963051 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -76,6 +76,7 @@ class SnapshotChangeCategory(IntEnum): BREAKING = 1 NON_BREAKING = 2 + # FORWARD_ONLY category is deprecated and is kept for backwards compatibility. FORWARD_ONLY = 3 INDIRECT_BREAKING = 4 INDIRECT_NON_BREAKING = 5 @@ -336,6 +337,7 @@ class SnapshotInfoMixin(ModelKindMixin): base_table_name_override: t.Optional[str] dev_table_suffix: str table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default) + forward_only: bool @cached_property def identifier(self) -> str: @@ -383,7 +385,7 @@ def fully_qualified_table(self) -> t.Optional[exp.Table]: @property def is_forward_only(self) -> bool: - return self.change_category == SnapshotChangeCategory.FORWARD_ONLY + return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY @property def is_metadata(self) -> bool: @@ -394,9 +396,18 @@ def is_indirect_non_breaking(self) -> bool: return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING @property - def reuses_previous_version(self) -> bool: - return self.change_category in ( - SnapshotChangeCategory.FORWARD_ONLY, + def is_no_rebuild(self) -> bool: + """Returns true if this snapshot doesn't require a rebuild in production.""" + return self.forward_only or self.change_category in ( + SnapshotChangeCategory.FORWARD_ONLY, # Backwards compatibility + SnapshotChangeCategory.METADATA, + SnapshotChangeCategory.INDIRECT_NON_BREAKING, + ) + + @property + def is_no_preview(self) -> bool: + """Returns true if this snapshot doesn't require a preview in development.""" + return self.forward_only and self.change_category in ( SnapshotChangeCategory.METADATA, SnapshotChangeCategory.INDIRECT_NON_BREAKING, ) @@ -487,6 +498,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True): custom_materialization: t.Optional[str] = None dev_table_suffix: str model_gateway: t.Optional[str] = None + forward_only: bool = False def __lt__(self, other: SnapshotTableInfo) -> bool: return self.name < other.name @@ -614,6 +626,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin): table_naming_convention_: TableNamingConvention = Field( default=TableNamingConvention.default, alias="table_naming_convention" ) + forward_only: bool = False @field_validator("ttl") @classmethod @@ -1006,22 +1019,26 @@ def check_ready_intervals( ) return intervals - def categorize_as(self, category: SnapshotChangeCategory) -> None: + def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = False) -> None: """Assigns the given category to this snapshot. Args: category: The change category to assign to this snapshot. + forward_only: Whether or not this snapshot is applied going forward in production. """ + assert category != SnapshotChangeCategory.FORWARD_ONLY, ( + "FORWARD_ONLY change category is deprecated" + ) + self.dev_version_ = self.fingerprint.to_version() - reuse_previous_version = category in ( - SnapshotChangeCategory.FORWARD_ONLY, + is_no_rebuild = forward_only or category in ( SnapshotChangeCategory.INDIRECT_NON_BREAKING, SnapshotChangeCategory.METADATA, ) if self.is_model and self.model.physical_version: # If the model has a pinned version then use that. self.version = self.model.physical_version - elif reuse_previous_version and self.previous_version: + elif is_no_rebuild and self.previous_version: previous_version = self.previous_version self.version = previous_version.data_version.version self.physical_schema_ = previous_version.physical_schema @@ -1040,6 +1057,7 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None: self.version = self.fingerprint.to_version() self.change_category = category + self.forward_only = forward_only @property def categorized(self) -> bool: @@ -1220,6 +1238,7 @@ def table_info(self) -> SnapshotTableInfo: dev_table_suffix=self.dev_table_suffix, model_gateway=self.model_gateway, table_naming_convention=self.table_naming_convention, # type: ignore + forward_only=self.forward_only, ) @property diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index bdbf76250f..e053e1e108 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -338,7 +338,7 @@ def create( continue deployability_flags = [True] if ( - snapshot.reuses_previous_version + snapshot.is_no_rebuild or snapshot.is_managed or (snapshot.is_model and snapshot.model.forward_only) or (deployability_index and not deployability_index.is_deployable(snapshot)) diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 7aaf902216..e2491baeba 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -733,6 +733,7 @@ class SharedVersionSnapshot(PydanticModel): disable_restatement: bool effective_from: t.Optional[TimeLike] raw_snapshot: t.Dict[str, t.Any] + forward_only: bool @property def snapshot_id(self) -> SnapshotId: @@ -740,7 +741,7 @@ def snapshot_id(self) -> SnapshotId: @property def is_forward_only(self) -> bool: - return self.change_category == SnapshotChangeCategory.FORWARD_ONLY + return self.forward_only or self.change_category == SnapshotChangeCategory.FORWARD_ONLY @property def normalized_effective_from_ts(self) -> t.Optional[int]: @@ -803,4 +804,5 @@ def from_snapshot_record( disable_restatement=raw_node.get("kind", {}).get("disable_restatement", False), effective_from=raw_snapshot.get("effective_from"), raw_snapshot=raw_snapshot, + forward_only=raw_snapshot.get("forward_only", False), ) diff --git a/tests/conftest.py b/tests/conftest.py index 574c802c0e..1bfa7a9f36 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -438,7 +438,7 @@ def _make_function( metadata_hash="test_metadata_hash", ), version="test_version", - change_category=SnapshotChangeCategory.FORWARD_ONLY, + change_category=SnapshotChangeCategory.NON_BREAKING, dev_table_suffix="dev", ), ) diff --git a/tests/core/analytics/test_collector.py b/tests/core/analytics/test_collector.py index 9eaca07ef3..1a4c42cbe3 100644 --- a/tests/core/analytics/test_collector.py +++ b/tests/core/analytics/test_collector.py @@ -218,7 +218,7 @@ def test_on_snapshots_created( context.get_snapshot("sushi.waiter_revenue_by_day"), context.get_snapshot("sushi.top_waiters"), ] - new_snapshots[0].categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshots[0].categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshots[0].effective_from = "2024-01-01" new_snapshots[0].version = "test_version" @@ -239,7 +239,7 @@ def test_on_snapshots_created( "node_type": "model", "model_kind": "incremental_by_time_range", "is_sql": False, - "change_category": "forward_only", + "change_category": "breaking", "dialect": "duckdb", "audits_count": 0, "effective_from_set": True, diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py index 80ce7ac18d..eb593c273b 100644 --- a/tests/core/engine_adapter/integration/test_integration.py +++ b/tests/core/engine_adapter/integration/test_integration.py @@ -2087,7 +2087,9 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults: plan_1 = _run_plan(context) assert plan_1.snapshot_for(model_a).change_category == SnapshotChangeCategory.BREAKING + assert not plan_1.snapshot_for(model_a).is_forward_only assert plan_1.snapshot_for(model_b).change_category == SnapshotChangeCategory.BREAKING + assert not plan_1.snapshot_for(model_b).is_forward_only # so far so good, model_a should exist as a normal table, model b should be a managed table and the prod views should exist assert len(plan_1.schema_metadata.views) == 2 @@ -2134,8 +2136,10 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults: assert plan_2.plan.has_changes assert len(plan_2.plan.modified_snapshots) == 2 - assert plan_2.snapshot_for(new_model_a).change_category == SnapshotChangeCategory.FORWARD_ONLY + assert plan_2.snapshot_for(new_model_a).change_category == SnapshotChangeCategory.NON_BREAKING + assert plan_2.snapshot_for(new_model_a).is_forward_only assert plan_2.snapshot_for(model_b).change_category == SnapshotChangeCategory.NON_BREAKING + assert not plan_2.snapshot_for(model_b).is_forward_only # verify that the new snapshots were created correctly # the forward-only change to model A should be in a new table separate from the one created in the first plan @@ -2207,8 +2211,10 @@ def _run_plan(sqlmesh_context: Context, environment: str = None) -> PlanResults: plan_4 = _run_plan(context) assert plan_4.plan.has_changes - assert plan_4.snapshot_for(model_a).change_category == SnapshotChangeCategory.FORWARD_ONLY + assert plan_4.snapshot_for(model_a).change_category == SnapshotChangeCategory.NON_BREAKING + assert plan_4.snapshot_for(model_a).is_forward_only assert plan_4.snapshot_for(model_b).change_category == SnapshotChangeCategory.NON_BREAKING + assert not plan_4.snapshot_for(model_b).is_forward_only # verify the Model B table is created as a managed table in prod assert plan_4.table_name_for(model_b) == plan_3.table_name_for( diff --git a/tests/core/state_sync/test_state_sync.py b/tests/core/state_sync/test_state_sync.py index eba948bd9a..a5a6969e38 100644 --- a/tests/core/state_sync/test_state_sync.py +++ b/tests/core/state_sync/test_state_sync.py @@ -139,7 +139,7 @@ def test_push_snapshots( state_sync.push_snapshots([snapshot_a, snapshot_b]) snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b.version = "2" state_sync.push_snapshots([snapshot_a, snapshot_b]) @@ -258,7 +258,8 @@ def test_add_interval( (to_timestamp("2020-01-05"), to_timestamp("2020-01-11")), ] - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True state_sync.add_interval(snapshot, to_datetime("2020-01-16"), "2020-01-20", is_dev=True) intervals = get_snapshot_intervals(snapshot) assert intervals.intervals == [ @@ -1144,7 +1145,7 @@ def test_delete_expired_snapshots(state_sync: EngineAdapterStateSync, make_snaps ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.updated_ts = now_ts - 11000 @@ -1298,7 +1299,7 @@ def test_delete_expired_snapshots_dev_table_cleanup_only( ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.updated_ts = now_ts - 5000 @@ -1338,7 +1339,7 @@ def test_delete_expired_snapshots_shared_dev_table( ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.dev_version_ = snapshot.dev_version new_snapshot.updated_ts = now_ts - 5000 @@ -1430,7 +1431,7 @@ def test_delete_expired_snapshots_cleanup_intervals( ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.updated_ts = now_ts - 12000 @@ -1497,7 +1498,7 @@ def test_delete_expired_snapshots_cleanup_intervals_shared_version( ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.updated_ts = now_ts - 5000 @@ -1612,7 +1613,7 @@ def test_delete_expired_snapshots_cleanup_intervals_shared_dev_version( ), ) new_snapshot.ttl = "in 10 seconds" - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version new_snapshot.dev_version_ = snapshot.dev_version new_snapshot.updated_ts = now_ts - 5000 @@ -1740,7 +1741,7 @@ def test_compact_intervals_after_cleanup( ) snapshot_b.previous_versions = snapshot_a.all_versions snapshot_b.ttl = "in 10 seconds" - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b.updated_ts = now_ts - 12000 # An indirect non-breaking change on top of the forward-only change. Not expired. @@ -1872,7 +1873,7 @@ def test_unpause_snapshots(state_sync: EngineAdapterStateSync, make_snapshot: t. new_snapshot = make_snapshot( SqlModel(name="test_snapshot", query=parse_one("select 2, ds"), cron="@daily") ) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = "a" assert not new_snapshot.unpaused_ts @@ -1917,7 +1918,7 @@ def test_unpause_snapshots_hourly(state_sync: EngineAdapterStateSync, make_snaps interval_unit="hour", ) ) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = "a" assert not new_snapshot.unpaused_ts @@ -1974,7 +1975,7 @@ def test_unrestorable_snapshot(state_sync: EngineAdapterStateSync, make_snapshot new_forward_only_snapshot = make_snapshot( SqlModel(name="test_snapshot", query=parse_one("select 3, ds"), cron="@daily") ) - new_forward_only_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_forward_only_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_forward_only_snapshot.version = "a" assert not new_forward_only_snapshot.unpaused_ts @@ -2015,7 +2016,7 @@ def test_unpause_snapshots_remove_intervals( SqlModel(name="test_snapshot", query=parse_one("select 2, ds"), cron="@daily"), version="a", ) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = "a" new_snapshot.effective_from = "2023-01-03" state_sync.push_snapshots([new_snapshot]) @@ -2053,7 +2054,7 @@ def test_unpause_snapshots_remove_intervals_disabled_restatement( SqlModel(name="test_snapshot", query=parse_one("select 2, ds"), cron="@daily", kind=kind), version="a", ) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = "a" new_snapshot.effective_from = "2023-01-03" state_sync.push_snapshots([new_snapshot]) @@ -3019,7 +3020,7 @@ def test_seed_model_metadata_update( model = model.copy(update={"owner": "jen"}) new_snapshot = make_snapshot(model) new_snapshot.previous_versions = snapshot.all_versions - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert snapshot.fingerprint != new_snapshot.fingerprint assert snapshot.version == new_snapshot.version diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 0f14fd4a8b..f0877b37a6 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1124,6 +1124,7 @@ def test_unrestorable_snapshot(sushi_context: Context) -> None: no_prompts=True, forward_only=True, allow_destructive_models=["memory.sushi.test_unrestorable"], + categorizer_config=CategorizerConfig.all_full(), ) sushi_context.upsert_model(model_v1) @@ -1132,6 +1133,7 @@ def test_unrestorable_snapshot(sushi_context: Context) -> None: no_prompts=True, forward_only=True, allow_destructive_models=["memory.sushi.test_unrestorable"], + categorizer_config=CategorizerConfig.all_full(), ) model_v1_new_snapshot = sushi_context.get_snapshot( "memory.sushi.test_unrestorable", raise_if_missing=True diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 17f26aee2e..d7edd8d131 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -114,18 +114,17 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request assert len(plan.new_snapshots) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert plan.context_diff.snapshots[snapshot.snapshot_id].is_forward_only + assert plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].is_forward_only + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[(to_timestamp("2023-01-07"), to_timestamp("2023-01-08"))], - ), SnapshotIntervals( snapshot_id=snapshot.snapshot_id, intervals=[(to_timestamp("2023-01-07"), to_timestamp("2023-01-08"))], @@ -256,12 +255,15 @@ def test_forward_only_model_regular_plan(init_and_plan_context: t.Callable): assert len(plan.new_snapshots) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert plan.context_diff.snapshots[snapshot.snapshot_id].is_forward_only + assert plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].is_forward_only + assert plan.start == to_datetime("2023-01-01") assert not plan.missing_intervals @@ -362,20 +364,17 @@ def test_forward_only_model_regular_plan_preview_enabled(init_and_plan_context: assert len(plan.new_snapshots) == 2 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert plan.context_diff.snapshots[snapshot.snapshot_id].is_forward_only + assert plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].is_forward_only + assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), SnapshotIntervals( snapshot_id=snapshot.snapshot_id, intervals=[ @@ -429,7 +428,12 @@ def test_forward_only_model_restate_full_history_in_dev(init_and_plan_context: t context.upsert_model(SqlModel.parse_obj(model_kwargs)) # Apply the model change in dev - plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() + plan = context.plan_builder( + "dev", + skip_tests=True, + enable_preview=False, + categorizer_config=CategorizerConfig.all_full(), + ).build() assert not plan.missing_intervals context.apply(plan) @@ -496,59 +500,30 @@ def test_full_history_restatement_model_regular_plan_preview_enabled( assert len(plan.new_snapshots) == 6 assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[customers_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) assert ( plan.context_diff.snapshots[active_customers_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) assert ( plan.context_diff.snapshots[waiter_as_customer_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert all(s.is_forward_only for s in plan.new_snapshots) assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=active_customers_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=count_customers_active_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=count_customers_inactive_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), - SnapshotIntervals( - snapshot_id=customers_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), SnapshotIntervals( snapshot_id=snapshot.snapshot_id, intervals=[ (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), ], ), - SnapshotIntervals( - snapshot_id=waiter_as_customer_snapshot.snapshot_id, - intervals=[ - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), ] context.apply(plan) @@ -786,15 +761,6 @@ def test_cron_not_aligned_with_day_boundary_new_model(init_and_plan_context: t.C ).snapshot_id, intervals=[(to_timestamp("2023-01-06"), to_timestamp("2023-01-07"))], ), - SnapshotIntervals( - snapshot_id=context.get_snapshot( - "sushi.top_waiters", raise_if_missing=True - ).snapshot_id, - intervals=[ - (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), - (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), - ], - ), SnapshotIntervals( snapshot_id=context.get_snapshot( "sushi.waiter_revenue_by_day", raise_if_missing=True @@ -944,12 +910,13 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod( assert len(plan.new_snapshots) == 2 assert ( plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert all(s.is_forward_only for s in plan.new_snapshots) assert plan.start == to_datetime("2023-01-01") assert not plan.missing_intervals @@ -1155,18 +1122,15 @@ def test_non_breaking_change_after_forward_only_in_dev( assert len(plan.new_snapshots) == 2 assert ( plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert all(s.is_forward_only for s in plan.new_snapshots) assert to_timestamp(plan.start) == to_timestamp("2023-01-07") assert plan.missing_intervals == [ - SnapshotIntervals( - snapshot_id=top_waiters_snapshot.snapshot_id, - intervals=[(to_timestamp("2023-01-07"), to_timestamp("2023-01-08"))], - ), SnapshotIntervals( snapshot_id=waiter_revenue_by_day_snapshot.snapshot_id, intervals=[(to_timestamp("2023-01-07"), to_timestamp("2023-01-08"))], @@ -1267,11 +1231,17 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co context.upsert_model(model) snapshot = context.get_snapshot(model, raise_if_missing=True) - plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() + plan = context.plan_builder( + "dev", + skip_tests=True, + enable_preview=False, + categorizer_config=CategorizerConfig.all_full(), + ).build() assert ( plan.context_diff.snapshots[snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.BREAKING ) + assert plan.context_diff.snapshots[snapshot.snapshot_id].is_forward_only assert not plan.requires_backfill context.apply(plan) @@ -1393,7 +1363,7 @@ def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_c context.upsert_model(model) plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True) assert len(plan.new_snapshots) == 2 - assert all(s.change_category == SnapshotChangeCategory.FORWARD_ONLY for s in plan.new_snapshots) + assert all(s.is_forward_only for s in plan.new_snapshots) # Follow-up with a metadata change in the same environment model = model.copy(update={"owner": "new_owner"}) @@ -1411,7 +1381,7 @@ def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_c @time_machine.travel("2023-01-08 15:00:00 UTC") -def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_context: t.Callable): +def test_indirect_non_breaking_downstream_of_forward_only(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi") context.apply(plan) @@ -1425,14 +1395,20 @@ def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_contex forward_only_snapshot = context.get_snapshot(forward_only_model, raise_if_missing=True) non_breaking_model = context.get_model("sushi.waiter_revenue_by_day") + non_breaking_model = non_breaking_model.copy(update={"start": "2023-01-01"}) context.upsert_model(add_projection_to_model(t.cast(SqlModel, non_breaking_model))) non_breaking_snapshot = context.get_snapshot(non_breaking_model, raise_if_missing=True) top_waiter_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True) - plan = context.plan_builder("dev", skip_tests=True, enable_preview=False).build() + plan = context.plan_builder( + "dev", + skip_tests=True, + enable_preview=False, + categorizer_config=CategorizerConfig.all_full(), + ).build() assert ( plan.context_diff.snapshots[forward_only_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.BREAKING ) assert ( plan.context_diff.snapshots[non_breaking_snapshot.snapshot_id].change_category @@ -1440,10 +1416,26 @@ def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_contex ) assert ( plan.context_diff.snapshots[top_waiter_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) + assert plan.context_diff.snapshots[forward_only_snapshot.snapshot_id].is_forward_only + assert not plan.context_diff.snapshots[non_breaking_snapshot.snapshot_id].is_forward_only + assert not plan.context_diff.snapshots[top_waiter_snapshot.snapshot_id].is_forward_only + assert plan.start == to_timestamp("2023-01-01") assert plan.missing_intervals == [ + SnapshotIntervals( + snapshot_id=top_waiter_snapshot.snapshot_id, + intervals=[ + (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")), + (to_timestamp("2023-01-02"), to_timestamp("2023-01-03")), + (to_timestamp("2023-01-03"), to_timestamp("2023-01-04")), + (to_timestamp("2023-01-04"), to_timestamp("2023-01-05")), + (to_timestamp("2023-01-05"), to_timestamp("2023-01-06")), + (to_timestamp("2023-01-06"), to_timestamp("2023-01-07")), + (to_timestamp("2023-01-07"), to_timestamp("2023-01-08")), + ], + ), SnapshotIntervals( snapshot_id=non_breaking_snapshot.snapshot_id, intervals=[ @@ -2164,15 +2156,15 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot( dev_plan = context.plan("dev", auto_apply=True, no_prompts=True, enable_preview=False) assert ( dev_plan.snapshots[forward_only_model_snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) assert ( dev_plan.snapshots[full_downstream_model_snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) assert ( dev_plan.snapshots[full_downstream_model_2_snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.INDIRECT_NON_BREAKING ) assert not dev_plan.missing_intervals @@ -2363,21 +2355,6 @@ def test_indirect_non_breaking_view_model_non_representative_snapshot_migration( SnapshotChangeCategory.METADATA, SnapshotChangeCategory.METADATA, ), - ( - SnapshotChangeCategory.FORWARD_ONLY, - SnapshotChangeCategory.BREAKING, - SnapshotChangeCategory.INDIRECT_BREAKING, - ), - ( - SnapshotChangeCategory.BREAKING, - SnapshotChangeCategory.FORWARD_ONLY, - SnapshotChangeCategory.FORWARD_ONLY, - ), - ( - SnapshotChangeCategory.FORWARD_ONLY, - SnapshotChangeCategory.FORWARD_ONLY, - SnapshotChangeCategory.FORWARD_ONLY, - ), ], ) def test_rebase_two_changed_parents( @@ -4538,12 +4515,6 @@ def test_breaking_change(sushi_context: Context): validate_query_change(sushi_context, environment, SnapshotChangeCategory.BREAKING, False) -def test_forward_only(sushi_context: Context): - environment = "dev" - initial_add(sushi_context, environment) - validate_query_change(sushi_context, environment, SnapshotChangeCategory.FORWARD_ONLY, False) - - def test_logical_change(sushi_context: Context): environment = "dev" initial_add(sushi_context, environment) @@ -4795,8 +4766,11 @@ def _validate_plan(context, plan): plan.context_diff.modified_snapshots[sushi_customer_revenue_by_day_snapshot.name][ 0 ].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.NON_BREAKING ) + assert plan.context_diff.snapshots[ + sushi_customer_revenue_by_day_snapshot.snapshot_id + ].is_forward_only apply_to_environment( sushi_context, @@ -6074,6 +6048,9 @@ def apply_to_environment( plan_builder.set_start(plan_start or start(context)) if choice: + if choice == SnapshotChangeCategory.FORWARD_ONLY: + # FORWARD_ONLY is deprecated, fallback to NON_BREAKING to keep the existing tests + choice = SnapshotChangeCategory.NON_BREAKING plan_choice(plan_builder, choice) for validator in plan_validators: validator(context, plan_builder.build()) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index efaeba8623..35c3628cff 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -63,7 +63,7 @@ def test_forward_only_plan_sets_version(make_snapshot, mocker: MockerFixture): metadata_hash="test_metadata_hash", ), version="test_version", - change_category=SnapshotChangeCategory.FORWARD_ONLY, + change_category=SnapshotChangeCategory.NON_BREAKING, dev_table_suffix="dev", ), ) @@ -97,10 +97,6 @@ def test_forward_only_plan_sets_version(make_snapshot, mocker: MockerFixture): plan_builder.build() assert snapshot_b.version == "test_version" - # Make sure that the choice can't be set manually. - with pytest.raises(PlanError, match="Choice setting is not supported by a forward-only plan."): - plan_builder.set_choice(snapshot_b, SnapshotChangeCategory.BREAKING).build() - def test_forward_only_dev(make_snapshot, mocker: MockerFixture): snapshot = make_snapshot( @@ -258,8 +254,10 @@ def test_forward_only_plan_added_models(make_snapshot, mocker: MockerFixture): ) PlanBuilder(context_diff, forward_only=True).build() - assert snapshot_a.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert snapshot_a.change_category == SnapshotChangeCategory.METADATA assert snapshot_b.change_category == SnapshotChangeCategory.BREAKING + assert snapshot_a.is_forward_only + assert snapshot_b.is_forward_only def test_forward_only_plan_categorizes_change_model_kind_as_breaking( @@ -307,6 +305,7 @@ def test_forward_only_plan_categorizes_change_model_kind_as_breaking( PlanBuilder(context_diff, forward_only=True).build() assert updated_snapshot.change_category == SnapshotChangeCategory.BREAKING + assert not updated_snapshot.is_forward_only def test_paused_forward_only_parent(make_snapshot, mocker: MockerFixture): @@ -322,10 +321,10 @@ def test_paused_forward_only_parent(make_snapshot, mocker: MockerFixture): dev_table_suffix="dev", ), ) - snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b_old = make_snapshot(SqlModel(name="b", query=parse_one("select 2, ds from a"))) - snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING) + snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=False) snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("select 3, ds from a"))) assert not snapshot_b.version @@ -1104,7 +1103,7 @@ def test_forward_only_revert_not_allowed(make_snapshot, mocker: MockerFixture): assert not snapshot.is_forward_only forward_only_snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 2, ds"))) - forward_only_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + forward_only_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) forward_only_snapshot.version = snapshot.version forward_only_snapshot.unpaused_ts = now_timestamp() assert forward_only_snapshot.is_forward_only @@ -1191,7 +1190,8 @@ def test_forward_only_plan_seed_models(make_snapshot, mocker: MockerFixture): PlanBuilder(context_diff, forward_only=True).build() assert snapshot_a_updated.version == snapshot_a_updated.fingerprint.to_version() - assert snapshot_a_updated.change_category == SnapshotChangeCategory.NON_BREAKING + assert snapshot_a_updated.change_category == SnapshotChangeCategory.BREAKING + assert not snapshot_a_updated.is_forward_only def test_start_inference(make_snapshot, mocker: MockerFixture): @@ -1443,7 +1443,9 @@ def test_effective_from(make_snapshot, mocker: MockerFixture): forward_only=True, start="2023-01-01", end="2023-03-01", + execution_time="2023-03-02 00:01:00", is_dev=True, + end_bounded=True, ) updated_snapshot.add_interval("2023-01-01", "2023-03-01") @@ -1455,9 +1457,9 @@ def test_effective_from(make_snapshot, mocker: MockerFixture): assert plan_builder.set_effective_from(None).build().effective_from is None assert updated_snapshot.effective_from is None - assert not plan_builder.build().missing_intervals plan_builder.set_effective_from("2023-02-01") + plan_builder.set_start("2023-02-01") assert plan_builder.build().effective_from == "2023-02-01" assert updated_snapshot.effective_from == "2023-02-01" @@ -1677,17 +1679,20 @@ def test_forward_only_models(make_snapshot, mocker: MockerFixture): ) PlanBuilder(context_diff, is_dev=True).build() - assert updated_snapshot.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert updated_snapshot.change_category == SnapshotChangeCategory.BREAKING + assert updated_snapshot.is_forward_only updated_snapshot.change_category = None updated_snapshot.version = None PlanBuilder(context_diff, is_dev=True, forward_only=True).build() - assert updated_snapshot.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert updated_snapshot.change_category == SnapshotChangeCategory.BREAKING + assert updated_snapshot.is_forward_only updated_snapshot.change_category = None updated_snapshot.version = None PlanBuilder(context_diff, forward_only=True).build() - assert updated_snapshot.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert updated_snapshot.change_category == SnapshotChangeCategory.BREAKING + assert updated_snapshot.is_forward_only def test_forward_only_models_model_kind_changed(make_snapshot, mocker: MockerFixture): @@ -1727,16 +1732,16 @@ def test_forward_only_models_model_kind_changed(make_snapshot, mocker: MockerFix @pytest.mark.parametrize( - "partitioned_by, expected_change_category", + "partitioned_by, expected_forward_only", [ - ([], SnapshotChangeCategory.BREAKING), - ([d.parse_one("ds")], SnapshotChangeCategory.FORWARD_ONLY), + ([], False), + ([d.parse_one("ds")], True), ], ) def test_forward_only_models_model_kind_changed_to_incremental_by_time_range( make_snapshot, partitioned_by: t.List[exp.Expression], - expected_change_category: SnapshotChangeCategory, + expected_forward_only: bool, ): snapshot = make_snapshot( SqlModel( @@ -1777,7 +1782,8 @@ def test_forward_only_models_model_kind_changed_to_incremental_by_time_range( ) PlanBuilder(context_diff, is_dev=True).build() - assert updated_snapshot.change_category == expected_change_category + assert updated_snapshot.change_category == SnapshotChangeCategory.BREAKING + assert updated_snapshot.is_forward_only == expected_forward_only def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFixture): @@ -1795,7 +1801,7 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix ), nodes={'"a"': snapshot_a.model}, ) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) updated_snapshot_b = make_snapshot(snapshot_b.model, nodes={'"a"': updated_snapshot_a.model}) updated_snapshot_b.previous_versions = snapshot_b.all_versions @@ -1868,10 +1874,15 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix assert plan.directly_modified == {updated_snapshot_a.snapshot_id} assert updated_snapshot_a.change_category == SnapshotChangeCategory.BREAKING - assert updated_snapshot_b.change_category == SnapshotChangeCategory.FORWARD_ONLY - assert updated_snapshot_c.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert updated_snapshot_b.change_category == SnapshotChangeCategory.INDIRECT_BREAKING + assert updated_snapshot_c.change_category == SnapshotChangeCategory.INDIRECT_BREAKING assert updated_snapshot_d.change_category == SnapshotChangeCategory.INDIRECT_BREAKING + assert not updated_snapshot_a.is_forward_only + assert updated_snapshot_b.is_forward_only + assert not updated_snapshot_c.is_forward_only + assert not updated_snapshot_d.is_forward_only + deployability_index = DeployabilityIndex.create( { updated_snapshot_a.snapshot_id: updated_snapshot_a, @@ -1886,7 +1897,7 @@ def test_indirectly_modified_forward_only_model(make_snapshot, mocker: MockerFix def test_added_model_with_forward_only_parent(make_snapshot, mocker: MockerFixture): snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1 as a, ds"))) - snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("select a, ds from a"))) @@ -1915,6 +1926,7 @@ def test_added_model_with_forward_only_parent(make_snapshot, mocker: MockerFixtu PlanBuilder(context_diff, is_dev=True).build() assert snapshot_b.change_category == SnapshotChangeCategory.BREAKING + assert not snapshot_b.is_forward_only def test_added_forward_only_model(make_snapshot, mocker: MockerFixture): @@ -2027,7 +2039,7 @@ def test_revert_to_previous_value(make_snapshot, mocker: MockerFixture): snapshot_b = make_snapshot( SqlModel(name="b", query=parse_one("select 1, ds FROM a"), depends_on={"a"}) ) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b.add_interval("2022-01-01", now()) context_diff = ContextDiff( @@ -2060,7 +2072,8 @@ def test_revert_to_previous_value(make_snapshot, mocker: MockerFixture): plan_builder.set_choice(snapshot_a, SnapshotChangeCategory.BREAKING) plan_builder.build() # Make sure it does not get assigned INDIRECT_BREAKING - assert snapshot_b.change_category == SnapshotChangeCategory.FORWARD_ONLY + assert snapshot_b.change_category == SnapshotChangeCategory.BREAKING + assert snapshot_b.is_forward_only test_add_restatement_fixtures = [ @@ -2407,7 +2420,7 @@ def test_dev_plan_depends_past_non_deployable(make_snapshot, mocker: MockerFixtu name="a_child", query=parse_one("select 1, ds FROM a"), start="2023-01-01", - kind=IncrementalByTimeRangeKind(time_column="ds"), + kind=IncrementalByTimeRangeKind(time_column="ds", forward_only=True), ), nodes={'"a"': updated_snapshot.model}, ) @@ -2457,7 +2470,7 @@ def test_dev_plan_depends_past_non_deployable(make_snapshot, mocker: MockerFixtu def new_builder(start, end): builder = PlanBuilder(context_diff, start=start, end=end, is_dev=True) - builder.set_choice(updated_snapshot, SnapshotChangeCategory.FORWARD_ONLY) + builder.set_choice(updated_snapshot, SnapshotChangeCategory.BREAKING) builder.set_choice(snapshot_child, SnapshotChangeCategory.BREAKING) builder.set_choice(unrelated_snapshot, SnapshotChangeCategory.BREAKING) return builder @@ -3146,15 +3159,14 @@ def test_set_choice_for_forward_only_model(make_snapshot): ) plan_builder = PlanBuilder(context_diff, is_dev=True) - - with pytest.raises(PlanError, match='Forward-only model "a" cannot be categorized manually.'): - plan_builder.set_choice(updated_snapshot, SnapshotChangeCategory.BREAKING) + plan_builder.set_choice(updated_snapshot, SnapshotChangeCategory.BREAKING) plan = plan_builder.build() assert ( plan.snapshots[updated_snapshot.snapshot_id].change_category - == SnapshotChangeCategory.FORWARD_ONLY + == SnapshotChangeCategory.BREAKING ) + assert plan.snapshots[updated_snapshot.snapshot_id].is_forward_only def test_user_provided_flags(sushi_context: Context): diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index d79be24262..f560b93251 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -594,14 +594,14 @@ def test_build_plan_stages_forward_only( snapshot_a.model.copy(update={"stamp": "new_version"}), ) new_snapshot_a.previous_versions = snapshot_a.all_versions - new_snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING, forward_only=True) new_snapshot_b = make_snapshot( snapshot_b.model.copy(), nodes={'"a"': new_snapshot_a.model}, ) new_snapshot_b.previous_versions = snapshot_b.all_versions - new_snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_b.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only=True) state_reader = mocker.Mock(spec=StateReader) state_reader.get_snapshots.return_value = {} @@ -732,14 +732,14 @@ def test_build_plan_stages_forward_only_dev( snapshot_a.model.copy(update={"stamp": "new_version"}), ) new_snapshot_a.previous_versions = snapshot_a.all_versions - new_snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING, forward_only=True) new_snapshot_b = make_snapshot( snapshot_b.model.copy(), nodes={'"a"': new_snapshot_a.model}, ) new_snapshot_b.previous_versions = snapshot_b.all_versions - new_snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_b.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only=True) state_reader = mocker.Mock(spec=StateReader) state_reader.get_snapshots.return_value = {} @@ -968,14 +968,14 @@ def test_build_plan_stages_forward_only_ensure_finalized_snapshots( snapshot_a.model.copy(update={"stamp": "new_version"}), ) new_snapshot_a.previous_versions = snapshot_a.all_versions - new_snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING, forward_only=True) new_snapshot_b = make_snapshot( snapshot_b.model.copy(), nodes={'"a"': new_snapshot_a.model}, ) new_snapshot_b.previous_versions = snapshot_b.all_versions - new_snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot_b.categorize_as(SnapshotChangeCategory.INDIRECT_NON_BREAKING, forward_only=True) state_reader = mocker.Mock(spec=StateReader) state_reader.get_snapshots.return_value = {} diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index d71cbc4db6..0ba7180fb6 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -171,6 +171,7 @@ def test_json(snapshot: Snapshot): "version": snapshot.fingerprint.to_version(), "migrated": False, "unrestorable": False, + "forward_only": False, } @@ -264,7 +265,8 @@ def test_add_interval(snapshot: Snapshot, make_snapshot): def test_add_interval_dev(snapshot: Snapshot, make_snapshot): snapshot.version = "existing_version" snapshot.dev_version_ = "existing_dev_version" - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True snapshot.add_interval("2020-01-01", "2020-01-01") assert snapshot.intervals == [(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))] @@ -1169,7 +1171,7 @@ def test_snapshot_table_name(snapshot: Snapshot, make_snapshot: t.Callable): data_hash="2", metadata_hash="1", parent_data_hash="1" ) snapshot.previous_versions = (previous_data_version,) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert snapshot.table_name(is_deployable=True) == "sqlmesh__default.name__3078928823" assert snapshot.table_name(is_deployable=False) == "sqlmesh__default.name__3049392110__dev" @@ -1320,7 +1322,7 @@ def test_table_naming_convention_change_reuse_previous_version(make_snapshot): assert changed_snapshot.previous_version == original_snapshot.data_version - changed_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + changed_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) # inherited from previous version even though changed_snapshot was created with TableNamingConvention.HASH_MD5 assert changed_snapshot.table_naming_convention == TableNamingConvention.SCHEMA_AND_TABLE @@ -1725,7 +1727,7 @@ def test_physical_schema(snapshot: Snapshot): new_snapshot.previous_versions = (snapshot.data_version,) new_snapshot.physical_schema_ = None new_snapshot.version = None - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert new_snapshot.physical_schema == "custom_schema" assert new_snapshot.data_version.physical_schema == "custom_schema" @@ -1735,7 +1737,7 @@ def test_physical_schema(snapshot: Snapshot): def test_has_paused_forward_only(snapshot: Snapshot): assert not has_paused_forward_only([snapshot], [snapshot]) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert has_paused_forward_only([snapshot], [snapshot]) snapshot.unpaused_ts = to_timestamp("2023-01-01") @@ -2029,7 +2031,7 @@ def test_deployability_index(make_snapshot): snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING) snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("SELECT 1"))) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b.parents = (snapshot_a.snapshot_id,) snapshot_c = make_snapshot(SqlModel(name="c", query=parse_one("SELECT 1"))) @@ -2093,7 +2095,7 @@ def test_deployability_index(make_snapshot): def test_deployability_index_unpaused_forward_only(make_snapshot): snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("SELECT 1"))) - snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_a.unpaused_ts = 1 snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("SELECT 1"))) @@ -2120,7 +2122,7 @@ def test_deployability_index_unpaused_auto_restatement(make_snapshot): ), ) snapshot_a = make_snapshot(model_a) - snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_a.unpaused_ts = 1 # Snapshot B is a child of a model with auto restatement and is not paused, @@ -2216,11 +2218,11 @@ def test_deployability_index_categorized_forward_only_model(make_snapshot): snapshot_a = make_snapshot(model_a) snapshot_a.previous_versions = snapshot_a_old.all_versions - snapshot_a.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("SELECT 1"))) snapshot_b.parents = (snapshot_a.snapshot_id,) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) deployability_index = DeployabilityIndex.create( {s.snapshot_id: s for s in [snapshot_a, snapshot_b]} @@ -2238,7 +2240,7 @@ def test_deployability_index_missing_parent(make_snapshot): snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING) snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("SELECT 1"))) - snapshot_b.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_b.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot_b.parents = (snapshot_a.snapshot_id,) deplyability_index = DeployabilityIndex.create({snapshot_b.snapshot_id: snapshot_b}) @@ -2802,7 +2804,7 @@ def test_physical_version_pin_for_new_forward_only_models(make_snapshot): ), ) snapshot_c.previous_versions = snapshot_b.all_versions - snapshot_c.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_c.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert snapshot_b.fingerprint != snapshot_c.fingerprint assert snapshot_b.version == snapshot_c.version @@ -2832,7 +2834,7 @@ def test_physical_version_pin_for_new_forward_only_models(make_snapshot): ), ) snapshot_e.previous_versions = snapshot_d.all_versions - snapshot_e.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_e.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert snapshot_d.fingerprint != snapshot_e.fingerprint assert snapshot_d.version == snapshot_e.version @@ -2849,7 +2851,7 @@ def test_physical_version_pin_for_new_forward_only_models(make_snapshot): ), ) snapshot_f.previous_versions = snapshot_e.all_versions - snapshot_f.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot_f.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) assert snapshot_f.version == "1234" assert snapshot_f.fingerprint != snapshot_e.fingerprint diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index fc5df244b3..4b028e148b 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -370,7 +370,7 @@ def test_promote_forward_only(mocker: MockerFixture, adapter_mock, make_snapshot ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.version = "test_version" evaluator.promote( @@ -428,7 +428,7 @@ def create_and_cleanup(name: str, dev_table_only: bool): ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.version = "test_version" evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) @@ -810,42 +810,58 @@ def test_create_new_forward_only_model(mocker: MockerFixture, adapter_mock, make @pytest.mark.parametrize( - "deployability_index, snapshot_category, deployability_flags", + "deployability_index, snapshot_category, forward_only, deployability_flags", [ - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, [False]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.NON_BREAKING, [False]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.FORWARD_ONLY, [True]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.INDIRECT_BREAKING, [False]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.INDIRECT_NON_BREAKING, [True]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.METADATA, [True]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, False, [False]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.NON_BREAKING, False, [False]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, True, [True]), + ( + DeployabilityIndex.all_deployable(), + SnapshotChangeCategory.INDIRECT_BREAKING, + False, + [False], + ), + ( + DeployabilityIndex.all_deployable(), + SnapshotChangeCategory.INDIRECT_NON_BREAKING, + False, + [True], + ), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.METADATA, False, [True]), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.BREAKING, + False, [True, False], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.NON_BREAKING, + False, [True, False], ), ( DeployabilityIndex.none_deployable(), - SnapshotChangeCategory.FORWARD_ONLY, + SnapshotChangeCategory.BREAKING, + True, [True], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.INDIRECT_BREAKING, + False, [True, False], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.INDIRECT_NON_BREAKING, + False, [True], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.METADATA, + False, [True], ), ], @@ -857,12 +873,13 @@ def test_create_tables_exist( deployability_index: DeployabilityIndex, deployability_flags: t.List[bool], snapshot_category: SnapshotChangeCategory, + forward_only: bool, ): adapter_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter") adapter_mock.dialect = "duckdb" evaluator = SnapshotEvaluator(adapter_mock) - snapshot.categorize_as(category=snapshot_category) + snapshot.categorize_as(category=snapshot_category, forward_only=forward_only) adapter_mock.get_data_objects.return_value = [ DataObject( @@ -909,7 +926,7 @@ def test_create_prod_table_exists_forward_only(mocker: MockerFixture, adapter_mo ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) adapter_mock.get_data_objects.return_value = [ DataObject( @@ -1179,7 +1196,8 @@ def columns(table_name): query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), ) snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True snapshot.previous_versions = snapshot.all_versions evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1217,7 +1235,8 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot): post_statements=[parse_one("DROP TABLE pre")], ) snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True snapshot.previous_versions = snapshot.all_versions evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1234,11 +1253,17 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot): @pytest.mark.parametrize( - "change_category", - [SnapshotChangeCategory.FORWARD_ONLY, SnapshotChangeCategory.INDIRECT_NON_BREAKING], + "change_category, forward_only", + [ + (SnapshotChangeCategory.BREAKING, True), + (SnapshotChangeCategory.INDIRECT_NON_BREAKING, False), + ], ) def test_migrate_view( - mocker: MockerFixture, make_snapshot, change_category: SnapshotChangeCategory + mocker: MockerFixture, + make_snapshot, + change_category: SnapshotChangeCategory, + forward_only: bool, ): connection_mock = mocker.NonCallableMock() cursor_mock = mocker.Mock() @@ -1255,6 +1280,7 @@ def test_migrate_view( ) snapshot = make_snapshot(model, version="1") snapshot.change_category = change_category + snapshot.forward_only = forward_only evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1316,7 +1342,7 @@ def test_migrate_duckdb(snapshot: Snapshot, duck_conn, make_snapshot): updated_model = SqlModel.parse_obj(updated_model_dict) new_snapshot = make_snapshot(updated_model) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.version = snapshot.version evaluator.create([new_snapshot], {}) @@ -1468,7 +1494,7 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot) ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions adapter_mock.get_data_objects.return_value = [ @@ -1535,7 +1561,7 @@ def test_create_clone_in_dev_missing_table(mocker: MockerFixture, adapter_mock, ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1580,7 +1606,7 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions adapter_mock.get_data_objects.return_value = [ @@ -1643,7 +1669,7 @@ def test_create_clone_in_dev_self_referencing( ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions adapter_mock.get_data_objects.return_value = [ @@ -1715,7 +1741,8 @@ def columns(table_name): query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), ) snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True snapshot.previous_versions = snapshot.all_versions with pytest.raises(NodeExecutionFailedError) as ex: @@ -1737,7 +1764,8 @@ def columns(table_name): query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), ) snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True snapshot.previous_versions = snapshot.all_versions logger = logging.getLogger("sqlmesh.core.snapshot.evaluator") @@ -1779,7 +1807,7 @@ def test_forward_only_snapshot_for_added_model(mocker: MockerFixture, adapter_mo ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) evaluator.create([snapshot], {}) @@ -3517,7 +3545,7 @@ def test_create_managed_forward_only_with_previous_version_doesnt_clone_for_dev_ ) snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = ( SnapshotDataVersion( fingerprint=SnapshotFingerprint( @@ -3525,7 +3553,7 @@ def test_create_managed_forward_only_with_previous_version_doesnt_clone_for_dev_ metadata_hash="test_metadata_hash", ), version="test_version", - change_category=SnapshotChangeCategory.FORWARD_ONLY, + change_category=SnapshotChangeCategory.BREAKING, dev_table_suffix="dev", ), ) @@ -3551,46 +3579,58 @@ def test_create_managed_forward_only_with_previous_version_doesnt_clone_for_dev_ @pytest.mark.parametrize( - "deployability_index, snapshot_category, deployability_flags", + "deployability_index, snapshot_category, forward_only, deployability_flags", [ - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, [True]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.NON_BREAKING, [True]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.FORWARD_ONLY, [False]), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.INDIRECT_BREAKING, [True]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, False, [True]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.NON_BREAKING, False, [True]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.BREAKING, True, [False]), + ( + DeployabilityIndex.all_deployable(), + SnapshotChangeCategory.INDIRECT_BREAKING, + False, + [True], + ), ( DeployabilityIndex.all_deployable(), SnapshotChangeCategory.INDIRECT_NON_BREAKING, + False, [False], ), - (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.METADATA, [False]), + (DeployabilityIndex.all_deployable(), SnapshotChangeCategory.METADATA, False, [False]), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.BREAKING, + False, [False, True], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.NON_BREAKING, + False, [False, True], ), ( DeployabilityIndex.none_deployable(), - SnapshotChangeCategory.FORWARD_ONLY, + SnapshotChangeCategory.BREAKING, + True, [False], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.INDIRECT_BREAKING, + False, [False, True], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.INDIRECT_NON_BREAKING, + False, [False], ), ( DeployabilityIndex.none_deployable(), SnapshotChangeCategory.METADATA, + False, [False], ), ], @@ -3602,12 +3642,13 @@ def test_create_snapshot( deployability_index: DeployabilityIndex, deployability_flags: t.List[bool], snapshot_category: SnapshotChangeCategory, + forward_only: bool, ): adapter_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter") adapter_mock.dialect = "duckdb" evaluator = SnapshotEvaluator(adapter_mock) - snapshot.categorize_as(category=snapshot_category) + snapshot.categorize_as(category=snapshot_category, forward_only=forward_only) evaluator._create_snapshot( snapshot=snapshot, snapshots={}, @@ -3657,7 +3698,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc updated_model = SqlModel.parse_obj(updated_model_dict) new_snapshot = make_snapshot(updated_model) - new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) new_snapshot.previous_versions = snapshot.all_versions new_snapshot.version = snapshot.version @@ -3728,7 +3769,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): ) ) snapshot: Snapshot = make_snapshot(model) - snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions # no schema changes - no-op @@ -3930,7 +3971,8 @@ def columns(table_name): query=parse_one("SELECT c FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), ) snapshot_1 = make_snapshot(model, version="1") - snapshot_1.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot_1.change_category = SnapshotChangeCategory.BREAKING + snapshot_1.forward_only = True snapshot_1.previous_versions = snapshot_1.all_versions model_2 = SqlModel( name="test_schema.test_model_2", @@ -3941,7 +3983,8 @@ def columns(table_name): query=parse_one("SELECT c FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), ) snapshot_2 = make_snapshot(model_2, version="1") - snapshot_2.change_category = SnapshotChangeCategory.FORWARD_ONLY + snapshot_2.change_category = SnapshotChangeCategory.BREAKING + snapshot_2.forward_only = True snapshot_2.previous_versions = snapshot_2.all_versions evaluator.migrate( [snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable()