Skip to content

Commit a73feca

Browse files
committed
feat: add ignore destructive support
1 parent 4e80a93 commit a73feca

File tree

13 files changed

+1144
-49
lines changed

13 files changed

+1144
-49
lines changed

docs/concepts/models/overview.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,15 @@ Some properties are only available in specific model kinds - see the [model conf
507507
: Set this to true to indicate that all changes to this model should be [forward-only](../plans.md#forward-only-plans).
508508

509509
### on_destructive_change
510-
: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column).
510+
: What should happen when a change to a [forward-only model](../../guides/incremental_time.md#forward-only-models) or incremental model in a [forward-only plan](../plans.md#forward-only-plans) causes a destructive modification to the table schema (i.e., requires dropping an existing column or modifying column constraints in ways that could cause data loss).
511511

512512
SQLMesh checks for destructive changes at plan time based on the model definition and run time based on the model's underlying physical tables.
513513

514-
Must be one of the following values: `allow`, `warn`, or `error` (default).
514+
Must be one of the following values: `allow`, `warn`, `error` (default), or `ignore`.
515+
516+
!!! warning "Ignore is Dangerous"
517+
518+
`ignore` is dangerous because it can result in errors or data loss. It should almost never be used, but it can serve as an "escape hatch" or a way to work around unexpected behavior.
515519

516520
### disable_restatement
517521
: Set this to true to indicate that [data restatement](../plans.md#restatement-plans) is disabled for this model.

docs/guides/incremental_time.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,12 @@ The check is performed at plan time based on the model definition. SQLMesh may n
171171

172172
A model's `on_destructive_change` [configuration setting](../reference/model_configuration.md#incremental-models) determines what happens when SQLMesh detects a destructive change.
173173

174-
By default, SQLMesh will error so no data is lost. You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes.
174+
By default, SQLMesh will error so no data is lost. You can set `on_destructive_change` to `warn` or `allow` in the model's `MODEL` block to allow destructive changes.
175+
Setting it to `ignore` skips the destructive schema change entirely, which allows the table's definition to diverge from the model definition.
176+
177+
!!! warning "Ignore is Dangerous"
178+
179+
`ignore` is dangerous because it can result in errors or data loss. It should almost never be used, but it can serve as an "escape hatch" or a way to work around unexpected behavior.
175180

176181
This example configures a model to silently `allow` destructive changes:
177182

sqlmesh/core/engine_adapter/base.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,8 @@ def get_alter_expressions(
971971
self,
972972
current_table_name: TableName,
973973
target_table_name: TableName,
974+
*,
975+
ignore_destructive: bool = False,
974976
) -> t.List[exp.Alter]:
975977
"""
976978
Determines the alter statements needed to change the current table into the structure of the target table.
@@ -979,6 +981,7 @@ def get_alter_expressions(
979981
current_table_name,
980982
self.columns(current_table_name),
981983
self.columns(target_table_name),
984+
ignore_destructive=ignore_destructive,
982985
)
983986

984987
def alter_table(
@@ -1463,6 +1466,7 @@ def scd_type_2_by_time(
14631466
column_descriptions: t.Optional[t.Dict[str, str]] = None,
14641467
truncate: bool = False,
14651468
is_restatement: bool = False,
1469+
extra_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
14661470
**kwargs: t.Any,
14671471
) -> None:
14681472
self._scd_type_2(
@@ -1480,6 +1484,7 @@ def scd_type_2_by_time(
14801484
column_descriptions=column_descriptions,
14811485
truncate=truncate,
14821486
is_restatement=is_restatement,
1487+
extra_columns_to_types=extra_columns_to_types,
14831488
**kwargs,
14841489
)
14851490

@@ -1499,6 +1504,7 @@ def scd_type_2_by_column(
14991504
column_descriptions: t.Optional[t.Dict[str, str]] = None,
15001505
truncate: bool = False,
15011506
is_restatement: bool = False,
1507+
extra_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
15021508
**kwargs: t.Any,
15031509
) -> None:
15041510
self._scd_type_2(
@@ -1516,6 +1522,7 @@ def scd_type_2_by_column(
15161522
column_descriptions=column_descriptions,
15171523
truncate=truncate,
15181524
is_restatement=is_restatement,
1525+
extra_columns_to_types=extra_columns_to_types,
15191526
**kwargs,
15201527
)
15211528

@@ -1538,6 +1545,7 @@ def _scd_type_2(
15381545
column_descriptions: t.Optional[t.Dict[str, str]] = None,
15391546
truncate: bool = False,
15401547
is_restatement: bool = False,
1548+
extra_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
15411549
**kwargs: t.Any,
15421550
) -> None:
15431551
def remove_managed_columns(
@@ -1934,6 +1942,14 @@ def remove_managed_columns(
19341942
)
19351943
)
19361944

1945+
if extra_columns_to_types:
1946+
# dict.update() mutates in place and returns None, so assigning its result would
# rebind columns_to_types to None. Merge into a new dict instead; this also leaves
# the mapping object that was passed in unmodified.
columns_to_types = {**columns_to_types, **extra_columns_to_types}
1947+
select_columns = table_columns + [
1948+
exp.cast(exp.Null(), dtype, copy=False).as_(col, copy=False, quoted=True)
1949+
for col, dtype in extra_columns_to_types.items()
1950+
]
1951+
query = query.select(*select_columns)
1952+
19371953
self.replace_query(
19381954
target_table,
19391955
self.ensure_nulls_for_unmatched_after_join(query),

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,15 @@ def _parse_clustering_key(self, clustering_key: t.Optional[str]) -> t.List[exp.E
357357
return parsed_cluster_key.expressions or [parsed_cluster_key.this]
358358

359359
def get_alter_expressions(
360-
self, current_table_name: TableName, target_table_name: TableName
360+
self,
361+
current_table_name: TableName,
362+
target_table_name: TableName,
363+
*,
364+
ignore_destructive: bool = False,
361365
) -> t.List[exp.Alter]:
362-
expressions = super().get_alter_expressions(current_table_name, target_table_name)
366+
expressions = super().get_alter_expressions(
367+
current_table_name, target_table_name, ignore_destructive=ignore_destructive
368+
)
363369

364370
# check for a change in clustering
365371
current_table = exp.to_table(current_table_name)

sqlmesh/core/engine_adapter/trino.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ def _scd_type_2(
268268
column_descriptions: t.Optional[t.Dict[str, str]] = None,
269269
truncate: bool = False,
270270
is_restatement: bool = False,
271+
extra_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
271272
**kwargs: t.Any,
272273
) -> None:
273274
if columns_to_types and self.current_catalog_type == "delta_lake":
@@ -291,6 +292,7 @@ def _scd_type_2(
291292
column_descriptions,
292293
truncate,
293294
is_restatement,
295+
extra_columns_to_types,
294296
**kwargs,
295297
)
296298

sqlmesh/core/model/kind.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ class OnDestructiveChange(str, Enum):
188188
ERROR = "ERROR"
189189
WARN = "WARN"
190190
ALLOW = "ALLOW"
191+
IGNORE = "IGNORE"
191192

192193
@property
193194
def is_error(self) -> bool:
@@ -201,6 +202,10 @@ def is_warn(self) -> bool:
201202
def is_allow(self) -> bool:
202203
return self == OnDestructiveChange.ALLOW
203204

205+
@property
206+
def is_ignore(self) -> bool:
207+
return self == OnDestructiveChange.IGNORE
208+
204209

205210
def _on_destructive_change_validator(
206211
cls: t.Type, v: t.Union[OnDestructiveChange, str, exp.Identifier]

sqlmesh/core/plan/builder.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ def _check_destructive_changes(self, directly_modified: t.Set[SnapshotId]) -> No
552552
new.name,
553553
old_columns_to_types,
554554
new_columns_to_types,
555+
ignore_destructive=new.model.on_destructive_change.is_ignore,
555556
)
556557

557558
if has_drop_alteration(schema_diff):

sqlmesh/core/schema_diff.py

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,8 @@ def _alter_operation(
556556
current_type: t.Union[str, exp.DataType],
557557
root_struct: exp.DataType,
558558
new_kwarg: exp.ColumnDef,
559+
*,
560+
ignore_destructive: bool = False,
559561
) -> t.List[TableAlterOperation]:
560562
# We don't copy on purpose here because current_type may need to be mutated inside
561563
# _get_operations (struct.expressions.pop and struct.expressions.insert)
@@ -570,6 +572,7 @@ def _alter_operation(
570572
current_type,
571573
new_type,
572574
root_struct,
575+
ignore_destructive=ignore_destructive,
573576
)
574577

575578
if new_type.this == current_type.this == exp.DataType.Type.ARRAY:
@@ -587,6 +590,7 @@ def _alter_operation(
587590
current_array_type,
588591
new_array_type,
589592
root_struct,
593+
ignore_destructive=ignore_destructive,
590594
)
591595
if self._is_coerceable_type(current_type, new_type):
592596
return []
@@ -607,6 +611,8 @@ def _alter_operation(
607611
col_pos,
608612
)
609613
]
614+
if ignore_destructive:
615+
return []
610616
return self._drop_operation(columns, root_struct, pos, root_struct) + self._add_operation(
611617
columns, pos, new_kwarg, struct, root_struct
612618
)
@@ -617,11 +623,16 @@ def _resolve_alter_operations(
617623
current_struct: exp.DataType,
618624
new_struct: exp.DataType,
619625
root_struct: exp.DataType,
626+
*,
627+
ignore_destructive: bool = False,
620628
) -> t.List[TableAlterOperation]:
621629
operations = []
622630
for current_pos, current_kwarg in enumerate(current_struct.expressions.copy()):
623631
_, new_kwarg = self._get_matching_kwarg(current_kwarg, new_struct, current_pos)
624-
assert new_kwarg
632+
if new_kwarg is None:
633+
if ignore_destructive:
634+
continue
635+
raise ValueError("Cannot alter a column that is being dropped")
625636
_, new_type = _get_name_and_type(new_kwarg)
626637
_, current_type = _get_name_and_type(current_kwarg)
627638
columns = parent_columns + [TableAlterColumn.from_struct_kwarg(current_kwarg)]
@@ -636,6 +647,7 @@ def _resolve_alter_operations(
636647
current_type,
637648
root_struct,
638649
new_kwarg,
650+
ignore_destructive=ignore_destructive,
639651
)
640652
)
641653
return operations
@@ -646,62 +658,69 @@ def _get_operations(
646658
current_struct: exp.DataType,
647659
new_struct: exp.DataType,
648660
root_struct: exp.DataType,
661+
*,
662+
ignore_destructive: bool = False,
649663
) -> t.List[TableAlterOperation]:
650664
root_struct = root_struct or current_struct
651665
parent_columns = parent_columns or []
652666
operations = []
653-
operations.extend(
654-
self._resolve_drop_operation(parent_columns, current_struct, new_struct, root_struct)
655-
)
667+
if not ignore_destructive:
668+
operations.extend(
669+
self._resolve_drop_operation(
670+
parent_columns, current_struct, new_struct, root_struct
671+
)
672+
)
656673
operations.extend(
657674
self._resolve_add_operations(parent_columns, current_struct, new_struct, root_struct)
658675
)
659676
operations.extend(
660-
self._resolve_alter_operations(parent_columns, current_struct, new_struct, root_struct)
677+
self._resolve_alter_operations(
678+
parent_columns,
679+
current_struct,
680+
new_struct,
681+
root_struct,
682+
ignore_destructive=ignore_destructive,
683+
)
661684
)
662685
return operations
663686

664687
def _from_structs(
665-
self, current_struct: exp.DataType, new_struct: exp.DataType
688+
self,
689+
current_struct: exp.DataType,
690+
new_struct: exp.DataType,
691+
*,
692+
ignore_destructive: bool = False,
666693
) -> t.List[TableAlterOperation]:
667-
return self._get_operations([], current_struct, new_struct, current_struct)
694+
return self._get_operations(
695+
[], current_struct, new_struct, current_struct, ignore_destructive=ignore_destructive
696+
)
668697

669-
def compare_structs(
670-
self, table_name: t.Union[str, exp.Table], current: exp.DataType, new: exp.DataType
698+
def _compare_structs(
699+
self,
700+
table_name: t.Union[str, exp.Table],
701+
current: exp.DataType,
702+
new: exp.DataType,
703+
*,
704+
ignore_destructive: bool = False,
671705
) -> t.List[exp.Alter]:
672-
"""
673-
Compares two schemas represented as structs.
674-
675-
Args:
676-
current: The current schema.
677-
new: The new schema.
678-
679-
Returns:
680-
The list of table alter operations.
681-
"""
682706
return [
683707
op.expression(table_name, self.array_element_selector)
684-
for op in self._from_structs(current, new)
708+
for op in self._from_structs(current, new, ignore_destructive=ignore_destructive)
685709
]
686710

687711
def compare_columns(
688712
self,
689713
table_name: TableName,
690714
current: t.Dict[str, exp.DataType],
691715
new: t.Dict[str, exp.DataType],
716+
*,
717+
ignore_destructive: bool = False,
692718
) -> t.List[exp.Alter]:
693-
"""
694-
Compares two schemas represented as dictionaries of column names and types.
695-
696-
Args:
697-
current: The current schema.
698-
new: The new schema.
699-
700-
Returns:
701-
The list of schema deltas.
702-
"""
703-
return self.compare_structs(
704-
table_name, columns_to_types_to_struct(current), columns_to_types_to_struct(new)
719+
return self._compare_structs(
720+
table_name,
721+
columns_to_types_to_struct(current),
722+
columns_to_types_to_struct(new),
723+
ignore_destructive=ignore_destructive,
705724
)
706725

707726

0 commit comments

Comments
 (0)