From 19e794db121b33364b52e3b33bfacfce4e73c9b8 Mon Sep 17 00:00:00 2001 From: eakmanrq <6326532+eakmanrq@users.noreply.github.com> Date: Thu, 7 Aug 2025 17:23:09 -0700 Subject: [PATCH] feat!: add support for `on_additive_change` --- docs/concepts/models/overview.md | 11 +- docs/concepts/plans.md | 20 +- docs/guides/incremental_time.md | 133 +++- docs/guides/model_selection.md | 2 +- docs/integrations/dbt.md | 18 +- docs/reference/cli.md | 2 + docs/reference/model_configuration.md | 12 +- sqlmesh/cli/main.py | 8 + sqlmesh/core/config/model.py | 5 + sqlmesh/core/console.py | 34 + sqlmesh/core/context.py | 10 + sqlmesh/core/model/kind.py | 58 ++ sqlmesh/core/model/meta.py | 32 + sqlmesh/core/plan/builder.py | 136 +++- sqlmesh/core/plan/definition.py | 3 + sqlmesh/core/schema_diff.py | 121 +++- sqlmesh/core/snapshot/definition.py | 10 + sqlmesh/core/snapshot/evaluator.py | 100 ++- sqlmesh/dbt/model.py | 36 +- ...ive_change_to_incremental_metadata_hash.py | 5 + sqlmesh/utils/errors.py | 89 ++- tests/core/test_model.py | 80 ++- tests/core/test_plan.py | 642 +++++++++++++++++- tests/core/test_plan_stages.py | 12 + tests/core/test_schema_diff.py | 483 +++++++++++++ tests/core/test_snapshot.py | 5 +- tests/core/test_snapshot_evaluator.py | 4 +- tests/dbt/test_config.py | 43 +- tests/dbt/test_transformation.py | 183 ++++- .../github/cicd/test_integration.py | 12 +- 30 files changed, 2170 insertions(+), 139 deletions(-) create mode 100644 sqlmesh/migrations/v0088_add_on_additive_change_to_incremental_metadata_hash.py diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md index dd9fd0d767..d6356462b4 100644 --- a/docs/concepts/models/overview.md +++ b/docs/concepts/models/overview.md @@ -507,11 +507,18 @@ Some properties are only available in specific model kinds - see the [model conf : Set this to true to indicate that all changes to this model should be [forward-only](../plans.md#forward-only-plans). 
### on_destructive_change
-: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column).
+: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column or modifying column constraints in ways that could cause data loss).

SQLMesh checks for destructive changes at plan time based on the model definition and run time based on the model's underlying physical tables.

- Must be one of the following values: `allow`, `warn`, or `error` (default).
+ Must be one of the following values: `allow`, `warn`, `error` (default), or `ignore`.
+
+### on_additive_change
+: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes an additive modification to the table schema (i.e., adding new columns, modifying column data types in compatible ways, etc.).
+
+ SQLMesh checks for additive changes at plan time based on the model definition and run time based on the model's underlying physical tables.
+
+ Must be one of the following values: `allow` (default), `warn`, `error`, or `ignore`.

