Commit 5e59d18 (1 parent: 0109a3a)

feat!: add ignore destructive support (#5117)
95 files changed: +3828 −768 lines
docs/concepts/models/overview.md (6 additions, 2 deletions)

@@ -507,11 +507,15 @@ Some properties are only available in specific model kinds - see the [model conf
 : Set this to true to indicate that all changes to this model should be [forward-only](../plans.md#forward-only-plans).
 
 ### on_destructive_change
-: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column).
+: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column or modifying column constraints in ways that could cause data loss).
 
     SQLMesh checks for destructive changes at plan time based on the model definition and run time based on the model's underlying physical tables.
 
-    Must be one of the following values: `allow`, `warn`, or `error` (default).
+    Must be one of the following values: `allow`, `warn`, `error` (default), or `ignore`.
+
+    !!! warning "Ignore is Dangerous"
+
+        `ignore` is dangerous since it can result in error or data loss. It likely should never be used but could be useful as an "escape-hatch" or a way to workaround unexpected behavior.
 
 ### disable_restatement
 : Set this to true to indicate that [data restatement](../plans.md#restatement-plans) is disabled for this model.
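For illustration, the four `on_destructive_change` settings documented above can be sketched as a simple policy check. This is a hypothetical standalone sketch (the enum and `handle_destructive_change` are invented names), not SQLMesh's actual implementation:

```python
from enum import Enum


class OnDestructiveChange(str, Enum):
    """Mirrors the four documented settings; names are illustrative."""

    ALLOW = "allow"
    WARN = "warn"
    ERROR = "error"  # the documented default
    IGNORE = "ignore"  # new in this commit


def handle_destructive_change(
    setting: OnDestructiveChange, dropped_columns: list
) -> str:
    """Decide what happens when a schema change would drop existing columns.

    Returns "applied", "applied-with-warning", or "skipped";
    raises for the default "error" setting.
    """
    if not dropped_columns:
        return "applied"  # nothing destructive is happening
    if setting is OnDestructiveChange.ERROR:
        raise ValueError(f"destructive change would drop columns: {dropped_columns}")
    if setting is OnDestructiveChange.WARN:
        return "applied-with-warning"
    if setting is OnDestructiveChange.IGNORE:
        # The schema change is not performed, so the physical table
        # diverges from the model definition -- hence the docs' warning.
        return "skipped"
    return "applied"  # ALLOW
```

Note how `ignore` differs from `allow`: both suppress the error, but `ignore` skips the destructive schema change entirely rather than applying it.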

docs/guides/custom_materializations.md (4 additions, 0 deletions)

@@ -64,6 +64,7 @@ class CustomFullMaterialization(CustomMaterialization):
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         self.adapter.replace_query(table_name, query_or_df)
@@ -78,6 +79,7 @@ Let's unpack this materialization:
 * `query_or_df` - a query (of SQLGlot expression type) or DataFrame (Pandas, PySpark, or Snowpark) instance to be inserted
 * `model` - the model definition object used to access model parameters and user-specified materialization arguments
 * `is_first_insert` - whether this is the first insert for the current version of the model (used with batched or multi-step inserts)
+* `render_kwargs` - a dictionary of arguments used to render the model query
 * `kwargs` - additional and future arguments
 * The `self.adapter` instance is used to interact with the target engine. It comes with a set of useful high-level APIs like `replace_query`, `columns`, and `table_exists`, but also supports executing arbitrary SQL expressions with its `execute` method.
 
@@ -150,6 +152,7 @@ class CustomFullMaterialization(CustomMaterialization):
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         config_value = model.custom_materialization_properties["config_key"]
@@ -232,6 +235,7 @@ class CustomFullMaterialization(CustomMaterialization[MyCustomKind]):
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         assert isinstance(model.kind, MyCustomKind)
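The signature change above can be mimicked in a self-contained mock. `SketchMaterialization` and `MockAdapter` below are invented stand-ins (no SQLMesh imports), assuming only what the diff shows: `render_kwargs` is a new positional parameter placed before `**kwargs`:

```python
import typing as t


class MockAdapter:
    """Stand-in for the engine adapter; records the last call for illustration."""

    def replace_query(self, table_name: str, query: str) -> None:
        self.last_call = (table_name, query)


class SketchMaterialization:
    """Illustrates the updated `insert` signature with `render_kwargs`."""

    def __init__(self) -> None:
        self.adapter = MockAdapter()

    def insert(
        self,
        table_name: str,
        query_or_df: str,  # a real materialization receives QueryOrDF
        model: object,
        is_first_insert: bool,
        render_kwargs: t.Dict[str, t.Any],  # new in this commit
        **kwargs: t.Any,
    ) -> None:
        # render_kwargs carries the arguments used to render the model query;
        # a real materialization would forward it to helpers that re-render,
        # as the example files below do with _replace_query_for_model.
        self.adapter.replace_query(table_name, query_or_df)
```

Existing custom materializations that override `insert` need to add this parameter to stay compatible with the new call signature.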

docs/guides/incremental_time.md (6 additions, 1 deletion)

@@ -171,7 +171,12 @@ The check is performed at plan time based on the model definition. SQLMesh may n
 
 A model's `on_destructive_change` [configuration setting](../reference/model_configuration.md#incremental-models) determines what happens when SQLMesh detects a destructive change.
 
-By default, SQLMesh will error so no data is lost. You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes.
+By default, SQLMesh will error so no data is lost. You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes.
+`ignore` can be used to not perform the schema change and allow the table's definition to diverge from the model definition.
+
+!!! warning "Ignore is Dangerous"
+
+    `ignore` is dangerous since it can result in error or data loss. It likely should never be used but could be useful as an "escape-hatch" or a way to workaround unexpected behavior.
 
 This example configures a model to silently `allow` destructive changes:

examples/custom_materializations/custom_materializations/custom_kind.py (2 additions, 1 deletion)

@@ -24,8 +24,9 @@ def insert(
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
         assert type(model.kind).__name__ == "ExtendedCustomKind"
 
-        self._replace_query_for_model(model, table_name, query_or_df)
+        self._replace_query_for_model(model, table_name, query_or_df, render_kwargs)

examples/custom_materializations/custom_materializations/full.py (2 additions, 1 deletion)

@@ -17,6 +17,7 @@ def insert(
         query_or_df: QueryOrDF,
         model: Model,
         is_first_insert: bool,
+        render_kwargs: t.Dict[str, t.Any],
         **kwargs: t.Any,
     ) -> None:
-        self._replace_query_for_model(model, table_name, query_or_df)
+        self._replace_query_for_model(model, table_name, query_or_df, render_kwargs)

sqlmesh/core/dialect.py (25 additions, 10 deletions)

@@ -23,6 +23,7 @@
 from sqlglot.tokens import Token
 
 from sqlmesh.core.constants import MAX_MODEL_DEFINITION_SIZE
+from sqlmesh.utils import get_source_columns_to_types
 from sqlmesh.utils.errors import SQLMeshError, ConfigError
 from sqlmesh.utils.pandas import columns_to_types_from_df
 
@@ -1121,7 +1122,7 @@ def select_from_values(
     for i in range(0, num_rows, batch_size):
         yield select_from_values_for_batch_range(
             values=values,
-            columns_to_types=columns_to_types,
+            target_columns_to_types=columns_to_types,
             batch_start=i,
             batch_end=min(i + batch_size, num_rows),
             alias=alias,
@@ -1130,35 +1131,49 @@ def select_from_values(
 
 def select_from_values_for_batch_range(
     values: t.List[t.Tuple[t.Any, ...]],
-    columns_to_types: t.Dict[str, exp.DataType],
+    target_columns_to_types: t.Dict[str, exp.DataType],
     batch_start: int,
     batch_end: int,
     alias: str = "t",
+    source_columns: t.Optional[t.List[str]] = None,
 ) -> exp.Select:
-    casted_columns = [
-        exp.alias_(exp.cast(exp.column(column), to=kind), column, copy=False)
-        for column, kind in columns_to_types.items()
-    ]
+    source_columns = source_columns or list(target_columns_to_types)
+    source_columns_to_types = get_source_columns_to_types(target_columns_to_types, source_columns)
 
     if not values:
         # Ensures we don't generate an empty VALUES clause & forces a zero-row output
         where = exp.false()
-        expressions = [tuple(exp.cast(exp.null(), to=kind) for kind in columns_to_types.values())]
+        expressions = [
+            tuple(exp.cast(exp.null(), to=kind) for kind in source_columns_to_types.values())
+        ]
     else:
         where = None
         expressions = [
-            tuple(transform_values(v, columns_to_types)) for v in values[batch_start:batch_end]
+            tuple(transform_values(v, source_columns_to_types))
+            for v in values[batch_start:batch_end]
         ]
 
-    values_exp = exp.values(expressions, alias=alias, columns=columns_to_types)
+    values_exp = exp.values(expressions, alias=alias, columns=source_columns_to_types)
     if values:
         # BigQuery crashes on `SELECT CAST(x AS TIMESTAMP) FROM UNNEST([NULL]) AS x`, but not
         # on `SELECT CAST(x AS TIMESTAMP) FROM UNNEST([CAST(NULL AS TIMESTAMP)]) AS x`. This
         # ensures nulls under the `Values` expression are cast to avoid similar issues.
-        for value, kind in zip(values_exp.expressions[0].expressions, columns_to_types.values()):
+        for value, kind in zip(
+            values_exp.expressions[0].expressions, source_columns_to_types.values()
+        ):
             if isinstance(value, exp.Null):
                 value.replace(exp.cast(value, to=kind))
 
+    casted_columns = [
+        exp.alias_(
+            exp.cast(
+                exp.column(column) if column in source_columns_to_types else exp.Null(), to=kind
+            ),
+            column,
+            copy=False,
+        )
+        for column, kind in target_columns_to_types.items()
+    ]
     return exp.select(*casted_columns).from_(values_exp, copy=False).where(where, copy=False)
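The reworked `casted_columns` logic selects real columns from the VALUES source and pads any target column missing from the source with a typed NULL. A simplified string-based sketch of that padding rule (hypothetical helper name, no sqlglot):

```python
import typing as t


def pad_missing_columns(
    target_columns_to_types: t.Dict[str, str],
    source_columns: t.Optional[t.List[str]] = None,
) -> t.List[str]:
    """Build SELECT expressions for each target column.

    Columns present in the source are cast to the target type; columns
    absent from the source are emitted as typed NULLs, mirroring the
    `exp.column(column) if column in source_columns_to_types else exp.Null()`
    branch in the diff above.
    """
    # Default mirrors the diff: assume the source provides every target column.
    source_columns = source_columns or list(target_columns_to_types)
    return [
        (
            f"CAST({column} AS {kind}) AS {column}"
            if column in source_columns
            else f"CAST(NULL AS {kind}) AS {column}"
        )
        for column, kind in target_columns_to_types.items()
    ]
```

This is why the function now distinguishes `target_columns_to_types` (the table's schema) from `source_columns` (what the VALUES batch actually provides).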

sqlmesh/core/engine_adapter/_typing.py (1 addition, 1 deletion)

@@ -13,7 +13,7 @@
 
 snowpark = optional_import("snowflake.snowpark")
 
-Query = t.Union[exp.Query, exp.DerivedTable]
+Query = exp.Query
 PySparkSession = t.Union[pyspark.sql.SparkSession, pyspark.sql.connect.dataframe.SparkSession]
 PySparkDataFrame = t.Union[pyspark.sql.DataFrame, pyspark.sql.connect.dataframe.DataFrame]
sqlmesh/core/engine_adapter/athena.py (14 additions, 12 deletions)

@@ -84,12 +84,12 @@ def catalog_support(self) -> CatalogSupport:
     def create_state_table(
         self,
         table_name: str,
-        columns_to_types: t.Dict[str, exp.DataType],
+        target_columns_to_types: t.Dict[str, exp.DataType],
         primary_key: t.Optional[t.Tuple[str, ...]] = None,
     ) -> None:
         self.create_table(
             table_name,
-            columns_to_types,
+            target_columns_to_types,
             primary_key=primary_key,
             # it's painfully slow, but it works
             table_format="iceberg",
@@ -178,7 +178,7 @@ def _build_create_table_exp(
         expression: t.Optional[exp.Expression],
         exists: bool = True,
         replace: bool = False,
-        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
+        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
         table_description: t.Optional[str] = None,
         table_kind: t.Optional[str] = None,
         partitioned_by: t.Optional[t.List[exp.Expression]] = None,
@@ -198,7 +198,7 @@ def _build_create_table_exp(
         properties = self._build_table_properties_exp(
             table=table,
             expression=expression,
-            columns_to_types=columns_to_types,
+            target_columns_to_types=target_columns_to_types,
             partitioned_by=partitioned_by,
             table_properties=table_properties,
             table_description=table_description,
@@ -237,7 +237,7 @@ def _build_table_properties_exp(
         partition_interval_unit: t.Optional[IntervalUnit] = None,
         clustered_by: t.Optional[t.List[exp.Expression]] = None,
         table_properties: t.Optional[t.Dict[str, exp.Expression]] = None,
-        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
+        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
         table_description: t.Optional[str] = None,
         table_kind: t.Optional[str] = None,
         table: t.Optional[exp.Table] = None,
@@ -265,12 +265,12 @@ def _build_table_properties_exp(
 
         if partitioned_by:
             schema_expressions: t.List[exp.Expression] = []
-            if is_hive and columns_to_types:
+            if is_hive and target_columns_to_types:
                 # For Hive-style tables, you cannot include the partitioned by columns in the main set of columns
                 # In the PARTITIONED BY expression, you also cant just include the column names, you need to include the data type as well
                 # ref: https://docs.aws.amazon.com/athena/latest/ug/partitions.html
                 for match_name, match_dtype in self._find_matching_columns(
-                    partitioned_by, columns_to_types
+                    partitioned_by, target_columns_to_types
                 ):
                     column_def = exp.ColumnDef(this=exp.to_identifier(match_name), kind=match_dtype)
                     schema_expressions.append(column_def)
@@ -431,9 +431,10 @@ def replace_query(
         self,
         table_name: TableName,
         query_or_df: QueryOrDF,
-        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
+        target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
         table_description: t.Optional[str] = None,
         column_descriptions: t.Optional[t.Dict[str, str]] = None,
+        source_columns: t.Optional[t.List[str]] = None,
         **kwargs: t.Any,
     ) -> None:
         table = exp.to_table(table_name)
@@ -444,17 +445,18 @@ def replace_query(
         return super().replace_query(
             table_name=table,
             query_or_df=query_or_df,
-            columns_to_types=columns_to_types,
+            target_columns_to_types=target_columns_to_types,
             table_description=table_description,
             column_descriptions=column_descriptions,
+            source_columns=source_columns,
             **kwargs,
         )
 
     def _insert_overwrite_by_time_partition(
         self,
         table_name: TableName,
         source_queries: t.List[SourceQuery],
-        columns_to_types: t.Dict[str, exp.DataType],
+        target_columns_to_types: t.Dict[str, exp.DataType],
         where: exp.Condition,
         **kwargs: t.Any,
     ) -> None:
@@ -465,7 +467,7 @@ def _insert_overwrite_by_time_partition(
         if table_type == "iceberg":
             # Iceberg tables work as expected, we can use the default behaviour
             return super()._insert_overwrite_by_time_partition(
-                table, source_queries, columns_to_types, where, **kwargs
+                table, source_queries, target_columns_to_types, where, **kwargs
             )
 
         # For Hive tables, we need to drop all the partitions covered by the query and delete the data from S3
@@ -475,7 +477,7 @@ def _insert_overwrite_by_time_partition(
         return super()._insert_overwrite_by_time_partition(
             table,
             source_queries,
-            columns_to_types,
+            target_columns_to_types,
             where,
             insert_overwrite_strategy_override=InsertOverwriteStrategy.INTO_IS_OVERWRITE,  # since we already cleared the data
             **kwargs,
