Skip to content

Commit f73cdfe

Browse files
authored
Fix: Use drop cascade in janitor (#5133)
1 parent 1e2760f commit f73cdfe

File tree

16 files changed

+465
-25
lines changed

16 files changed

+465
-25
lines changed

sqlmesh/core/engine_adapter/athena.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
4545
# >>> self._execute('/* test */ DESCRIBE foo')
4646
# pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
4747
ATTACH_CORRELATION_ID = False
48+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]
4849

4950
def __init__(
5051
self, *args: t.Any, s3_warehouse_location: t.Optional[str] = None, **kwargs: t.Any
@@ -314,13 +315,13 @@ def _build_table_properties_exp(
314315

315316
return None
316317

317-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
318+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
318319
table = exp.to_table(table_name)
319320

320321
if self._query_table_type(table) == "hive":
321322
self._truncate_table(table)
322323

323-
return super().drop_table(table_name=table, exists=exists)
324+
return super().drop_table(table_name=table, exists=exists, **kwargs)
324325

325326
def _truncate_table(self, table_name: TableName) -> None:
326327
table = exp.to_table(table_name)

sqlmesh/core/engine_adapter/base.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ class EngineAdapter:
108108
SUPPORTS_CLONING = False
109109
SUPPORTS_MANAGED_MODELS = False
110110
SUPPORTS_CREATE_DROP_CATALOG = False
111+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = []
111112
SCHEMA_DIFFER = SchemaDiffer()
112113
SUPPORTS_TUPLE_IN = True
113114
HAS_VIEW_BINDING = False
@@ -1044,14 +1045,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
10441045
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
10451046
)
10461047

1047-
def drop_table(self, table_name: TableName, exists: bool = True) -> None:
1048+
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
10481049
"""Drops a table.
10491050
10501051
Args:
10511052
table_name: The name of the table to drop.
10521053
exists: If exists, defaults to True.
10531054
"""
1054-
self._drop_object(name=table_name, exists=exists)
1055+
self._drop_object(name=table_name, exists=exists, **kwargs)
10551056

10561057
def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
10571058
"""Drops a managed table.
@@ -1067,6 +1068,7 @@ def _drop_object(
10671068
name: TableName | SchemaName,
10681069
exists: bool = True,
10691070
kind: str = "TABLE",
1071+
cascade: bool = False,
10701072
**drop_args: t.Any,
10711073
) -> None:
10721074
"""Drops an object.
@@ -1077,8 +1079,13 @@ def _drop_object(
10771079
name: The name of the table to drop.
10781080
exists: If exists, defaults to True.
10791081
kind: What kind of object to drop. Defaults to TABLE
1082+
cascade: Whether or not to DROP ... CASCADE.
1083+
Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
10801084
**drop_args: Any extra arguments to set on the Drop expression
10811085
"""
1086+
if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
1087+
drop_args["cascade"] = cascade
1088+
10821089
self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))
10831090

10841091
def get_alter_operations(

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class BasePostgresEngineAdapter(EngineAdapter):
2424
DEFAULT_BATCH_SIZE = 400
2525
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
2626
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
27+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
2728

2829
def columns(
2930
self, table_name: TableName, include_pseudo_columns: bool = False

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row
6666
SUPPORTS_CLONING = True
6767
MAX_TABLE_COMMENT_LENGTH = 1024
6868
MAX_COLUMN_COMMENT_LENGTH = 1024
69+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
6970

7071
SCHEMA_DIFFER = SchemaDiffer(
7172
compatible_types={

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ def _drop_object(
616616
name: TableName | SchemaName,
617617
exists: bool = True,
618618
kind: str = "TABLE",
619+
cascade: bool = False,
619620
**drop_args: t.Any,
620621
) -> None:
621622
"""Drops an object.
@@ -628,7 +629,6 @@ def _drop_object(
628629
kind: What kind of object to drop. Defaults to TABLE
629630
**drop_args: Any extra arguments to set on the Drop expression
630631
"""
631-
drop_args.pop("cascade", None)
632632
self.execute(
633633
exp.Drop(
634634
this=exp.to_table(name),

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
3737
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
3838
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
3939
SUPPORTS_CREATE_DROP_CATALOG = True
40+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
4041

4142
@property
4243
def catalog_support(self) -> CatalogSupport:

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
5555
SUPPORTS_MANAGED_MODELS = True
5656
CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
5757
SUPPORTS_CREATE_DROP_CATALOG = True
58+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA", "TABLE"]
5859
SCHEMA_DIFFER = SchemaDiffer(
5960
parameterized_type_defaults={
6061
exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)],

sqlmesh/core/engine_adapter/spark.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class SparkEngineAdapter(
5757
# currently check for storage formats we say we don't support REPLACE TABLE
5858
SUPPORTS_REPLACE_TABLE = False
5959
QUOTE_IDENTIFIERS_IN_VIEWS = False
60+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]
6061

6162
WAP_PREFIX = "wap_"
6263
BRANCH_PREFIX = "branch_"

sqlmesh/core/engine_adapter/trino.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class TrinoEngineAdapter(
5353
COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
5454
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
5555
SUPPORTS_REPLACE_TABLE = False
56+
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
5657
DEFAULT_CATALOG_TYPE = "hive"
5758
QUOTE_IDENTIFIERS_IN_VIEWS = False
5859
SCHEMA_DIFFER = SchemaDiffer(

sqlmesh/core/snapshot/evaluator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,10 @@ def _cleanup_snapshot(
12411241
table_name,
12421242
is_table_deployable=is_table_deployable,
12431243
physical_schema=snapshot.physical_schema,
1244+
# we need to set cascade=true or we will get a 'cant drop because other objects depend on it'-style
1245+
# error on engines that enforce referential integrity, such as Postgres
1246+
# this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
1247+
cascade=True,
12441248
)
12451249
except Exception:
12461250
# Use `get_data_object` to check if the table exists instead of `table_exists` since the former
@@ -1771,7 +1775,7 @@ def migrate(
17711775

17721776
def delete(self, name: str, **kwargs: t.Any) -> None:
17731777
_check_table_db_is_physical_schema(name, kwargs["physical_schema"])
1774-
self.adapter.drop_table(name)
1778+
self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
17751779
logger.info("Dropped table '%s'", name)
17761780

17771781
def _replace_query_for_model(
@@ -2372,15 +2376,16 @@ def migrate(
23722376
)
23732377

23742378
def delete(self, name: str, **kwargs: t.Any) -> None:
2379+
cascade = kwargs.pop("cascade", False)
23752380
try:
2376-
self.adapter.drop_view(name)
2381+
self.adapter.drop_view(name, cascade=cascade)
23772382
except Exception:
23782383
logger.debug(
23792384
"Failed to drop view '%s'. Trying to drop the materialized view instead",
23802385
name,
23812386
exc_info=True,
23822387
)
2383-
self.adapter.drop_view(name, materialized=True)
2388+
self.adapter.drop_view(name, materialized=True, cascade=cascade)
23842389
logger.info("Dropped view '%s'", name)
23852390

23862391
def _is_materialized_view(self, model: Model) -> bool:

0 commit comments

Comments
 (0)