### disable_restatement
: Set this to true to indicate that [data restatement](../plans.md#restatement-plans) is disabled for this model.
diff --git a/docs/concepts/plans.md b/docs/concepts/plans.md
index da3d3debb7..59db8d1aba 100644
--- a/docs/concepts/plans.md
+++ b/docs/concepts/plans.md
@@ -354,9 +354,25 @@ Some model changes destroy existing data in a table. 
SQLMesh automatically detec Forward-only plans treats all of the plan's model changes as forward-only. In these plans, SQLMesh will check all modified incremental models for destructive schema changes, not just forward-only models. -SQLMesh determines what to do for each model based on this setting hierarchy: the [model's `on_destructive_change` value](../guides/incremental_time.md#destructive-changes) (if present), the `on_destructive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `error`. +SQLMesh determines what to do for each model based on this setting hierarchy: -If you want to temporarily allow destructive changes to models that don't allow them, use the `plan` command's `--allow-destructive-model` selector to specify which models. Learn more about model selectors [here](../guides/model_selection.md). +- **For destructive changes**: the [model's `on_destructive_change` value](../guides/incremental_time.md#schema-changes) (if present), the `on_destructive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `error` +- **For additive changes**: the [model's `on_additive_change` value](../guides/incremental_time.md#schema-changes) (if present), the `on_additive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `allow` + +If you want to temporarily allow destructive changes to models that don't allow them, use the `plan` command's `--allow-destructive-model` selector to specify which models. +Similarly, if you want to temporarily allow additive changes to models configured with `on_additive_change=error`, use the `--allow-additive-model` selector. 
+ +For example, to allow additive changes to all models in the `analytics` schema: +```bash +sqlmesh plan --forward-only --allow-additive-model "analytics.*" +``` + +Or to allow additive changes to multiple specific models: +```bash +sqlmesh plan --forward-only --allow-additive-model "sales.revenue_model" --allow-additive-model "marketing.campaign_model" +``` + +Learn more about model selectors [here](../guides/model_selection.md). ### Effective date Changes that are part of the forward-only plan can also be applied retroactively to the production environment by specifying the effective date: diff --git a/docs/guides/incremental_time.md b/docs/guides/incremental_time.md index 7c773f7edc..b9ab1b37a3 100644 --- a/docs/guides/incremental_time.md +++ b/docs/guides/incremental_time.md @@ -159,19 +159,47 @@ WHERE Alternatively, all the changes contained in a *specific plan* can be classified as forward-only with a flag: `sqlmesh plan --forward-only`. A subsequent plan that did not include the forward-only flag would fully refresh the model's physical table. Learn more about forward-only plans [here](../concepts/plans.md#forward-only-plans). -### Destructive changes +### Schema changes -Some model changes destroy existing data in a table. Dropping a column from the model is the most direct cause, but changing a column's data type (such as casting a column from a `STRING` to `INTEGER`) can also require a drop. (Whether or not a specific change requires dropping a column may differ across SQL engines.) +When SQLMesh processes forward-only changes to incremental models, it compares the model's new schema with the existing physical table schema to detect potential data loss or compatibility issues. SQLMesh categorizes schema changes into two types: -Forward-only models are used to retain existing data. Before executing forward-only changes to incremental models, SQLMesh performs a check to determine if existing data will be destroyed. 
+#### Destructive changes
+Some model changes destroy existing data in a table. Examples include:
-The check is performed at plan time based on the model definition. SQLMesh may not be able to resolve all of a model's column data types and complete the check, so the check is performed again at run time based on the physical tables underlying the model.
+- **Dropping a column** from the model
+- **Renaming a column** without proper aliasing
+- **Modifying column constraints** in ways that could cause data loss
+
+Whether a specific change is destructive may differ across SQL engines based on their schema evolution capabilities.
+
+#### Additive changes
+Additive changes are any changes that aren't categorized as destructive. A simple example would be adding a column to a table.
+
+SQLMesh performs schema change detection at plan time based on the model definition. If SQLMesh cannot resolve all of a model's column data types at plan time, the check is performed again at run time based on the physical tables underlying the model.

#### Changes to forward-only models

-A model's `on_destructive_change` [configuration setting](../reference/model_configuration.md#incremental-models) determines what happens when SQLMesh detects a destructive change.
+SQLMesh provides two configuration settings to control how schema changes are handled:
+
+- **`on_destructive_change`** - Controls behavior for destructive schema changes
+- **`on_additive_change`** - Controls behavior for additive schema changes
+
+##### Configuration options
+
+Both properties support four values:
+
+- **`error`** (default for `on_destructive_change`): Stop execution and raise an error
+- **`warn`**: Log a warning but proceed with the change
+- **`allow`** (default for `on_additive_change`): Silently proceed with the change
+- **`ignore`**: Skip the schema change check entirely for this change type
+
+!!! warning "Ignore is Dangerous"

-By default, SQLMesh will error so no data is lost. 
You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes. +`ignore` is dangerous since it can result in error or data loss. It likely should never be used but could be useful as an "escape-hatch" or a way to workaround unexpected behavior. + +##### Destructive change handling + +The `on_destructive_change` [configuration setting](../reference/model_configuration.md#incremental-models) determines what happens when SQLMesh detects a destructive change. By default, SQLMesh will error so no data is lost. This example configures a model to silently `allow` destructive changes: @@ -186,12 +214,97 @@ MODEL ( ); ``` -A default `on_destructive_change` value can be set for all incremental models that do not specify it themselves in the [model defaults configuration](../reference/model_configuration.md#model-defaults). +##### Additive change handling + +The `on_additive_change` configuration setting determines what happens when SQLMesh detects an additive change like adding new columns. By default, SQLMesh allows these changes since they don't destroy existing data. + +This example configures a model to raise an error for additive changes (useful for strict schema control): + +``` sql linenums="1" +MODEL ( + name sqlmesh_example.new_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column model_time_column, + forward_only true, + on_additive_change error + ), +); +``` + +##### Combining both settings + +You can configure both settings together to have fine-grained control over schema evolution: + +``` sql linenums="1" +MODEL ( + name sqlmesh_example.new_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column model_time_column, + forward_only true, + on_destructive_change warn, -- Warn but allow destructive changes + on_additive_change allow -- Silently allow new columns + ), +); +``` + +!!! 
warning "Unusual combinations" + + SQLMesh will warn about unusual combinations such as allowing destructive changes while erroring on additive changes, since this indicates a potentially inconsistent schema change policy. + +##### Model defaults + +Default values for both `on_destructive_change` and `on_additive_change` can be set for all incremental models in the [model defaults configuration](../reference/model_configuration.md#model-defaults). + +##### Common use cases + +Here are some common patterns for configuring schema change handling: + +**Strict schema control** - Prevent any schema changes: +```sql linenums="1" +MODEL ( + name sqlmesh_example.strict_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date, + forward_only true, + on_destructive_change error, -- Block destructive changes + on_additive_change error -- Block even new columns + ), +); +``` + +**Permissive development model** - Allow all schema changes: +```sql linenums="1" +MODEL ( + name sqlmesh_example.dev_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date, + forward_only true, + on_destructive_change allow, -- Allow dropping columns + on_additive_change allow -- Allow new columns + ), +); +``` + +**Production safety** - Allow safe changes, warn about risky ones: +```sql linenums="1" +MODEL ( + name sqlmesh_example.production_model, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date, + forward_only true, + on_destructive_change warn, -- Warn about destructive changes + on_additive_change allow -- Silently allow new columns + ), +); +``` #### Changes in forward-only plans -The SQLMesh `plan` [`--forward-only` option](../concepts/plans.md#forward-only-plans) treats all the plan's model changes as forward-only. When this option is specified, SQLMesh will check all modified incremental models for destructive schema changes, not just models configured with `forward_only true`. 
+The SQLMesh `plan` [`--forward-only` option](../concepts/plans.md#forward-only-plans) treats all the plan's model changes as forward-only. When this option is specified, SQLMesh will check all modified incremental models for both destructive and additive schema changes, not just models configured with `forward_only true`. + +SQLMesh determines what to do for each model based on this setting hierarchy: -SQLMesh determines what to do for each model based on this setting hierarchy: the model's `on_destructive_change` value (if present), the `on_destructive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `error`. +- **For destructive changes**: the model's `on_destructive_change` value (if present), the `on_destructive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `error` +- **For additive changes**: the model's `on_additive_change` value (if present), the `on_additive_change` [model defaults](../reference/model_configuration.md#model-defaults) value (if present), and the SQLMesh global default of `allow` -If you want to temporarily allow destructive changes to models that don't allow them, use the `plan` command's [`--allow-destructive-model` selector](../concepts/plans.md#destructive-changes) to specify which models. Learn more about model selectors [here](../guides/model_selection.md). +If you want to temporarily allow destructive changes to models that don't allow them, use the `plan` command's [`--allow-destructive-model` selector](../concepts/plans.md#destructive-changes) to specify which models. Similarly, if you want to temporarily allow additive changes to models configured with `on_additive_change=error`, use the [`--allow-additive-model` selector](../concepts/plans.md#destructive-changes). Learn more about model selectors [here](../guides/model_selection.md). 
diff --git a/docs/guides/model_selection.md b/docs/guides/model_selection.md index db098a1538..9cc0a4358a 100644 --- a/docs/guides/model_selection.md +++ b/docs/guides/model_selection.md @@ -2,7 +2,7 @@ This guide describes how to select specific models to include in a SQLMesh plan, which can be useful when modifying a subset of the models in a SQLMesh project. -Note: the selector syntax described below is also used for the SQLMesh `plan` [`--allow-destructive-model` selector](../concepts/plans.md#destructive-changes) and for the `table_diff` command to [diff a selection of models](./tablediff.md#diffing-multiple-models-across-environments). +Note: the selector syntax described below is also used for the SQLMesh `plan` [`--allow-destructive-model` and `--allow-additive-model` selectors](../concepts/plans.md#destructive-changes) and for the `table_diff` command to [diff a selection of models](./tablediff.md#diffing-multiple-models-across-environments). ## Background diff --git a/docs/integrations/dbt.md b/docs/integrations/dbt.md index e46e2fef39..b9a24fbce8 100644 --- a/docs/integrations/dbt.md +++ b/docs/integrations/dbt.md @@ -211,18 +211,18 @@ Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) p #### on_schema_change -SQLMesh automatically detects destructive schema changes to [forward-only incremental models](../guides/incremental_time.md#forward-only-models) and to all incremental models in [forward-only plans](../concepts/plans.md#destructive-changes). +SQLMesh automatically detects both destructive and additive schema changes to [forward-only incremental models](../guides/incremental_time.md#forward-only-models) and to all incremental models in [forward-only plans](../concepts/plans.md#destructive-changes). -A model's [`on_destructive_change` setting](../guides/incremental_time.md#destructive-changes) determines whether it errors (default), warns, or silently allows the changes. 
SQLMesh always allows non-destructive forward-only schema changes, such as adding or casting a column in place. +A model's [`on_destructive_change` and `on_additive_change` settings](../guides/incremental_time.md#schema-changes) determine whether it errors, warns, silently allows, or ignores the changes. SQLMesh provides fine-grained control over both destructive changes (like dropping columns) and additive changes (like adding new columns). -`on_schema_change` configuration values are mapped to these SQLMesh `on_destructive_change` values: +`on_schema_change` configuration values are mapped to these SQLMesh settings: -| `on_schema_change` | SQLMesh `on_destructive_change` | -| ------------------ | ------------------------------- | -| ignore | warn | -| append_new_columns | warn | -| sync_all_columns | allow | -| fail | error | +| `on_schema_change` | SQLMesh `on_destructive_change` | SQLMesh `on_additive_change` | +|--------------------|---------------------------------|------------------------------| +| ignore | ignore | ignore | +| fail | error | error | +| append_new_columns | ignore | allow | +| sync_all_columns | allow | allow | ## Snapshot support diff --git a/docs/reference/cli.md b/docs/reference/cli.md index b6877962ab..a9ce9366e1 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -367,6 +367,8 @@ Options: --forward-only Create a plan for forward-only changes. --allow-destructive-model TEXT Allow destructive forward-only changes to models whose names match the expression. + --allow-additive-model TEXT Allow additive forward-only changes to + models whose names match the expression. --effective-from TEXT The effective date from which to apply forward-only changes on production. 
--no-prompts Disable interactive prompts for the backfill diff --git a/docs/reference/model_configuration.md b/docs/reference/model_configuration.md index 6ea3dd68b6..a5a96ebbf9 100644 --- a/docs/reference/model_configuration.md +++ b/docs/reference/model_configuration.md @@ -186,6 +186,7 @@ The SQLMesh project-level `model_defaults` key supports the following options, d - virtual_properties - session_properties (on per key basis) - on_destructive_change (described [below](#incremental-models)) +- on_additive_change (described [below](#incremental-models)) - audits (described [here](../concepts/audits.md#generic-audits)) - optimize_query - allow_partials @@ -231,11 +232,12 @@ Python model kind `name` enum value: [ModelKindName.FULL](https://sqlmesh.readth Configuration options for all incremental models (in addition to [general model properties](#general-model-properties)). -| Option | Description | Type | Required | -|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:----:|:--------:| -| `forward_only` | Whether the model's changes should always be classified as [forward-only](../concepts/plans.md#forward-only-change). (Default: `False`) | bool | N | -| `on_destructive_change` | What should happen when a change to a [forward-only model](../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../concepts/plans.md#forward-only-plans) causes a destructive modification to the model schema. Valid values: `allow`, `warn`, `error`. (Default: `error`) | str | N | -| `disable_restatement` | Whether [restatements](../concepts/plans.md#restatement-plans) should be disabled for the model. 
(Default: `False`) | bool | N | +| Option | Description | Type | Required | +|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:----:|:--------:| +| `forward_only` | Whether the model's changes should always be classified as [forward-only](../concepts/plans.md#forward-only-change). (Default: `False`) | bool | N | +| `on_destructive_change` | What should happen when a change to a [forward-only model](../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../concepts/plans.md#forward-only-plans) causes a destructive modification to the model schema. Valid values: `allow`, `warn`, `error`, `ignore`. (Default: `error`) | str | N | +| `on_additive_change` | What should happen when a change to a [forward-only model](../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../concepts/plans.md#forward-only-plans) causes an additive modification to the model schema (like adding new columns). Valid values: `allow`, `warn`, `error`, `ignore`. (Default: `allow`) | str | N | +| `disable_restatement` | Whether [restatements](../concepts/plans.md#restatement-plans) should be disabled for the model. 
(Default: `False`) | bool | N | #### Incremental by time range diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index 8982efc9f8..d1953c0cbc 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -450,6 +450,12 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None: multiple=True, help="Allow destructive forward-only changes to models whose names match the expression.", ) +@click.option( + "--allow-additive-model", + type=str, + multiple=True, + help="Allow additive forward-only changes to models whose names match the expression.", +) @click.option( "--effective-from", type=str, @@ -542,6 +548,7 @@ def plan( restate_models = kwargs.pop("restate_model") or None select_models = kwargs.pop("select_model") or None allow_destructive_models = kwargs.pop("allow_destructive_model") or None + allow_additive_models = kwargs.pop("allow_additive_model") or None backfill_models = kwargs.pop("backfill_model") or None setattr(get_console(), "verbosity", Verbosity(verbose)) @@ -550,6 +557,7 @@ def plan( restate_models=restate_models, select_models=select_models, allow_destructive_models=allow_destructive_models, + allow_additive_models=allow_additive_models, backfill_models=backfill_models, **kwargs, ) diff --git a/sqlmesh/core/config/model.py b/sqlmesh/core/config/model.py index 3a6266928a..294b437447 100644 --- a/sqlmesh/core/config/model.py +++ b/sqlmesh/core/config/model.py @@ -8,8 +8,10 @@ from sqlmesh.core.model.kind import ( ModelKind, OnDestructiveChange, + OnAdditiveChange, model_kind_validator, on_destructive_change_validator, + on_additive_change_validator, ) from sqlmesh.core.model.meta import FunctionCall from sqlmesh.core.node import IntervalUnit @@ -34,6 +36,7 @@ class ModelDefaultsConfig(BaseConfig): storage_format: The storage format used to store the physical table, only applicable in certain engines. (eg. 'parquet', 'orc') on_destructive_change: What should happen when a forward-only model requires a destructive schema change. 
+ on_additive_change: What should happen when a forward-only model requires an additive schema change. physical_properties: A key-value mapping of arbitrary properties that are applied to the model table / view in the physical layer. virtual_properties: A key-value mapping of arbitrary properties that are applied to the model view in the virtual layer. session_properties: A key-value mapping of properties specific to the target engine that are applied to the engine session. @@ -56,6 +59,7 @@ class ModelDefaultsConfig(BaseConfig): table_format: t.Optional[str] = None storage_format: t.Optional[str] = None on_destructive_change: t.Optional[OnDestructiveChange] = None + on_additive_change: t.Optional[OnAdditiveChange] = None physical_properties: t.Optional[t.Dict[str, t.Any]] = None virtual_properties: t.Optional[t.Dict[str, t.Any]] = None session_properties: t.Optional[t.Dict[str, t.Any]] = None @@ -71,6 +75,7 @@ class ModelDefaultsConfig(BaseConfig): _model_kind_validator = model_kind_validator _on_destructive_change_validator = on_destructive_change_validator + _on_additive_change_validator = on_additive_change_validator @field_validator("audits", mode="before") def _audits_validator(cls, v: t.Any) -> t.Any: diff --git a/sqlmesh/core/console.py b/sqlmesh/core/console.py index cf87fd7443..76ff856607 100644 --- a/sqlmesh/core/console.py +++ b/sqlmesh/core/console.py @@ -47,6 +47,7 @@ PythonModelEvalError, NodeAuditsErrors, format_destructive_change_msg, + format_additive_change_msg, ) from sqlmesh.utils.rich import strip_ansi_codes @@ -334,6 +335,16 @@ def log_destructive_change( ) -> None: """Display a destructive change error or warning to the user.""" + @abc.abstractmethod + def log_additive_change( + self, + snapshot_name: str, + additive_changes: t.List[exp.Alter], + dialect: str, + error: bool = True, + ) -> None: + """Display an additive change error or warning to the user.""" + class UnitTestConsole(abc.ABC): @abc.abstractmethod @@ -764,6 +775,15 @@ def 
log_destructive_change( ) -> None: pass + def log_additive_change( + self, + snapshot_name: str, + additive_changes: t.List[exp.Alter], + dialect: str, + error: bool = True, + ) -> None: + pass + def log_error(self, message: str) -> None: pass @@ -2217,6 +2237,20 @@ def log_destructive_change( ) ) + def log_additive_change( + self, + snapshot_name: str, + additive_changes: t.List[exp.Alter], + dialect: str, + error: bool = True, + ) -> None: + if error: + self._print(format_additive_change_msg(snapshot_name, additive_changes, dialect)) + else: + self.log_warning( + format_additive_change_msg(snapshot_name, additive_changes, dialect, error) + ) + def log_error(self, message: str) -> None: self._print(f"[red]{message}[/red]") diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 18df3a01fd..257fc5b768 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1276,6 +1276,7 @@ def plan( empty_backfill: t.Optional[bool] = None, forward_only: t.Optional[bool] = None, allow_destructive_models: t.Optional[t.Collection[str]] = None, + allow_additive_models: t.Optional[t.Collection[str]] = None, no_prompts: t.Optional[bool] = None, auto_apply: t.Optional[bool] = None, no_auto_categorization: t.Optional[bool] = None, @@ -1356,6 +1357,7 @@ def plan( empty_backfill=empty_backfill, forward_only=forward_only, allow_destructive_models=allow_destructive_models, + allow_additive_models=allow_additive_models, no_auto_categorization=no_auto_categorization, effective_from=effective_from, include_unmodified=include_unmodified, @@ -1406,6 +1408,7 @@ def plan_builder( empty_backfill: t.Optional[bool] = None, forward_only: t.Optional[bool] = None, allow_destructive_models: t.Optional[t.Collection[str]] = None, + allow_additive_models: t.Optional[t.Collection[str]] = None, no_auto_categorization: t.Optional[bool] = None, effective_from: t.Optional[TimeLike] = None, include_unmodified: t.Optional[bool] = None, @@ -1442,6 +1445,7 @@ def plan_builder( 
empty_backfill: Like skip_backfill, but also records processed intervals. forward_only: Whether the purpose of the plan is to make forward only changes. allow_destructive_models: Models whose forward-only changes are allowed to be destructive. + allow_additive_models: Models whose forward-only changes are allowed to be additive. no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior. @@ -1527,6 +1531,11 @@ def plan_builder( else: expanded_destructive_models = None + if allow_additive_models: + expanded_additive_models = model_selector.expand_model_selections(allow_additive_models) + else: + expanded_additive_models = None + if backfill_models: backfill_models = model_selector.expand_model_selections(backfill_models) else: @@ -1632,6 +1641,7 @@ def plan_builder( forward_only if forward_only is not None else self.config.plan.forward_only ), allow_destructive_models=expanded_destructive_models, + allow_additive_models=expanded_additive_models, environment_ttl=environment_ttl, environment_suffix_target=self.config.environment_suffix_target, environment_catalog_mapping=self.environment_catalog_mapping, diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index c297d916d5..a4e3f66de5 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -188,6 +188,7 @@ class OnDestructiveChange(str, Enum): ERROR = "ERROR" WARN = "WARN" ALLOW = "ALLOW" + IGNORE = "IGNORE" @property def is_error(self) -> bool: @@ -201,6 +202,35 @@ def is_warn(self) -> bool: def is_allow(self) -> bool: return self == OnDestructiveChange.ALLOW + @property + def is_ignore(self) -> bool: + return self == OnDestructiveChange.IGNORE + + +class OnAdditiveChange(str, Enum): + """What should happen when a forward-only model change requires an additive schema change.""" + + ERROR = "ERROR" + WARN = "WARN" + ALLOW = "ALLOW" + 
IGNORE = "IGNORE" + + @property + def is_error(self) -> bool: + return self == OnAdditiveChange.ERROR + + @property + def is_warn(self) -> bool: + return self == OnAdditiveChange.WARN + + @property + def is_allow(self) -> bool: + return self == OnAdditiveChange.ALLOW + + @property + def is_ignore(self) -> bool: + return self == OnAdditiveChange.IGNORE + def _on_destructive_change_validator( cls: t.Type, v: t.Union[OnDestructiveChange, str, exp.Identifier] @@ -217,6 +247,21 @@ def _on_destructive_change_validator( ) +def _on_additive_change_validator( + cls: t.Type, v: t.Union[OnAdditiveChange, str, exp.Identifier] +) -> t.Any: + if v and not isinstance(v, OnAdditiveChange): + return OnAdditiveChange( + v.this.upper() if isinstance(v, (exp.Identifier, exp.Literal)) else v.upper() + ) + return v + + +on_additive_change_validator = field_validator("on_additive_change", mode="before")( + _on_additive_change_validator +) + + class _ModelKind(PydanticModel, ModelKindMixin): name: ModelKindName @@ -330,15 +375,18 @@ def _kind_dialect_validator(cls: t.Type, v: t.Optional[str]) -> str: class _Incremental(_ModelKind): on_destructive_change: OnDestructiveChange = OnDestructiveChange.ERROR + on_additive_change: OnAdditiveChange = OnAdditiveChange.ALLOW auto_restatement_cron: t.Optional[SQLGlotCron] = None _on_destructive_change_validator = on_destructive_change_validator + _on_additive_change_validator = on_additive_change_validator @property def metadata_hash_values(self) -> t.List[t.Optional[str]]: return [ *super().metadata_hash_values, str(self.on_destructive_change), + str(self.on_additive_change), self.auto_restatement_cron, ] @@ -351,6 +399,7 @@ def to_expression( *_properties( { "on_destructive_change": self.on_destructive_change.value, + "on_additive_change": self.on_additive_change.value, "auto_restatement_cron": self.auto_restatement_cron, } ), @@ -1004,6 +1053,15 @@ def create_model_kind(v: t.Any, dialect: str, defaults: t.Dict[str, t.Any]) -> M ): 
props["on_destructive_change"] = defaults.get("on_destructive_change") + # only pass the on_additive_change user default to models inheriting from _Incremental + # that don't explicitly set it in the model definition + if ( + issubclass(kind_type, _Incremental) + and props.get("on_additive_change") is None + and defaults.get("on_additive_change") is not None + ): + props["on_additive_change"] = defaults.get("on_additive_change") + if kind_type == CustomKind: # load the custom materialization class and check if it uses a custom kind type from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type diff --git a/sqlmesh/core/model/meta.py b/sqlmesh/core/model/meta.py index 585bb15a6c..3c365c086a 100644 --- a/sqlmesh/core/model/meta.py +++ b/sqlmesh/core/model/meta.py @@ -24,11 +24,13 @@ IncrementalByUniqueKeyKind, ModelKind, OnDestructiveChange, + OnAdditiveChange, SCDType2ByColumnKind, SCDType2ByTimeKind, TimeColumn, ViewKind, _IncrementalBy, + _Incremental, model_kind_validator, ) from sqlmesh.core.node import _Node, str_or_exp_to_str @@ -418,6 +420,31 @@ def _root_validator(self) -> Self: f"Model {self.name} has `storage_format` set to a table format '{storage_format}' which is deprecated. Please use the `table_format` property instead." ) + # Warn about unusual combinations of on_destructive_change and on_additive_change + if isinstance(kind, _Incremental): + destructive = kind.on_destructive_change + additive = kind.on_additive_change + + # Unusual combination: ALLOW destructive but ERROR on additive + # (very restrictive on additions, permissive on removals) + if destructive.is_allow and additive.is_error: + from sqlmesh.core.console import get_console + + get_console().log_warning( + f"Model {self.name} has unusual combination: on_destructive_change=ALLOW but on_additive_change=ERROR. " + "This allows destructive changes (like dropping columns) but blocks additive changes (like adding columns)." 
+ ) + + # Unusual combination: ERROR on destructive but IGNORE on additive + # (very permissive on additions, strict on removals - this is actually reasonable) + elif destructive.is_error and additive.is_ignore: + from sqlmesh.core.console import get_console + + get_console().log_warning( + f"Model {self.name} has on_destructive_change=ERROR and on_additive_change=IGNORE. " + "This is an unusual combination and likely a mistake." + ) + return self @property @@ -545,6 +572,11 @@ def fqn(self) -> str: def on_destructive_change(self) -> OnDestructiveChange: return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW) + @property + def on_additive_change(self) -> OnAdditiveChange: + """Return the model's additive change setting if it has one.""" + return getattr(self.kind, "on_additive_change", OnAdditiveChange.ALLOW) + @property def ignored_rules(self) -> t.Set[str]: return self.ignored_rules_ or set() diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 6f3c7f0805..b79476c1d4 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -8,6 +8,7 @@ from datetime import datetime +from sqlglot import exp from sqlmesh.core.console import PlanBuilderConsole, get_console from sqlmesh.core.config import ( AutoCategorizationMode, @@ -26,6 +27,10 @@ get_schema_differ, has_drop_alteration, get_dropped_column_names, + has_additive_changes, + get_additive_changes, + filter_additive_changes, + filter_destructive_changes, ) from sqlmesh.core.snapshot import ( DeployabilityIndex, @@ -46,6 +51,7 @@ is_relative, ) from sqlmesh.utils.errors import NoChangesPlanError, PlanError +from sqlmesh.core.model.kind import _Incremental logger = logging.getLogger(__name__) @@ -72,6 +78,7 @@ class PlanBuilder: is_dev: Whether this plan is for development purposes. forward_only: Whether the purpose of the plan is to make forward only changes. 
allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive. + allow_additive_models: A list of fully qualified model names whose forward-only changes are allowed to be additive. environment_ttl: The period of time that a development environment should exist before being deleted. categorizer_config: Auto categorization settings. auto_categorization_enabled: Whether to apply auto categorization. @@ -106,6 +113,7 @@ def __init__( is_dev: bool = False, forward_only: bool = False, allow_destructive_models: t.Optional[t.Iterable[str]] = None, + allow_additive_models: t.Optional[t.Iterable[str]] = None, environment_ttl: t.Optional[str] = None, environment_suffix_target: EnvironmentSuffixTarget = EnvironmentSuffixTarget.default, environment_catalog_mapping: t.Optional[t.Dict[re.Pattern, str]] = None, @@ -133,6 +141,9 @@ def __init__( self._allow_destructive_models = set( allow_destructive_models if allow_destructive_models is not None else [] ) + self._allow_additive_models = set( + allow_additive_models if allow_additive_models is not None else [] + ) self._enable_preview = enable_preview self._end_bounded = end_bounded self._ensure_finalized_snapshots = ensure_finalized_snapshots @@ -280,6 +291,7 @@ def build(self) -> Plan: directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag) self._check_destructive_changes(directly_modified) + self._check_additive_changes(directly_modified) self._categorize_snapshots(dag, indirectly_modified) self._adjust_new_snapshot_intervals() @@ -323,6 +335,7 @@ def build(self) -> Plan: forward_only=self._forward_only, explain=self._explain, allow_destructive_models=t.cast(t.Set, self._allow_destructive_models), + allow_additive_models=t.cast(t.Set, self._allow_additive_models), include_unmodified=self._include_unmodified, environment_ttl=self._environment_ttl, environment_naming_info=self.environment_naming_info, @@ -524,6 +537,48 @@ def 
_adjust_new_snapshot_intervals(self) -> None: if new.is_forward_only: new.dev_intervals = new.intervals.copy() + def _get_schema_changes_for_snapshot(self, snapshot: Snapshot) -> t.Optional[t.List[exp.Alter]]: + """Get schema changes for a snapshot if available, or None if schema analysis can't be performed. + + IMPORTANT: This method applies IGNORE filtering, so returned expressions will have + ignored change types already filtered out. This ensures that when IGNORE is set, + no alter expressions are generated for the ignored change types. + """ + if snapshot.name not in self._context_diff.modified_snapshots: + return None + + new, old = self._context_diff.modified_snapshots[snapshot.name] + + # we must know all columns_to_types to determine schema changes + old_columns_to_types = old.model.columns_to_types or {} + new_columns_to_types = new.model.columns_to_types or {} + + if not ( + columns_to_types_all_known(old_columns_to_types) + and columns_to_types_all_known(new_columns_to_types) + ): + return None + + # Generate all schema changes first + alter_expressions = get_schema_differ(snapshot.model.dialect).compare_columns( + new.name, + old_columns_to_types, + new_columns_to_types, + ) + + # Apply IGNORE filtering BEFORE returning expressions + # This ensures that ignored change types never reach validation or processing + + # Filter destructive changes if on_destructive_change is IGNORE + if snapshot.model.on_destructive_change.is_ignore: + alter_expressions = filter_destructive_changes(alter_expressions) + + # Filter additive changes if on_additive_change is IGNORE + if snapshot.model.on_additive_change.is_ignore: + alter_expressions = filter_additive_changes(alter_expressions) + + return alter_expressions + def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: for s_id in sorted(directly_modified): snapshot = self._context_diff.snapshots[s_id] @@ -535,37 +590,62 @@ def _check_destructive_changes(self, directly_modified: 
t.Set[SnapshotId]) -> No if not should_raise_or_warn or not snapshot.is_model: continue - new, old = self._context_diff.modified_snapshots[snapshot.name] - - # we must know all columns_to_types to determine whether a change is destructive - old_columns_to_types = old.model.columns_to_types or {} - new_columns_to_types = new.model.columns_to_types or {} + schema_diff = self._get_schema_changes_for_snapshot(snapshot) + if schema_diff is None: + continue - if columns_to_types_all_known(old_columns_to_types) and columns_to_types_all_known( - new_columns_to_types - ): - schema_diff = get_schema_differ(snapshot.model.dialect).compare_columns( - new.name, - old_columns_to_types, - new_columns_to_types, + if has_drop_alteration(schema_diff): + # Note: IGNORE filtering happens in _get_schema_changes_for_snapshot + # so if we reach here, destructive changes are not being ignored + snapshot_name = snapshot.name + dropped_column_names = get_dropped_column_names(schema_diff) + model_dialect = snapshot.model.dialect + + self._console.log_destructive_change( + snapshot_name, + dropped_column_names, + schema_diff, + model_dialect, + error=not snapshot.model.on_destructive_change.is_warn, ) + if snapshot.model.on_destructive_change.is_error: + raise PlanError("Plan requires a destructive change to a forward-only model.") - if has_drop_alteration(schema_diff): - snapshot_name = snapshot.name - dropped_column_names = get_dropped_column_names(schema_diff) - model_dialect = snapshot.model.dialect - - self._console.log_destructive_change( - snapshot_name, - dropped_column_names, - schema_diff, - model_dialect, - error=not snapshot.model.on_destructive_change.is_warn, - ) - if snapshot.model.on_destructive_change.is_error: - raise PlanError( - "Plan requires a destructive change to a forward-only model." 
- ) + def _check_additive_changes(self, directly_modified: t.Set[SnapshotId]) -> None: + for s_id in sorted(directly_modified): + snapshot = self._context_diff.snapshots[s_id] + # should we raise/warn if this snapshot has/inherits an additive change? + should_raise_or_warn = ( + self._is_forward_only_change(s_id) or self._forward_only + ) and snapshot.needs_additive_check(self._allow_additive_models) + + if not should_raise_or_warn or not snapshot.is_model: + continue + + # Skip if model doesn't have additive change handling (non-incremental models) + # Only incremental model kinds have on_additive_change attribute + if not isinstance(snapshot.model.kind, _Incremental): + continue + + schema_diff = self._get_schema_changes_for_snapshot(snapshot) + if schema_diff is None: + continue + + if has_additive_changes(schema_diff): + # Note: IGNORE filtering happens in _get_schema_changes_for_snapshot + # so if we reach here, additive changes are not being ignored + snapshot_name = snapshot.name + additive_changes = get_additive_changes(schema_diff) + model_dialect = snapshot.model.dialect + + self._console.log_additive_change( + snapshot_name, + additive_changes, + model_dialect, + error=not snapshot.model.on_additive_change.is_warn, + ) + if snapshot.model.on_additive_change.is_error: + raise PlanError("Plan requires an additive change to a forward-only model.") def _categorize_snapshots( self, dag: DAG[SnapshotId], indirectly_modified: SnapshotMapping diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 584c0d9b51..8c0ba01dce 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -44,6 +44,7 @@ class Plan(PydanticModel, frozen=True): no_gaps: bool forward_only: bool allow_destructive_models: t.Set[str] + allow_additive_models: t.Set[str] include_unmodified: bool end_bounded: bool ensure_finalized_snapshots: bool @@ -256,6 +257,7 @@ def to_evaluatable(self) -> EvaluatablePlan: restatements={s.name: i for s, i 
in self.restatements.items()}, is_dev=self.is_dev, allow_destructive_models=self.allow_destructive_models, + allow_additive_models=self.allow_additive_models, forward_only=self.forward_only, end_bounded=self.end_bounded, ensure_finalized_snapshots=self.ensure_finalized_snapshots, @@ -297,6 +299,7 @@ class EvaluatablePlan(PydanticModel): restatements: t.Dict[str, Interval] is_dev: bool allow_destructive_models: t.Set[str] + allow_additive_models: t.Set[str] forward_only: bool end_bounded: bool ensure_finalized_snapshots: bool diff --git a/sqlmesh/core/schema_diff.py b/sqlmesh/core/schema_diff.py index 70b4f72163..c311fcaac3 100644 --- a/sqlmesh/core/schema_diff.py +++ b/sqlmesh/core/schema_diff.py @@ -168,6 +168,7 @@ class TableAlterOperation(PydanticModel): add_position: t.Optional[TableAlterColumnPosition] = None current_type: t.Optional[exp.DataType] = None cascade: bool = False + is_part_of_destructive_change: bool = False @classmethod def add( @@ -176,6 +177,7 @@ def add( column_type: t.Union[str, exp.DataType], expected_table_struct: t.Union[str, exp.DataType], position: t.Optional[TableAlterColumnPosition] = None, + is_part_of_destructive_change: bool = False, ) -> TableAlterOperation: return cls( op=TableAlterOperationType.ADD, @@ -183,6 +185,7 @@ def add( column_type=exp.DataType.build(column_type), add_position=position, expected_table_struct=exp.DataType.build(expected_table_struct), + is_part_of_destructive_change=is_part_of_destructive_change, ) @classmethod @@ -256,15 +259,17 @@ def expression( self, table_name: t.Union[str, exp.Table], array_element_selector: str ) -> exp.Alter: if self.is_alter_type: + alter_column = exp.AlterColumn( + this=self.column(array_element_selector), + dtype=self.column_type, + ) + # Add current type metadata for additive classification + if self.current_type: + alter_column.meta["current_type"] = self.current_type return exp.Alter( this=exp.to_table(table_name), kind="TABLE", - actions=[ - exp.AlterColumn( - 
this=self.column(array_element_selector), - dtype=self.column_type, - ) - ], + actions=[alter_column], ) if self.is_add: alter_table = exp.Alter(this=exp.to_table(table_name), kind="TABLE") @@ -272,6 +277,9 @@ def expression( alter_table.set("actions", [column]) if self.add_position: column.set("position", self.add_position.column_position_node) + # Set metadata if this ADD operation supports destructive changes + if self.is_part_of_destructive_change: + column.meta["sqlmesh_destructive"] = True return alter_table if self.is_drop: alter_table = exp.Alter(this=exp.to_table(table_name), kind="TABLE") @@ -530,6 +538,31 @@ def _add_operation( ) ] + def _add_destructive_operation( + self, + columns: t.List[TableAlterColumn], + new_pos: int, + new_kwarg: exp.ColumnDef, + current_struct: exp.DataType, + root_struct: exp.DataType, + ) -> t.List[TableAlterOperation]: + """Add operation marked as destructive support (for DROP+ADD scenarios).""" + if self.support_positional_add: + col_pos = TableAlterColumnPosition.create(new_pos, current_struct.expressions) + current_struct.expressions.insert(new_pos, new_kwarg) + else: + col_pos = None + current_struct.expressions.append(new_kwarg) + return [ + TableAlterOperation.add( + columns, + new_kwarg.args["kind"], + root_struct.copy(), + col_pos, + is_part_of_destructive_change=True, + ) + ] + def _resolve_add_operations( self, parent_columns: t.List[TableAlterColumn], @@ -607,10 +640,14 @@ def _alter_operation( col_pos, ) ] - return self._drop_operation(columns, root_struct, pos, root_struct) + self._add_operation( + # This is a destructive change requiring DROP + ADD + drop_operations = self._drop_operation(columns, root_struct, pos, root_struct) + add_operations = self._add_destructive_operation( columns, pos, new_kwarg, struct, root_struct ) + return drop_operations + add_operations + def _resolve_alter_operations( self, parent_columns: t.List[TableAlterColumn], @@ -745,5 +782,75 @@ def get_schema_differ(dialect: str) -> 
SchemaDiffer: return getattr(engine_adapter_class, "SCHEMA_DIFFER", SchemaDiffer()) +def has_additive_changes(alter_expressions: t.List[exp.Alter]) -> bool: + return len(get_additive_changes(alter_expressions)) > 0 + + +def _is_destructive_action(action: exp.Expression) -> bool: + if isinstance(action, exp.Drop): + # Dropping columns/constraints is destructive + return True + if isinstance(action, exp.AlterColumn): + # Altering column types is treated as additive (non-destructive) + return False + if isinstance(action, exp.ColumnDef): + # Adding columns is additive by default, but can be marked as destructive + # (e.g., when part of a DROP+ADD sequence for incompatible type changes) + return action.meta.get("sqlmesh_destructive", False) + # Conservative default: unknown action types are considered destructive + # This ensures safety - unknown actions won't be accidentally treated as safe + return True + + +def _is_additive_action(action: exp.Expression) -> bool: + return not _is_destructive_action(action) + + +def get_additive_changes(alter_expressions: t.List[exp.Alter]) -> t.List[exp.Alter]: + additive_changes = [] + + for alter in alter_expressions: + actions = alter.args.get("actions", []) + if actions and all(_is_additive_action(action) for action in actions): + additive_changes.append(alter) + + return additive_changes + + +def filter_additive_changes(alter_expressions: t.List[exp.Alter]) -> t.List[exp.Alter]: + filtered_expressions = [] + + for alter in alter_expressions: + actions = alter.args.get("actions", []) + non_additive_actions = [action for action in actions if not _is_additive_action(action)] + + # Only include the ALTER expression if it has non-additive actions + if non_additive_actions: + filtered_alter = alter.copy() + filtered_alter.set("actions", non_additive_actions) + filtered_expressions.append(filtered_alter) + + return filtered_expressions + + +def filter_destructive_changes(alter_expressions: t.List[exp.Alter]) -> t.List[exp.Alter]: + 
filtered_expressions = [] + + for alter in alter_expressions: + actions = alter.args.get("actions", []) + # Use the single source of truth for destructive classification + non_destructive_actions = [ + action for action in actions if not _is_destructive_action(action) + ] + + # Only include the ALTER expression if it has non-destructive actions + if non_destructive_actions: + filtered_alter = alter.copy() + filtered_alter.set("actions", non_destructive_actions) + filtered_expressions.append(filtered_alter) + + return filtered_expressions + + def _get_name_and_type(struct: exp.ColumnDef) -> t.Tuple[exp.Identifier, exp.DataType]: return struct.this, struct.args["kind"] diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 266a974821..94c6abb60c 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1117,6 +1117,16 @@ def needs_destructive_check( and self.name not in allow_destructive_snapshots ) + def needs_additive_check( + self, + allow_additive_snapshots: t.Set[str], + ) -> bool: + return ( + self.is_model + and not self.model.on_additive_change.is_allow + and self.name not in allow_additive_snapshots + ) + def get_next_auto_restatement_interval(self, execution_time: TimeLike) -> t.Optional[Interval]: """Returns the next auto restatement interval for the snapshot. 
diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index bdbf76250f..c770599250 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -50,8 +50,16 @@ ViewKind, CustomKind, ) +from sqlmesh.core.model.kind import _Incremental from sqlmesh.utils import CompletionStatus -from sqlmesh.core.schema_diff import has_drop_alteration, get_dropped_column_names +from sqlmesh.core.schema_diff import ( + has_drop_alteration, + get_dropped_column_names, + has_additive_changes, + get_additive_changes, + filter_additive_changes, + filter_destructive_changes, +) from sqlmesh.core.snapshot import ( DeployabilityIndex, Intervals, @@ -71,8 +79,10 @@ from sqlmesh.utils.errors import ( ConfigError, DestructiveChangeError, + AdditiveChangeError, SQLMeshError, format_destructive_change_msg, + format_additive_change_msg, ) if sys.version_info >= (3, 12): @@ -872,10 +882,17 @@ def _create_snapshot( alter_expressions = adapter.get_alter_expressions( target_table_name, tmp_table_name ) + # Apply IGNORE filtering BEFORE validation and execution + filtered_expressions = _filter_alter_expressions_by_ignore_settings( + snapshot, alter_expressions + ) _check_destructive_schema_change( - snapshot, alter_expressions, allow_destructive_snapshots + snapshot, filtered_expressions, allow_destructive_snapshots + ) + _check_additive_schema_change( + snapshot, filtered_expressions, allow_destructive_snapshots ) - adapter.alter_table(alter_expressions) + adapter.alter_table(filtered_expressions) except Exception: adapter.drop_table(target_table_name) raise @@ -1583,10 +1600,17 @@ def migrate( ) -> None: logger.info(f"Altering table '{target_table_name}'") alter_expressions = self.adapter.get_alter_expressions(target_table_name, source_table_name) + # Apply IGNORE filtering BEFORE validation and execution + filtered_expressions = _filter_alter_expressions_by_ignore_settings( + snapshot, alter_expressions + ) 
_check_destructive_schema_change( - snapshot, alter_expressions, kwargs["allow_destructive_snapshots"] + snapshot, filtered_expressions, kwargs["allow_destructive_snapshots"] + ) + _check_additive_schema_change( + snapshot, filtered_expressions, kwargs["allow_destructive_snapshots"] ) - self.adapter.alter_table(alter_expressions) + self.adapter.alter_table(filtered_expressions) def delete(self, name: str, **kwargs: t.Any) -> None: _check_table_db_is_physical_schema(name, kwargs["physical_schema"]) @@ -2261,6 +2285,36 @@ def delete(self, name: str, **kwargs: t.Any) -> None: logger.info("Dropped dev preview for managed table '%s'", name) +def _filter_alter_expressions_by_ignore_settings( + snapshot: Snapshot, alter_expressions: t.List[exp.Alter] +) -> t.List[exp.Alter]: + """ + Filter alter expressions based on the snapshot's ignore settings. + + When IGNORE is set for a change type, this function removes those + change types from the alter expressions, ensuring they never reach + validation or execution. 
+ + Args: + snapshot: The snapshot with ignore settings + alter_expressions: The original alter expressions + + Returns: + Filtered alter expressions with ignored change types removed + """ + filtered_expressions = alter_expressions + + # Filter destructive changes if on_destructive_change is IGNORE + if snapshot.model.on_destructive_change.is_ignore: + filtered_expressions = filter_destructive_changes(filtered_expressions) + + # Filter additive changes if on_additive_change is IGNORE + if snapshot.model.on_additive_change.is_ignore: + filtered_expressions = filter_additive_changes(filtered_expressions) + + return filtered_expressions + + def _intervals(snapshot: Snapshot, deployability_index: DeployabilityIndex) -> Intervals: return ( snapshot.intervals @@ -2277,6 +2331,8 @@ def _check_destructive_schema_change( if snapshot.needs_destructive_check(allow_destructive_snapshots) and has_drop_alteration( alter_expressions ): + # Note: IGNORE filtering is applied before this function is called + # so if we reach here, destructive changes are not being ignored snapshot_name = snapshot.name dropped_column_names = get_dropped_column_names(alter_expressions) model_dialect = snapshot.model.dialect @@ -2299,6 +2355,40 @@ def _check_destructive_schema_change( ) +def _check_additive_schema_change( + snapshot: Snapshot, + alter_expressions: t.List[exp.Alter], + allow_destructive_snapshots: t.Set[str], +) -> None: + # Only check additive changes for incremental models that have the on_additive_change property + if not isinstance(snapshot.model.kind, _Incremental): + return + + if snapshot.needs_destructive_check(allow_destructive_snapshots) and has_additive_changes( + alter_expressions + ): + # Note: IGNORE filtering is applied before this function is called + # so if we reach here, additive changes are not being ignored + snapshot_name = snapshot.name + additive_changes = get_additive_changes(alter_expressions) + model_dialect = snapshot.model.dialect + + if 
snapshot.model.on_additive_change.is_warn: + logger.warning( + format_additive_change_msg( + snapshot_name, + additive_changes, + model_dialect, + error=False, + ) + ) + return + if snapshot.model.on_additive_change.is_error: + raise AdditiveChangeError( + format_additive_change_msg(snapshot_name, additive_changes, model_dialect) + ) + + def _check_table_db_is_physical_schema(table_name: str, physical_schema: str) -> None: table = exp.to_table(table_name) if table.db != physical_schema: diff --git a/sqlmesh/dbt/model.py b/sqlmesh/dbt/model.py index 4cbca09aee..e105bc464a 100644 --- a/sqlmesh/dbt/model.py +++ b/sqlmesh/dbt/model.py @@ -22,7 +22,7 @@ ManagedKind, create_sql_model, ) -from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange +from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange, OnAdditiveChange from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy from sqlmesh.dbt.common import SqlStr, extract_jinja_config, sql_str_validator from sqlmesh.utils.errors import ConfigError @@ -90,7 +90,7 @@ class ModelConfig(BaseModelConfig): unique_key: t.Optional[t.List[str]] = None partition_by: t.Optional[t.Union[t.List[str], t.Dict[str, t.Any]]] = None full_refresh: t.Optional[bool] = None - on_schema_change: t.Optional[str] = None + on_schema_change: str = "ignore" # Snapshot (SCD Type 2) Fields updated_at: t.Optional[str] = None @@ -226,16 +226,32 @@ def model_kind(self, context: DbtContext) -> ModelKind: # args common to all sqlmesh incremental kinds, regardless of materialization incremental_kind_kwargs: t.Dict[str, t.Any] = {} - if self.on_schema_change: - on_schema_change = self.on_schema_change.lower() + on_schema_change = self.on_schema_change.lower() - on_destructive_change = OnDestructiveChange.WARN - if on_schema_change == "sync_all_columns": - on_destructive_change = OnDestructiveChange.ALLOW - elif on_schema_change in ("fail", "append_new_columns", "ignore"): - on_destructive_change 
= OnDestructiveChange.ERROR + if materialization == Materialization.SNAPSHOT: + # dbt snapshots default to `append_new_columns` behavior and can't be changed + on_schema_change = "append_new_columns" + + if on_schema_change == "ignore": + on_destructive_change = OnDestructiveChange.IGNORE + on_additive_change = OnAdditiveChange.IGNORE + elif on_schema_change == "fail": + on_destructive_change = OnDestructiveChange.ERROR + on_additive_change = OnAdditiveChange.ERROR + elif on_schema_change == "append_new_columns": + on_destructive_change = OnDestructiveChange.IGNORE + on_additive_change = OnAdditiveChange.ALLOW + elif on_schema_change == "sync_all_columns": + on_destructive_change = OnDestructiveChange.ALLOW + on_additive_change = OnAdditiveChange.ALLOW + else: + raise ConfigError( + f"{self.canonical_name(context)}: Invalid on_schema_change value '{on_schema_change}'. " + "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'." + ) - incremental_kind_kwargs["on_destructive_change"] = on_destructive_change + incremental_kind_kwargs["on_destructive_change"] = on_destructive_change + incremental_kind_kwargs["on_additive_change"] = on_additive_change for field in ("forward_only", "auto_restatement_cron"): field_val = getattr(self, field, None) diff --git a/sqlmesh/migrations/v0088_add_on_additive_change_to_incremental_metadata_hash.py b/sqlmesh/migrations/v0088_add_on_additive_change_to_incremental_metadata_hash.py new file mode 100644 index 0000000000..56059b982f --- /dev/null +++ b/sqlmesh/migrations/v0088_add_on_additive_change_to_incremental_metadata_hash.py @@ -0,0 +1,5 @@ +"""Add on_additive_change to incremental model metadata hash.""" + + +def migrate(state_sync, **kwargs): # type: ignore + pass diff --git a/sqlmesh/utils/errors.py b/sqlmesh/utils/errors.py index 9974fdce0a..1e74185d90 100644 --- a/sqlmesh/utils/errors.py +++ b/sqlmesh/utils/errors.py @@ -129,6 +129,10 @@ class DestructiveChangeError(SQLMeshError): pass +class 
AdditiveChangeError(SQLMeshError): + pass + + class NotificationTargetError(SQLMeshError): pass @@ -210,27 +214,94 @@ def raise_for_status(response: Response) -> None: raise ApiServerError(response.text, response.status_code) -def format_destructive_change_msg( +def _format_schema_change_msg( snapshot_name: str, - dropped_column_names: t.List[str], + is_destructive: bool, + column_names: t.List[str], alter_expressions: t.List[exp.Alter], dialect: str, error: bool = True, ) -> str: - dropped_column_str = "', '".join(dropped_column_names) - dropped_column_msg = ( - f" that drops column{'s' if dropped_column_names and len(dropped_column_names) > 1 else ''} '{dropped_column_str}'" - if dropped_column_str + """ + Common function to format schema change messages. + + Args: + snapshot_name: Name of the model/snapshot + is_destructive: if change is destructive else it would be additive + column_names: List of affected column names + alter_expressions: List of ALTER expressions + dialect: SQL dialect for formatting + error: Whether this is an error or warning + """ + change_type = "destructive" if is_destructive else "additive" + setting_name = "on_destructive_change" if is_destructive else "on_additive_change" + action_verb = "drops" if is_destructive else "adds" + cli_flag = "--allow-destructive-model" if is_destructive else "--allow-additive-model" + + column_str = "', '".join(column_names) + column_msg = ( + f" that {action_verb} column{'s' if column_names and len(column_names) > 1 else ''} '{column_str}'" + if column_str else "" ) + # Format ALTER expressions alter_expr_msg = "\n\nSchema changes:\n " + "\n ".join( [alter.sql(dialect) for alter in alter_expressions] ) + # Main warning message warning_msg = ( - f"Plan requires a destructive change to forward-only model '{snapshot_name}'s schema" + f"Plan requires {change_type} change to forward-only model '{snapshot_name}'s schema" ) - err_msg = "\n\nTo allow the destructive change, set the model's 
`on_destructive_change` setting to `warn` or `allow` or include the model in the plan's `--allow-destructive-model` option.\n" - return f"\n{warning_msg}{dropped_column_msg}.{alter_expr_msg}{err_msg if error else ''}" + if error: + permissive_values = "`warn`, `allow`, or `ignore`" + cli_part = f" or include the model in the plan's `{cli_flag}` option" + err_msg = f"\n\nTo allow the {change_type} change, set the model's `{setting_name}` setting to {permissive_values}{cli_part}.\n" + else: + err_msg = "" + + return f"\n{warning_msg}{column_msg}.{alter_expr_msg}{err_msg}" + + +def format_destructive_change_msg( + snapshot_name: str, + dropped_column_names: t.List[str], + alter_expressions: t.List[exp.Alter], + dialect: str, + error: bool = True, +) -> str: + return _format_schema_change_msg( + snapshot_name=snapshot_name, + is_destructive=True, + column_names=dropped_column_names, + alter_expressions=alter_expressions, + dialect=dialect, + error=error, + ) + + +def format_additive_change_msg( + snapshot_name: str, + additive_changes: t.List[exp.Alter], + dialect: str, + error: bool = True, +) -> str: + # Extract column names being added from the alter expressions + added_column_names = [] + for alter in additive_changes: + actions = alter.args.get("actions", []) + for action in actions: + if isinstance(action, exp.ColumnDef): + # Adding columns + added_column_names.append(action.alias_or_name) + + return _format_schema_change_msg( + snapshot_name=snapshot_name, + is_destructive=False, + column_names=added_column_names, + alter_expressions=additive_changes, + dialect=dialect, + error=error, + ) diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 3cadbae9ca..4e05ba584e 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -1814,7 +1814,8 @@ def test_render_definition(): partition_by_time_column TRUE, forward_only FALSE, disable_restatement FALSE, - on_destructive_change 'ERROR' + on_destructive_change 'ERROR', + on_additive_change 
'ALLOW' ), storage_format iceberg, partitioned_by `a`, @@ -5521,7 +5522,8 @@ def test_when_matched(): batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, - on_destructive_change 'ERROR' + on_destructive_change 'ERROR', + on_additive_change 'ALLOW' ) ); @@ -5573,7 +5575,8 @@ def fingerprint_merge( batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, - on_destructive_change 'ERROR' + on_destructive_change 'ERROR', + on_additive_change 'ALLOW' ) ); @@ -7462,6 +7465,45 @@ def test_forward_only_on_destructive_change_config() -> None: assert context_model.on_destructive_change.is_allow +def test_model_meta_on_additive_change_property() -> None: + """Test that ModelMeta has on_additive_change property that works like on_destructive_change.""" + from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, OnAdditiveChange + from sqlmesh.core.model.meta import ModelMeta + + # Test incremental model with on_additive_change=ERROR + incremental_kind = IncrementalByTimeRangeKind( + time_column="c", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ) + model_meta = ModelMeta(name="test_model", kind=incremental_kind) + assert model_meta.on_additive_change == OnAdditiveChange.ERROR + + # Test incremental model with on_additive_change=WARN + incremental_kind = IncrementalByTimeRangeKind( + time_column="c", + forward_only=True, + on_additive_change=OnAdditiveChange.WARN, + ) + model_meta = ModelMeta(name="test_model", kind=incremental_kind) + assert model_meta.on_additive_change == OnAdditiveChange.WARN + + # Test incremental model with default on_additive_change (should be ALLOW) + incremental_kind = IncrementalByTimeRangeKind( + time_column="c", + forward_only=True, + ) + model_meta = ModelMeta(name="test_model", kind=incremental_kind) + assert model_meta.on_additive_change == OnAdditiveChange.ALLOW + + # Test non-incremental model (should return None) + from sqlmesh.core.model.kind import FullKind + + full_kind = FullKind() + 
model_meta = ModelMeta(name="test_model", kind=full_kind) + assert model_meta.on_additive_change == OnAdditiveChange.ALLOW + + def test_incremental_by_partition(sushi_context, assert_exp_eq): expressions = d.parse( """ @@ -7780,7 +7822,8 @@ def test_model_kind_to_expression(): partition_by_time_column TRUE, forward_only FALSE, disable_restatement FALSE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7814,7 +7857,8 @@ def test_model_kind_to_expression(): lookback 3, forward_only TRUE, disable_restatement TRUE, -on_destructive_change 'WARN' +on_destructive_change 'WARN', +on_additive_change 'ALLOW' )""" ) @@ -7839,7 +7883,8 @@ def test_model_kind_to_expression(): batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7866,7 +7911,8 @@ def test_model_kind_to_expression(): batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7894,7 +7940,8 @@ def test_model_kind_to_expression(): batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7916,7 +7963,8 @@ def test_model_kind_to_expression(): == """INCREMENTAL_BY_PARTITION ( forward_only TRUE, disable_restatement FALSE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7968,7 +8016,8 @@ def test_model_kind_to_expression(): time_data_type TIMESTAMP, forward_only TRUE, disable_restatement TRUE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -7999,7 +8048,8 @@ def test_model_kind_to_expression(): time_data_type TIMESTAMP, forward_only TRUE, disable_restatement TRUE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', 
+on_additive_change 'ALLOW' )""" ) @@ -8030,7 +8080,8 @@ def test_model_kind_to_expression(): time_data_type TIMESTAMP, forward_only TRUE, disable_restatement TRUE, -on_destructive_change 'ERROR' +on_destructive_change 'ERROR', +on_additive_change 'ALLOW' )""" ) @@ -8211,7 +8262,8 @@ def test_merge_filter(): batch_concurrency 1, forward_only FALSE, disable_restatement FALSE, - on_destructive_change 'ERROR' + on_destructive_change 'ERROR', + on_additive_change 'ALLOW' ) ); @@ -8938,6 +8990,7 @@ def test_auto_restatement(): forward_only FALSE, disable_restatement FALSE, on_destructive_change 'ERROR', + on_additive_change 'ALLOW', auto_restatement_cron '@daily' )""" ) @@ -8967,6 +9020,7 @@ def test_auto_restatement(): forward_only FALSE, disable_restatement FALSE, on_destructive_change 'ERROR', + on_additive_change 'ALLOW', auto_restatement_cron '@daily' )""" ) diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index efaeba8623..3dae3c9d3b 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -24,7 +24,7 @@ SqlModel, ModelKindName, ) -from sqlmesh.core.model.kind import OnDestructiveChange +from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange from sqlmesh.core.model.seed import Seed from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals from sqlmesh.core.snapshot import ( @@ -750,6 +750,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture): no_gaps=False, forward_only=False, allow_destructive_models=set(), + allow_additive_models=set(), include_unmodified=True, environment_naming_info=EnvironmentNamingInfo(), directly_modified={snapshot_a.snapshot_id}, @@ -3312,6 +3313,645 @@ def _build_plan() -> Plan: assert to_datetime(plan.execution_time) == to_datetime(output_execution_time) +def test_plan_builder_additive_change_error_blocks_plan(make_snapshot): + """Test that additive changes block plan when on_additive_change=ERROR.""" + # Create models with actual schema differences + # Use 
explicit column schemas in CTE so columns_to_types can be determined + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + # New model with additional column (additive change) + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={old_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={ + old_snapshot.snapshot_id: old_snapshot, + new_snapshot.snapshot_id: new_snapshot, + }, + new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], 
+ ) + + builder = PlanBuilder(context_diff, forward_only=True) + + # Should raise PlanError for additive changes when on_additive_change=ERROR + with pytest.raises(PlanError, match="additive change"): + builder.build() + + +def test_plan_builder_additive_change_warn_allows_plan(make_snapshot): + """Test that additive changes allow plan with warning when on_additive_change=WARN.""" + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.WARN, + ), + ) + + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.WARN, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={old_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={ + old_snapshot.snapshot_id: old_snapshot, + new_snapshot.snapshot_id: new_snapshot, + }, + 
new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + builder = PlanBuilder(context_diff, forward_only=True) + + # Should log warning but not fail + with patch.object(builder._console, "log_additive_change") as mock_log_additive: + plan = builder.build() + assert plan is not None + mock_log_additive.assert_called() # Should have logged an additive change + + +def test_plan_builder_additive_change_allow_permits_plan(make_snapshot): + """Test that additive changes are permitted when on_additive_change=ALLOW.""" + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ALLOW, + ), + ) + + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ALLOW, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + 
is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={old_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={ + old_snapshot.snapshot_id: old_snapshot, + new_snapshot.snapshot_id: new_snapshot, + }, + new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + builder = PlanBuilder(context_diff, forward_only=True) + + # Should build plan without issues + plan = builder.build() + assert plan is not None + + +def test_plan_builder_additive_change_ignore_skips_validation(make_snapshot): + """Test that additive changes are ignored when on_additive_change=IGNORE.""" + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.IGNORE, + ), + ) + + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.IGNORE, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + 
metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={old_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={ + old_snapshot.snapshot_id: old_snapshot, + new_snapshot.snapshot_id: new_snapshot, + }, + new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + builder = PlanBuilder(context_diff, forward_only=True) + + # Should build plan without any validation + with patch("sqlmesh.core.plan.builder.logger.warning") as mock_warning: + plan = builder.build() + assert plan is not None + mock_warning.assert_not_called() # Should not log any warnings + + +def test_plan_builder_mixed_destructive_and_additive_changes(make_snapshot): + """Test scenarios with both destructive and additive changes.""" + # Test case: on_destructive_change=IGNORE, on_additive_change=ERROR + # Should ignore destructive changes but error on additive changes + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'old_value'::VARCHAR as old_col, '2022-01-01'::DATE as ds + ) + select id, name, old_col, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + 
select 1::INT as id, 'test'::VARCHAR as name, 'new_value'::VARCHAR as new_col, '2022-01-01'::DATE as ds + ) + select id, name, new_col, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={old_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={ + old_snapshot.snapshot_id: old_snapshot, + new_snapshot.snapshot_id: new_snapshot, + }, + new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + builder = PlanBuilder(context_diff, forward_only=True) + + # Should error on additive change (new_col), but ignore destructive change (old_col removal) + with pytest.raises(PlanError, match="additive change"): + builder.build() + + +def test_plan_builder_allow_additive_models_flag(make_snapshot): + """Test that --allow-additive-model flag overrides on_additive_change=ERROR.""" + old_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as 
name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + # New model with additional column (additive change) + new_model = SqlModel( + name="test_model", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + old_snapshot = make_snapshot(old_model) + new_snapshot = make_snapshot(new_model) + + # Set previous versions to simulate a modification + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={new_snapshot.name: (new_snapshot, old_snapshot)}, + snapshots={new_snapshot.snapshot_id: new_snapshot}, + new_snapshots={new_snapshot.snapshot_id: new_snapshot}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # First, verify that without the flag, the plan fails with additive change error + builder = PlanBuilder(context_diff, forward_only=True) + with pytest.raises(PlanError, match="additive change"): + builder.build() + + # Now test that the 
--allow-additive-model flag allows the plan to succeed + builder_with_flag = PlanBuilder( + context_diff, + forward_only=True, + allow_additive_models={'"test_model"'}, + ) + + # Should succeed without raising an exception + plan = builder_with_flag.build() + assert plan is not None + + +def test_plan_builder_allow_additive_models_pattern_matching(make_snapshot): + """Test that --allow-additive-model flag supports pattern matching like destructive models.""" + # Create two models with additive changes + old_model_1 = SqlModel( + name="test.model_1", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + new_model_1 = SqlModel( + name="test.model_1", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'email@test.com'::VARCHAR as email, '2022-01-01'::DATE as ds + ) + select id, name, email, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + old_model_2 = SqlModel( + name="other.model_2", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, '2022-01-01'::DATE as ds + ) + select id, name, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + new_model_2 = SqlModel( + name="other.model_2", + dialect="duckdb", + query=parse_one(""" + with source as ( + select 1::INT as id, 'test'::VARCHAR as name, 'phone'::VARCHAR as phone, '2022-01-01'::DATE as ds + ) + select id, name, phone, ds from source + """), + kind=IncrementalByTimeRangeKind( + time_column="ds", + forward_only=True, + 
on_additive_change=OnAdditiveChange.ERROR, + ), + ) + + old_snapshot_1 = make_snapshot(old_model_1) + new_snapshot_1 = make_snapshot(new_model_1) + old_snapshot_2 = make_snapshot(old_model_2) + new_snapshot_2 = make_snapshot(new_model_2) + + # Set previous versions to simulate modifications + for new_snapshot in [new_snapshot_1, new_snapshot_2]: + new_snapshot.previous_versions = ( + SnapshotDataVersion( + fingerprint=SnapshotFingerprint( + data_hash="old_data_hash", + metadata_hash="old_metadata_hash", + ), + version="old_version", + change_category=SnapshotChangeCategory.FORWARD_ONLY, + dev_table_suffix="dev", + ), + ) + + context_diff = ContextDiff( + environment="prod", + is_new_environment=False, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added=set(), + removed_snapshots={}, + modified_snapshots={ + new_snapshot_1.name: (new_snapshot_1, old_snapshot_1), + new_snapshot_2.name: (new_snapshot_2, old_snapshot_2), + }, + snapshots={ + new_snapshot_1.snapshot_id: new_snapshot_1, + new_snapshot_2.snapshot_id: new_snapshot_2, + }, + new_snapshots={ + new_snapshot_1.snapshot_id: new_snapshot_1, + new_snapshot_2.snapshot_id: new_snapshot_2, + }, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + # Test pattern matching: allow only models in "test" schema + # In real usage, patterns would be expanded by Context.expand_model_selections + # Here we simulate what the expansion would produce + builder_with_pattern = PlanBuilder( + context_diff, + forward_only=True, + allow_additive_models={'"test"."model_1"'}, # Only allow test.model_1, not other.model_2 + ) + + # Should still fail because other.model_2 is not allowed + with pytest.raises(PlanError, match="additive change"): + builder_with_pattern.build() + + # Test 
allowing both patterns + builder_with_both = PlanBuilder( + context_diff, + forward_only=True, + allow_additive_models={'"test"."model_1"', '"other"."model_2"'}, # Allow both models + ) + + # Should succeed + plan = builder_with_both.build() + assert plan is not None + + def test_environment_statements_change_allows_dev_environment_creation(make_snapshot): snapshot = make_snapshot( SqlModel( diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index d79be24262..fa092e5097 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -103,6 +103,7 @@ def test_build_plan_stages_basic( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -214,6 +215,7 @@ def test_build_plan_stages_with_before_all_and_after_all( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -322,6 +324,7 @@ def test_build_plan_stages_select_models( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -422,6 +425,7 @@ def test_build_plan_stages_basic_no_backfill( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -531,6 +535,7 @@ def test_build_plan_stages_restatement( }, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -640,6 +645,7 @@ def test_build_plan_stages_forward_only( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -768,6 +774,7 @@ def 
test_build_plan_stages_forward_only_dev( restatements={}, is_dev=True, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -891,6 +898,7 @@ def _get_snapshots(snapshot_ids: t.List[SnapshotId]) -> t.Dict[SnapshotId, Snaps restatements={}, is_dev=True, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -1014,6 +1022,7 @@ def test_build_plan_stages_forward_only_ensure_finalized_snapshots( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=True, @@ -1088,6 +1097,7 @@ def test_build_plan_stages_removed_model( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -1169,6 +1179,7 @@ def test_build_plan_stages_environment_suffix_target_changed( restatements={}, is_dev=True, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, @@ -1266,6 +1277,7 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( restatements={}, is_dev=False, allow_destructive_models=set(), + allow_additive_models=set(), forward_only=False, end_bounded=False, ensure_finalized_snapshots=False, diff --git a/tests/core/test_schema_diff.py b/tests/core/test_schema_diff.py index 1e57cab57c..a6d37d59d3 100644 --- a/tests/core/test_schema_diff.py +++ b/tests/core/test_schema_diff.py @@ -10,9 +10,30 @@ TableAlterColumnPosition, TableAlterOperation, get_schema_differ, + has_additive_changes, + filter_additive_changes, + has_drop_alteration, + filter_destructive_changes, + get_additive_changes, ) +def is_only_additive_changes(alter_expressions: t.List[exp.Alter]) -> bool: + """ + Check if all changes in the list of alter 
expressions are additive. + + Args: + alter_expressions: List of ALTER TABLE expressions + + Returns: + True if all changes are additive (or no changes), False if there are any non-additive changes + """ + if not alter_expressions: + return True # No changes at all - this should be considered "only additive" + + return len(get_additive_changes(alter_expressions)) == len(alter_expressions) + + def test_schema_diff_calculate(): alter_expressions = SchemaDiffer( **{ @@ -521,6 +542,7 @@ def test_schema_diff_calculate_type_transitions(): ], expected_table_struct="STRUCT>", column_type="STRUCT", + is_part_of_destructive_change=True, ), ], dict( @@ -649,6 +671,7 @@ def test_schema_diff_calculate_type_transitions(): ], expected_table_struct="STRUCT>", column_type="STRUCT", + is_part_of_destructive_change=True, ), ], dict( @@ -835,6 +858,7 @@ def test_schema_diff_calculate_type_transitions(): ], "ARRAY", expected_table_struct="STRUCT", + is_part_of_destructive_change=True, ), ], {}, @@ -857,6 +881,7 @@ def test_schema_diff_calculate_type_transitions(): ], "INT", expected_table_struct="STRUCT", + is_part_of_destructive_change=True, ), ], {}, @@ -915,6 +940,7 @@ def test_schema_diff_calculate_type_transitions(): TableAlterColumn.primitive("address"), "VARCHAR(121)", expected_table_struct="STRUCT", + is_part_of_destructive_change=True, ), ], dict( @@ -936,6 +962,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR(100)", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( @@ -959,6 +986,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( @@ -980,6 +1008,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR(120)", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( 
@@ -1019,6 +1048,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( @@ -1079,6 +1109,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR(120)", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( @@ -1123,6 +1154,7 @@ def test_schema_diff_calculate_type_transitions(): "VARCHAR(120)", expected_table_struct="STRUCT", position=TableAlterColumnPosition.last("id"), + is_part_of_destructive_change=True, ), ], dict( @@ -1376,3 +1408,454 @@ def test_get_schema_differ(): schema_differ_upper.support_coercing_compatible_types == schema_differ_lower.support_coercing_compatible_types ) + + +def test_destructive_add_operation_metadata(): + """Test that ADD operations part of destructive changes are properly marked with metadata.""" + # Test scenario: Type change that requires DROP + ADD + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + "value": exp.DataType.build("INT"), + } + new_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + "value": exp.DataType.build("STRING"), # Type change requiring DROP+ADD + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should generate DROP value + ADD value + assert len(alter_expressions) == 2 + + drop_expr = alter_expressions[0] + add_expr = alter_expressions[1] + + # Verify we have the expected operations + assert "DROP COLUMN value" in drop_expr.sql() + assert "ADD COLUMN value" in add_expr.sql() # SQLGlot might use TEXT instead of STRING + + # The ADD operation should be marked as destructive support via metadata + add_actions = add_expr.args.get("actions", []) + assert len(add_actions) == 1 + add_action = add_actions[0] + + # Check that the ADD action has 
metadata marking it as destructive + assert add_action.meta.get("sqlmesh_destructive") is True + + +def test_pure_additive_add_operation_no_metadata(): + """Test that pure ADD operations (not part of destructive changes) have no destructive metadata.""" + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + } + new_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + "email": exp.DataType.build("STRING"), # Pure addition + "created_at": exp.DataType.build("TIMESTAMP"), # Pure addition + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should generate ADD email + ADD created_at + assert len(alter_expressions) == 2 + + for add_expr in alter_expressions: + assert "ADD COLUMN" in add_expr.sql() + + add_actions = add_expr.args.get("actions", []) + assert len(add_actions) == 1 + add_action = add_actions[0] + + # Pure ADD operations should NOT have destructive metadata + assert ( + add_action.meta.get("sqlmesh_destructive") is None + or add_action.meta.get("sqlmesh_destructive") is False + ) + + +def test_filtering_with_destructive_metadata(): + """Test that filtering functions respect destructive metadata on ADD operations.""" + from sqlmesh.core.schema_diff import ( + filter_additive_changes, + filter_destructive_changes, + has_additive_changes, + get_additive_changes, + ) + + # Scenario: Type change (destructive) + pure addition (additive) + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + } + new_schema = { + "id": exp.DataType.build("BIGINT"), # Type change -> DROP + ADD (both destructive) + "name": exp.DataType.build("STRING"), + "email": exp.DataType.build("STRING"), # Pure addition (additive) + } + + differ = SchemaDiffer() + all_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should have: DROP id + ADD id (with metadata) + ADD 
email (without metadata) + assert len(all_expressions) == 3 + + # Test filter_additive_changes - should remove the pure ADD operations + non_additive = filter_additive_changes(all_expressions) + # Should keep: DROP id + ADD id (marked destructive) + # Should remove: ADD email (pure additive) + assert len(non_additive) == 2 + for expr in non_additive: + sql = expr.sql() + # Should not contain the pure additive change + assert "ADD COLUMN email" not in sql + + # Test filter_destructive_changes - should remove DROP and marked ADD operations + non_destructive = filter_destructive_changes(all_expressions) + # Should keep: ADD email (pure additive) + # Should remove: DROP id + ADD id (destructive) + assert len(non_destructive) == 1 + assert "ADD COLUMN email" in non_destructive[0].sql() + + # Test has_additive_changes - should only count pure additive changes + assert has_additive_changes(all_expressions) is True # Has pure ADD email + + # Test get_additive_changes - should only return pure additive changes + additive_changes = get_additive_changes(all_expressions) + assert len(additive_changes) == 1 + assert "ADD COLUMN email" in additive_changes[0].sql() + + # Test is_only_additive_changes - should return False due to destructive changes + assert is_only_additive_changes(all_expressions) is False + + +def test_complex_destructive_scenario_metadata(): + """Test complex scenario with multiple destructive changes and metadata propagation.""" + current_schema = { + "id": exp.DataType.build("INT"), + "value": exp.DataType.build("STRING"), + "old_col": exp.DataType.build("DECIMAL(10,2)"), + "unchanged": exp.DataType.build("STRING"), + } + new_schema = { + "id": exp.DataType.build("BIGINT"), # Type change requiring DROP+ADD + "value": exp.DataType.build("INT"), # Type change requiring DROP+ADD + "old_col": exp.DataType.build("DECIMAL(5,1)"), # Precision decrease requiring DROP+ADD + "unchanged": exp.DataType.build("STRING"), + "pure_new": exp.DataType.build("INT"), # Pure 
addition + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Expected operations based on real behavior: + # 1. DROP id → ADD id BIGINT (destructive pair) + # 2. DROP value → ADD value INT (destructive pair) + # 3. DROP old_col → ADD old_col DECIMAL(5,1) (destructive pair) + # 4. ADD pure_new INT (additive) + + # Count ADD operations with destructive metadata + destructive_add_count = 0 + pure_add_count = 0 + + for expr in alter_expressions: + if "ADD" in expr.sql(): + actions = expr.args.get("actions", []) + if actions and actions[0].meta.get("sqlmesh_destructive"): + destructive_add_count += 1 + else: + pure_add_count += 1 + + # Should have 3 destructive ADD operations (id, value, old_col) and 1 pure ADD (pure_new) + assert destructive_add_count == 3 + assert pure_add_count == 1 + + +def test_integration_destructive_add_filtering(): + """Integration test demonstrating that the fix works in realistic scenarios.""" + from sqlmesh.core.schema_diff import filter_destructive_changes, filter_additive_changes + + # Scenario: User has a model with type changes + pure additions + # When using on_destructive_change=IGNORE, they should only see pure additions + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("STRING"), + } + new_schema = { + "id": exp.DataType.build( + "BIGINT" + ), # Type change -> generates DROP id + ADD id (with destructive flag) + "name": exp.DataType.build("STRING"), + "email": exp.DataType.build( + "STRING" + ), # Pure addition -> generates ADD email (no destructive flag) + "age": exp.DataType.build( + "INT" + ), # Pure addition -> generates ADD age (no destructive flag) + } + + differ = SchemaDiffer() + all_changes = differ.compare_columns("test_table", current_schema, new_schema) + + # Should have 4 operations: DROP id + ADD id (destructive) + ADD email + ADD age (additive) + assert len(all_changes) == 4 + + # When user sets 
on_destructive_change=IGNORE, they should filter out destructive changes + non_destructive_changes = filter_destructive_changes(all_changes) + + # Should only have the 2 pure ADD operations (email, age) + assert len(non_destructive_changes) == 2 + for expr in non_destructive_changes: + assert "ADD COLUMN" in expr.sql() + # Verify these are pure additions, not part of destructive changes + actions = expr.args.get("actions", []) + assert len(actions) == 1 + add_action = actions[0] + assert not add_action.meta.get("sqlmesh_destructive", False) + + # When user sets on_additive_change=IGNORE, they should filter out additive changes + non_additive_changes = filter_additive_changes(all_changes) + + # Should only have the 2 destructive operations (DROP id + ADD id with metadata) + assert len(non_additive_changes) == 2 + drop_found = False + add_found = False + for expr in non_additive_changes: + if "DROP COLUMN" in expr.sql(): + drop_found = True + elif "ADD COLUMN" in expr.sql(): + add_found = True + # Verify this ADD operation has destructive metadata + actions = expr.args.get("actions", []) + assert len(actions) == 1 + add_action = actions[0] + assert add_action.meta.get("sqlmesh_destructive") is True + + assert drop_found and add_found + + +def test_filter_additive_changes_removes_only_additive_expressions(): + """Test that filter_additive_changes removes only additive alter expressions.""" + # Create mixed alter expressions (both additive and destructive) + current_schema = {"id": exp.DataType.build("INT"), "name": exp.DataType.build("STRING")} + new_schema = { + "id": exp.DataType.build("INT"), # no change + "new_col": exp.DataType.build("STRING"), # additive - should be removed + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should have additive changes initially + assert has_additive_changes(alter_expressions) + + # After filtering, should have no additive changes + filtered_expressions = 
filter_additive_changes(alter_expressions) + assert not has_additive_changes(filtered_expressions) + + # Original expressions should still have additive changes (not mutated) + assert has_additive_changes(alter_expressions) + + +def test_filter_destructive_changes_removes_only_destructive_expressions(): + """Test that filter_destructive_changes removes only destructive alter expressions.""" + # Create mixed alter expressions (both additive and destructive) + current_schema = {"id": exp.DataType.build("INT"), "name": exp.DataType.build("STRING")} + new_schema = { + "id": exp.DataType.build("INT"), # no change + "name": exp.DataType.build("STRING"), # no change + "new_col": exp.DataType.build("STRING"), # additive - should remain + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should have no destructive changes initially (only additive) + assert not has_drop_alteration(alter_expressions) + + # After filtering destructive changes, should still have the same expressions + filtered_expressions = filter_destructive_changes(alter_expressions) + assert len(filtered_expressions) == len(alter_expressions) + + # Now test with actual destructive changes + destructive_new_schema = {"id": exp.DataType.build("INT")} # removes name column + destructive_expressions = differ.compare_columns( + "test_table", current_schema, destructive_new_schema + ) + + # Should have destructive changes + assert has_drop_alteration(destructive_expressions) + + # After filtering, should have no destructive changes + filtered_destructive = filter_destructive_changes(destructive_expressions) + assert not has_drop_alteration(filtered_destructive) + + +def test_filter_mixed_changes_correctly(): + """Test filtering when there are both additive and destructive changes.""" + # Schema change that both adds and removes columns + current_schema = { + "id": exp.DataType.build("INT"), + "old_col": exp.DataType.build("STRING"), # will be 
removed (destructive) + } + new_schema = { + "id": exp.DataType.build("INT"), # unchanged + "new_col": exp.DataType.build("STRING"), # will be added (additive) + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, new_schema) + + # Should have both types of changes + assert has_additive_changes(alter_expressions) + assert has_drop_alteration(alter_expressions) + + # Filter out additive changes - should only have destructive + no_additive = filter_additive_changes(alter_expressions) + assert not has_additive_changes(no_additive) + assert has_drop_alteration(no_additive) + + # Filter out destructive changes - should only have additive + no_destructive = filter_destructive_changes(alter_expressions) + assert has_additive_changes(no_destructive) + assert not has_drop_alteration(no_destructive) + + # Filter out both - should have no changes + no_changes = filter_destructive_changes(filter_additive_changes(alter_expressions)) + assert not has_additive_changes(no_changes) + assert not has_drop_alteration(no_changes) + assert len(no_changes) == 0 + + +def test_alter_column_integration_with_schema_differ(): + """Test that ALTER COLUMN operations work correctly with the SchemaDiffer.""" + # Test additive ALTER COLUMN operations + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(50)"), + "value": exp.DataType.build("DECIMAL(10,2)"), + } + additive_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(100)"), # Increase precision - additive + "value": exp.DataType.build("DECIMAL(15,5)"), # Increase precision/scale - additive + } + + # Use a SchemaDiffer that supports compatible type changes + differ = SchemaDiffer( + compatible_types={ + exp.DataType.build("VARCHAR(50)"): {exp.DataType.build("VARCHAR(100)")}, + exp.DataType.build("DECIMAL(10,2)"): {exp.DataType.build("DECIMAL(15,5)")}, + } + ) + + alter_expressions = differ.compare_columns("test_table", 
current_schema, additive_schema) + + # Should generate ALTER COLUMN operations + assert len(alter_expressions) == 2 + for expr in alter_expressions: + assert "ALTER COLUMN" in expr.sql() + + # These should be classified as additive + from sqlmesh.core.schema_diff import has_additive_changes + + assert has_additive_changes(alter_expressions) + assert is_only_additive_changes(alter_expressions) + + +def test_alter_column_destructive_integration(): + """Test that destructive ALTER COLUMN operations are correctly handled.""" + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(100)"), + "value": exp.DataType.build("DECIMAL(15,5)"), + } + destructive_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(50)"), # Decrease precision - destructive + "value": exp.DataType.build("DECIMAL(10,2)"), # Decrease precision/scale - destructive + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, destructive_schema) + + # Should generate DROP + ADD operations (destructive changes) + # because the type changes are not compatible + assert len(alter_expressions) >= 2 + + from sqlmesh.core.schema_diff import has_additive_changes, has_drop_alteration + + assert not has_additive_changes(alter_expressions) + assert has_drop_alteration(alter_expressions) + + +def test_alter_column_mixed_additive_destructive(): + """Test mixed scenario with both additive and destructive ALTER COLUMN operations.""" + current_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(50)"), + "description": exp.DataType.build("VARCHAR(100)"), + "value": exp.DataType.build("DECIMAL(10,2)"), + } + mixed_schema = { + "id": exp.DataType.build("INT"), + "name": exp.DataType.build("VARCHAR(100)"), # Increase precision - additive + "description": exp.DataType.build("VARCHAR(50)"), # Decrease precision - destructive + "value": exp.DataType.build("DECIMAL(15,5)"), # 
Increase precision - additive + } + + differ = SchemaDiffer( + compatible_types={ + # Only define compatible types for additive changes + exp.DataType.build("VARCHAR(50)"): {exp.DataType.build("VARCHAR(100)")}, + exp.DataType.build("DECIMAL(10,2)"): {exp.DataType.build("DECIMAL(15,5)")}, + } + ) + + alter_expressions = differ.compare_columns("test_table", current_schema, mixed_schema) + + from sqlmesh.core.schema_diff import ( + has_additive_changes, + has_drop_alteration, + filter_destructive_changes, + get_additive_changes, + ) + + # Should have both additive and destructive changes + assert has_additive_changes(alter_expressions) + assert has_drop_alteration(alter_expressions) # From destructive description change + + # Test filtering + additive_only = get_additive_changes(alter_expressions) + assert len(additive_only) >= 1 # Should have additive ALTER COLUMN operations + + non_destructive = filter_destructive_changes(alter_expressions) + assert has_additive_changes(non_destructive) + assert not has_drop_alteration(non_destructive) + + +def test_alter_column_with_default_values(): + """Test ALTER COLUMN operations involving default values.""" + # This test is more conceptual since current SQLMesh schema diff + # doesn't directly handle DEFAULT value changes, but the framework should support it + current_schema = { + "id": exp.DataType.build("INT"), + "status": exp.DataType.build("VARCHAR(50)"), + } + # In practice, default value changes would need to be detected at a higher level + # and passed down with appropriate metadata + schema_with_default = { + "id": exp.DataType.build("INT"), + "status": exp.DataType.build("VARCHAR(50)"), # Same type, but with default (additive) + } + + differ = SchemaDiffer() + alter_expressions = differ.compare_columns("test_table", current_schema, schema_with_default) + + # No schema changes detected since types are the same + assert len(alter_expressions) == 0 diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py 
index d71cbc4db6..79b01a07e1 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -133,6 +133,7 @@ def test_json(snapshot: Snapshot): "time_column": {"column": "`ds`"}, "batch_size": 30, "forward_only": False, + "on_additive_change": "ALLOW", "on_destructive_change": "ERROR", "partition_by_time_column": True, "disable_restatement": False, @@ -909,7 +910,7 @@ def test_fingerprint(model: Model, parent_model: Model): original_fingerprint = SnapshotFingerprint( data_hash="1312415267", - metadata_hash="1125608408", + metadata_hash="3575333731", ) assert fingerprint == original_fingerprint @@ -1009,7 +1010,7 @@ def test_fingerprint_jinja_macros(model: Model): ) original_fingerprint = SnapshotFingerprint( data_hash="923305614", - metadata_hash="1125608408", + metadata_hash="3575333731", ) fingerprint = fingerprint_from_node(model, nodes={}) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index fc5df244b3..3d3b06a568 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1725,7 +1725,7 @@ def columns(table_name): assert isinstance(destructive_change_err, DestructiveChangeError) assert ( str(destructive_change_err) - == "\nPlan requires a destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT\n\nTo allow the destructive change, set the model's `on_destructive_change` setting to `warn` or `allow` or include the model in the plan's `--allow-destructive-model` option.\n" + == "\nPlan requires destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE 
sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT\n\nTo allow the destructive change, set the model's `on_destructive_change` setting to `warn`, `allow`, or `ignore` or include the model in the plan's `--allow-destructive-model` option.\n" ) # WARN @@ -1745,7 +1745,7 @@ def columns(table_name): evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) assert ( mock_logger.call_args[0][0] - == "\nPlan requires a destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT" + == "\nPlan requires destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT" ) # allow destructive diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index eaa2fe94ad..dd6cbbd2b5 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -9,7 +9,7 @@ from sqlmesh.core.config import Config, ModelDefaultsConfig from sqlmesh.core.dialect import jinja_query from sqlmesh.core.model import SqlModel -from sqlmesh.core.model.kind import OnDestructiveChange +from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange from sqlmesh.dbt.common import Dependencies from sqlmesh.dbt.context import DbtContext from sqlmesh.dbt.loader import sqlmesh_config @@ -133,6 +133,7 @@ def test_model_to_sqlmesh_fields(): assert kind.batch_size == 5 assert kind.lookback == 3 assert kind.on_destructive_change == OnDestructiveChange.ALLOW + assert kind.on_additive_change == OnAdditiveChange.ALLOW assert ( kind.merge_filter.sql(dialect=model.dialect) == """55 > "__MERGE_SOURCE__"."b" AND 
"__MERGE_TARGET__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY""" @@ -162,11 +163,14 @@ def test_model_to_sqlmesh_fields(): start="Jan 1 2023", batch_size=5, batch_concurrency=2, + on_schema_change="ignore", ) model = model_config.to_sqlmesh(context) assert isinstance(model.kind, IncrementalByTimeRangeKind) assert model.kind.batch_concurrency == 2 assert model.kind.time_column.column.name == "ds" + assert model.kind.on_destructive_change == OnDestructiveChange.IGNORE + assert model.kind.on_additive_change == OnAdditiveChange.IGNORE def test_test_to_sqlmesh_fields(): @@ -988,3 +992,40 @@ def test_depends_on(assert_exp_eq, sushi_test_project): # Make sure the query wasn't rendered assert not sqlmesh_model._query_renderer._cache + + +@pytest.mark.parametrize( + "on_schema_change, expected_additive, expected_destructive", + [ + ("ignore", OnAdditiveChange.IGNORE, OnDestructiveChange.IGNORE), + ("fail", OnAdditiveChange.ERROR, OnDestructiveChange.ERROR), + ("append_new_columns", OnAdditiveChange.ALLOW, OnDestructiveChange.IGNORE), + ("sync_all_columns", OnAdditiveChange.ALLOW, OnDestructiveChange.ALLOW), + ], +) +def test_on_schema_change_properties( + on_schema_change: str, + expected_additive: OnAdditiveChange, + expected_destructive: OnDestructiveChange, +): + model_config = ModelConfig( + name="name", + package_name="package", + alias="model", + schema="custom", + database="database", + materialized=Materialization.INCREMENTAL, + sql="SELECT * FROM foo.table", + time_column="ds", + start="Jan 1 2023", + batch_size=5, + batch_concurrency=2, + on_schema_change=on_schema_change, + ) + context = DbtContext() + context.project_name = "Foo" + context.target = DuckDbConfig(name="target", schema="foo") + model = model_config.to_sqlmesh(context) + + assert model.on_additive_change == expected_additive + assert model.on_destructive_change == expected_destructive diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py index 809e28fba4..a94e4ab993 
100644 --- a/tests/dbt/test_transformation.py +++ b/tests/dbt/test_transformation.py @@ -29,7 +29,12 @@ SqlModel, ViewKind, ) -from sqlmesh.core.model.kind import SCDType2ByColumnKind, SCDType2ByTimeKind +from sqlmesh.core.model.kind import ( + SCDType2ByColumnKind, + SCDType2ByTimeKind, + OnDestructiveChange, + OnAdditiveChange, +) from sqlmesh.core.state_sync.db.snapshot import _snapshot_to_json from sqlmesh.dbt.builtin import _relation_info_to_relation from sqlmesh.dbt.column import ( @@ -113,6 +118,8 @@ def test_model_kind(): updated_at_as_valid_from=True, updated_at_name="updated_at", dialect="duckdb", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ALLOW, ) assert ModelConfig( materialized=Materialization.SNAPSHOT, @@ -126,6 +133,8 @@ def test_model_kind(): columns=["foo"], execution_time_as_valid_from=True, dialect="duckdb", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ALLOW, ) assert ModelConfig( materialized=Materialization.SNAPSHOT, @@ -140,23 +149,40 @@ def test_model_kind(): columns=["foo"], execution_time_as_valid_from=True, dialect="bigquery", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ALLOW, ) assert ModelConfig(materialized=Materialization.INCREMENTAL, time_column="foo").model_kind( context - ) == IncrementalByTimeRangeKind(time_column="foo", dialect="duckdb", forward_only=True) + ) == IncrementalByTimeRangeKind( + time_column="foo", + dialect="duckdb", + forward_only=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig( materialized=Materialization.INCREMENTAL, time_column="foo", incremental_strategy="delete+insert", forward_only=False, - ).model_kind(context) == IncrementalByTimeRangeKind(time_column="foo", dialect="duckdb") + ).model_kind(context) == IncrementalByTimeRangeKind( + time_column="foo", + dialect="duckdb", + 
on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig( materialized=Materialization.INCREMENTAL, time_column="foo", incremental_strategy="insert_overwrite", ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="duckdb", forward_only=True + time_column="foo", + dialect="duckdb", + forward_only=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, @@ -164,13 +190,22 @@ def test_model_kind(): unique_key=["bar"], dialect="bigquery", ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="bigquery", forward_only=True + time_column="foo", + dialect="bigquery", + forward_only=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, unique_key=["bar"], incremental_strategy="merge" ).model_kind(context) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=False + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) dbt_incremental_predicate = "DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)" @@ -189,30 +224,52 @@ def test_model_kind(): forward_only=True, disable_restatement=False, merge_filter=expected_sqlmesh_predicate, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig(materialized=Materialization.INCREMENTAL, unique_key=["bar"]).model_kind( context ) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=False + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=False, + 
on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, unique_key=["bar"], full_refresh=False ).model_kind(context) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=True + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, unique_key=["bar"], full_refresh=True ).model_kind(context) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=False + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, unique_key=["bar"], disable_restatement=True ).model_kind(context) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=True + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -221,7 +278,12 @@ def test_model_kind(): disable_restatement=True, full_refresh=True, ).model_kind(context) == IncrementalByUniqueKeyKind( - unique_key=["bar"], dialect="duckdb", forward_only=True, disable_restatement=True + unique_key=["bar"], + dialect="duckdb", + forward_only=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -236,12 +298,19 @@ def test_model_kind(): forward_only=True, disable_restatement=True, auto_restatement_cron="0 0 * * *", + 
on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, time_column="foo", incremental_strategy="merge" ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="duckdb", forward_only=True, disable_restatement=False + time_column="foo", + dialect="duckdb", + forward_only=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -250,7 +319,12 @@ def test_model_kind(): incremental_strategy="merge", full_refresh=True, ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="duckdb", forward_only=True, disable_restatement=False + time_column="foo", + dialect="duckdb", + forward_only=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -259,7 +333,12 @@ def test_model_kind(): incremental_strategy="merge", full_refresh=False, ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="duckdb", forward_only=True, disable_restatement=False + time_column="foo", + dialect="duckdb", + forward_only=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -268,7 +347,12 @@ def test_model_kind(): incremental_strategy="append", disable_restatement=True, ).model_kind(context) == IncrementalByTimeRangeKind( - time_column="foo", dialect="duckdb", forward_only=True, disable_restatement=True + time_column="foo", + dialect="duckdb", + forward_only=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -277,7 +361,12 @@ def test_model_kind(): incremental_strategy="insert_overwrite", partition_by={"field": 
"bar"}, forward_only=False, - ).model_kind(context) == IncrementalByTimeRangeKind(time_column="foo", dialect="duckdb") + ).model_kind(context) == IncrementalByTimeRangeKind( + time_column="foo", + dialect="duckdb", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig( materialized=Materialization.INCREMENTAL, @@ -293,6 +382,8 @@ def test_model_kind(): forward_only=False, auto_restatement_cron="0 0 * * *", auto_restatement_intervals=3, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -300,33 +391,56 @@ def test_model_kind(): incremental_strategy="insert_overwrite", partition_by={"field": "bar"}, ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=False + insert_overwrite=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig(materialized=Materialization.INCREMENTAL).model_kind( context - ) == IncrementalUnmanagedKind(insert_overwrite=True, disable_restatement=False) + ) == IncrementalUnmanagedKind( + insert_overwrite=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig(materialized=Materialization.INCREMENTAL, forward_only=False).model_kind( context ) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=False, forward_only=False + insert_overwrite=True, + disable_restatement=False, + forward_only=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( materialized=Materialization.INCREMENTAL, incremental_strategy="append" - ).model_kind(context) == IncrementalUnmanagedKind(disable_restatement=False) + ).model_kind(context) == IncrementalUnmanagedKind( + disable_restatement=False, + 
on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig( materialized=Materialization.INCREMENTAL, incremental_strategy="append", full_refresh=None - ).model_kind(context) == IncrementalUnmanagedKind(disable_restatement=False) + ).model_kind(context) == IncrementalUnmanagedKind( + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, + ) assert ModelConfig( materialized=Materialization.INCREMENTAL, incremental_strategy="insert_overwrite", partition_by={"field": "bar", "data_type": "int64"}, ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=False + insert_overwrite=True, + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -335,7 +449,10 @@ def test_model_kind(): partition_by={"field": "bar", "data_type": "int64"}, full_refresh=False, ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=True + insert_overwrite=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -345,7 +462,10 @@ def test_model_kind(): disable_restatement=True, full_refresh=True, ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=True + insert_overwrite=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -354,7 +474,10 @@ def test_model_kind(): partition_by={"field": "bar", "data_type": "int64"}, disable_restatement=True, ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, disable_restatement=True + insert_overwrite=True, + disable_restatement=True, + on_destructive_change=OnDestructiveChange.IGNORE, + 
on_additive_change=OnAdditiveChange.IGNORE, ) assert ModelConfig( @@ -362,7 +485,11 @@ def test_model_kind(): incremental_strategy="insert_overwrite", auto_restatement_cron="0 0 * * *", ).model_kind(context) == IncrementalUnmanagedKind( - insert_overwrite=True, auto_restatement_cron="0 0 * * *", disable_restatement=False + insert_overwrite=True, + auto_restatement_cron="0 0 * * *", + disable_restatement=False, + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.IGNORE, ) assert ( @@ -410,6 +537,8 @@ def test_model_kind_snapshot_bigquery(): updated_at_name="updated_at", time_data_type=exp.DataType.build("TIMESTAMPTZ"), dialect="bigquery", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ALLOW, ) # time_data_type is bigquery version even though model dialect is DuckDB @@ -428,6 +557,8 @@ def test_model_kind_snapshot_bigquery(): updated_at_name="updated_at", time_data_type=exp.DataType.build("TIMESTAMPTZ"), # bigquery version dialect="duckdb", + on_destructive_change=OnDestructiveChange.IGNORE, + on_additive_change=OnAdditiveChange.ALLOW, ) diff --git a/tests/integrations/github/cicd/test_integration.py b/tests/integrations/github/cicd/test_integration.py index 15e8be0f6b..b72da14037 100644 --- a/tests/integrations/github/cicd/test_integration.py +++ b/tests/integrations/github/cicd/test_integration.py @@ -305,7 +305,7 @@ def test_merge_pr_has_non_breaking_change( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -516,7 +516,7 @@ def test_merge_pr_has_non_breaking_change_diff_start( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -1039,7 +1039,7 @@ def test_no_merge_since_no_deploy_signal( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -1239,7 +1239,7 @@ def test_no_merge_since_no_deploy_signal_no_approvers_defined( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ 
SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -1421,7 +1421,7 @@ def test_deploy_comment_pre_categorized( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -2338,7 +2338,7 @@ def test_has_required_approval_but_not_base_branch( +++ - @@ -16,7 +16,8 @@ + @@ -17,7 +17,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id,