Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sqlmesh/core/engine_adapter/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
# >>> self._execute('/* test */ DESCRIBE foo')
# pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
ATTACH_CORRELATION_ID = False
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]

def __init__(
self, *args: t.Any, s3_warehouse_location: t.Optional[str] = None, **kwargs: t.Any
Expand Down Expand Up @@ -314,13 +315,13 @@ def _build_table_properties_exp(

return None

def drop_table(self, table_name: TableName, exists: bool = True) -> None:
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since drop_view() already had **kwargs, I figured it was OK to add it to drop_table() as well

table = exp.to_table(table_name)

if self._query_table_type(table) == "hive":
self._truncate_table(table)

return super().drop_table(table_name=table, exists=exists)
return super().drop_table(table_name=table, exists=exists, **kwargs)

def _truncate_table(self, table_name: TableName) -> None:
table = exp.to_table(table_name)
Expand Down
11 changes: 9 additions & 2 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class EngineAdapter:
SUPPORTS_CLONING = False
SUPPORTS_MANAGED_MODELS = False
SUPPORTS_CREATE_DROP_CATALOG = False
SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = []
SCHEMA_DIFFER = SchemaDiffer()
SUPPORTS_TUPLE_IN = True
HAS_VIEW_BINDING = False
Expand Down Expand Up @@ -1037,14 +1038,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
)

def drop_table(self, table_name: TableName, exists: bool = True) -> None:
def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
"""Drops a table.

Args:
table_name: The name of the table to drop.
exists: If exists, defaults to True.
"""
self._drop_object(name=table_name, exists=exists)
self._drop_object(name=table_name, exists=exists, **kwargs)

def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
"""Drops a managed table.
Expand All @@ -1060,6 +1061,7 @@ def _drop_object(
name: TableName | SchemaName,
exists: bool = True,
kind: str = "TABLE",
cascade: bool = False,
**drop_args: t.Any,
) -> None:
"""Drops an object.
Expand All @@ -1070,8 +1072,13 @@ def _drop_object(
name: The name of the table to drop.
exists: If exists, defaults to True.
kind: What kind of object to drop. Defaults to TABLE
cascade: Whether or not to DROP ... CASCADE.
Note that this is ignored for :kind's that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
**drop_args: Any extra arguments to set on the Drop expression
"""
if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
drop_args["cascade"] = cascade
Comment on lines +1079 to +1080
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're expecting CASCADE semantics, should this warn if cascade is unsupported?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't really "expect" the CASCADE semantics per se. We just want to delete our stuff without error. We're forced to do CASCADE because otherwise the engine won't let us delete.

Copy link
Contributor

@georgesittas georgesittas Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair; I was thinking that a warning could simply surface the fact that some objects were left orphan because we couldn't cascade. Although, if we don't know which objects depend on the removed thing, then its value is probably questionable anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did consider warning, but then I noticed we silently do nothing elsewhere, so I decided to stick with that strategy


self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))

def get_alter_expressions(
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/base_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BasePostgresEngineAdapter(EngineAdapter):
DEFAULT_BATCH_SIZE = 400
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]

def columns(
self, table_name: TableName, include_pseudo_columns: bool = False
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row
SUPPORTS_CLONING = True
MAX_TABLE_COMMENT_LENGTH = 1024
MAX_COLUMN_COMMENT_LENGTH = 1024
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]

SCHEMA_DIFFER = SchemaDiffer(
compatible_types={
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/engine_adapter/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ def _drop_object(
name: TableName | SchemaName,
exists: bool = True,
kind: str = "TABLE",
cascade: bool = False,
**drop_args: t.Any,
) -> None:
"""Drops an object.
Expand All @@ -626,7 +627,6 @@ def _drop_object(
kind: What kind of object to drop. Defaults to TABLE
**drop_args: Any extra arguments to set on the Drop expression
"""
drop_args.pop("cascade", None)
self.execute(
exp.Drop(
this=exp.to_table(name),
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
SUPPORTS_CREATE_DROP_CATALOG = True
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]

@property
def catalog_support(self) -> CatalogSupport:
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
SUPPORTS_MANAGED_MODELS = True
CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
SUPPORTS_CREATE_DROP_CATALOG = True
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA", "TABLE"]
SCHEMA_DIFFER = SchemaDiffer(
parameterized_type_defaults={
exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)],
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class SparkEngineAdapter(
# currently check for storage formats we say we don't support REPLACE TABLE
SUPPORTS_REPLACE_TABLE = False
QUOTE_IDENTIFIERS_IN_VIEWS = False
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]

WAP_PREFIX = "wap_"
BRANCH_PREFIX = "branch_"
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TrinoEngineAdapter(
COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
SUPPORTS_REPLACE_TABLE = False
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
DEFAULT_CATALOG_TYPE = "hive"
QUOTE_IDENTIFIERS_IN_VIEWS = False
SCHEMA_DIFFER = SchemaDiffer(
Expand Down
11 changes: 8 additions & 3 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,10 @@ def _cleanup_snapshot(
table_name,
is_table_deployable=is_table_deployable,
physical_schema=snapshot.physical_schema,
# we need to set cascade=true or we will get a 'cant drop because other objects depend on it'-style
# error on engines that enforce referential integrity, such as Postgres
# this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
cascade=True,
Copy link
Contributor

@themisvaltinos themisvaltinos Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we control this with a flag which is set if the engine supports cascade or not (maybe from the schema differ)? Unless I'm doing something wrong, I tried with a BigQuery project, for example, to run the janitor, which stops when it tries to drop a table with the error Syntax error: Expected end of input but got keyword CASCADE at

relevant docs: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement it seems cascade is supported for schema but not table

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we control this with a flag which is set if the engine supports cascade or not (maybe from the schema differ)?

Imo, this should happen further downstream, e.g., in the adapter itself.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice catch, the downside of not running this test across all engines.

I naively thought SQLGlot would not generate the CASCADE output if it's unsupported (even if cascade=true on the AST node), but I guess the fact that it doesn't is why it's not considered a validator.

I'll improve the coverage and make sure this works on all engines

)
except Exception:
# Use `get_data_object` to check if the table exists instead of `table_exists` since the former
Expand Down Expand Up @@ -1726,7 +1730,7 @@ def migrate(

def delete(self, name: str, **kwargs: t.Any) -> None:
_check_table_db_is_physical_schema(name, kwargs["physical_schema"])
self.adapter.drop_table(name)
self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
Copy link
Collaborator Author

@erindru erindru Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deliberately just pop off the cascade argument because delete() is called with a bunch of other arguments that aren't relevant for drop_table() but end up making their way to the exp.Drop AST node in the EngineAdapter if they aren't filtered out here

logger.info("Dropped table '%s'", name)

def _replace_query_for_model(
Expand Down Expand Up @@ -2324,15 +2328,16 @@ def migrate(
)

def delete(self, name: str, **kwargs: t.Any) -> None:
cascade = kwargs.pop("cascade", False)
try:
self.adapter.drop_view(name)
self.adapter.drop_view(name, cascade=cascade)
except Exception:
logger.debug(
"Failed to drop view '%s'. Trying to drop the materialized view instead",
name,
exc_info=True,
)
self.adapter.drop_view(name, materialized=True)
self.adapter.drop_view(name, materialized=True, cascade=cascade)
logger.info("Dropped view '%s'", name)

def _is_materialized_view(self, model: Model) -> bool:
Expand Down
11 changes: 9 additions & 2 deletions tests/core/engine_adapter/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from _pytest.mark.structures import ParameterSet

if t.TYPE_CHECKING:
from sqlmesh.core._typing import TableName
from sqlmesh.core._typing import TableName, SchemaName
from sqlmesh.core.engine_adapter._typing import Query

TEST_SCHEMA = "test_schema"
Expand Down Expand Up @@ -222,6 +222,13 @@ def df_type(self) -> t.Optional[str]:
return self._test_type.split("-", maxsplit=1)[1]
return None

@property
def engine_type(self) -> str:
if self.mark.startswith("gcp_postgres"):
return "gcp_postgres"

return self.mark.split("_")[0]

@property
def columns_to_types(self):
if self._columns_to_types is None:
Expand Down Expand Up @@ -307,7 +314,7 @@ def default_table_format(self) -> t.Optional[str]:
def add_test_suffix(self, value: str) -> str:
return f"{value}_{self.test_id}"

def get_metadata_results(self, schema: t.Optional[str] = None) -> MetadataResults:
def get_metadata_results(self, schema: t.Optional[SchemaName] = None) -> MetadataResults:
schema = schema if schema else self.schema(TEST_SCHEMA)
return MetadataResults.from_data_objects(self.engine_adapter.get_data_objects(schema))

Expand Down
141 changes: 136 additions & 5 deletions tests/core/engine_adapter/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sqlmesh.core.config import load_config_from_paths
from sqlmesh.core.config.connection import ConnectionConfig
import sqlmesh.core.dialect as d
from sqlmesh.core.environment import EnvironmentSuffixTarget
from sqlmesh.core.dialect import select_from_values
from sqlmesh.core.model import Model, load_sql_based_model
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
Expand Down Expand Up @@ -2333,11 +2334,7 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"):
k: [_normalize_snowflake(name) for name in v] for k, v in object_names.items()
}

if ctx.mark.startswith("gcp_postgres"):
engine_type = "gcp_postgres"
else:
engine_type = ctx.mark.split("_")[0]
init_example_project(tmp_path, engine_type, schema_name=schema_name)
init_example_project(tmp_path, ctx.engine_type, schema_name=schema_name)

config = load_config_from_paths(
Config,
Expand Down Expand Up @@ -3557,3 +3554,137 @@ def test_identifier_length_limit(ctx: TestContext):
match=re.escape(match),
):
adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})


@pytest.mark.parametrize(
"environment_suffix_target",
[
EnvironmentSuffixTarget.TABLE,
EnvironmentSuffixTarget.SCHEMA,
EnvironmentSuffixTarget.CATALOG,
],
)
def test_janitor(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test inits a project, creates a dev env, invalidates it, runs the janitor to clean it up and checks it was cleaned up

  • Across every engine we support
  • For every EnvironmentSuffixTarget we support

ctx: TestContext, tmp_path: pathlib.Path, environment_suffix_target: EnvironmentSuffixTarget
):
if (
environment_suffix_target == EnvironmentSuffixTarget.CATALOG
and not ctx.engine_adapter.SUPPORTS_CREATE_DROP_CATALOG
):
pytest.skip("Engine does not support catalog-based virtual environments")

schema = ctx.schema() # catalog.schema
parsed_schema = d.to_schema(schema)

init_example_project(tmp_path, ctx.engine_type, schema_name=parsed_schema.db)

def _set_config(_gateway: str, config: Config) -> None:
config.environment_suffix_target = environment_suffix_target
config.model_defaults.dialect = ctx.dialect

sqlmesh = ctx.create_context(path=tmp_path, config_mutator=_set_config)

sqlmesh.plan(auto_apply=True)

# create a new model in dev
(tmp_path / "models" / "new_model.sql").write_text(f"""
MODEL (
name {schema}.new_model,
kind FULL
);

select * from {schema}.full_model
""")
sqlmesh.load()

result = sqlmesh.plan(environment="dev", auto_apply=True)
assert result.context_diff.is_new_environment
assert len(result.context_diff.new_snapshots) == 1
new_model = list(result.context_diff.new_snapshots.values())[0]
assert "new_model" in new_model.name.lower()

# check physical objects
snapshot_table_name = exp.to_table(new_model.table_name(), dialect=ctx.dialect)
snapshot_schema = snapshot_table_name.db

prod_schema = normalize_identifiers(d.to_schema(schema), dialect=ctx.dialect)
dev_env_schema = prod_schema.copy()
if environment_suffix_target == EnvironmentSuffixTarget.CATALOG:
dev_env_schema.set("catalog", exp.to_identifier(f"{prod_schema.catalog}__dev"))
else:
dev_env_schema.set("db", exp.to_identifier(f"{prod_schema.db}__dev"))
normalize_identifiers(dev_env_schema, dialect=ctx.dialect)

md = ctx.get_metadata_results(prod_schema)
if environment_suffix_target == EnvironmentSuffixTarget.TABLE:
assert sorted([v.lower() for v in md.views]) == [
"full_model",
"incremental_model",
"new_model__dev",
"seed_model",
]
else:
assert sorted([v.lower() for v in md.views]) == [
"full_model",
"incremental_model",
"seed_model",
]
assert not md.tables
assert not md.managed_tables

if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
# note: this is "catalog__dev.schema" for EnvironmentSuffixTarget.CATALOG and "catalog.schema__dev" for EnvironmentSuffixTarget.SCHEMA
md = ctx.get_metadata_results(dev_env_schema)
assert [v.lower() for v in md.views] == ["new_model"]
assert not md.tables
assert not md.managed_tables

md = ctx.get_metadata_results(snapshot_schema)
assert not md.views
assert not md.managed_tables
assert sorted(t.split("__")[1].lower() for t in md.tables) == [
"full_model",
"incremental_model",
"new_model",
"seed_model",
]

# invalidate dev and run the janitor to clean it up
sqlmesh.invalidate_environment("dev")
assert sqlmesh.run_janitor(
ignore_ttl=True
) # ignore_ttl to delete the new_model snapshot even though it hasnt expired yet

# there should be no dev environment or dev tables / schemas
md = ctx.get_metadata_results(prod_schema)
assert sorted([v.lower() for v in md.views]) == [
"full_model",
"incremental_model",
"seed_model",
]
assert not md.tables
assert not md.managed_tables

if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
if environment_suffix_target == EnvironmentSuffixTarget.SCHEMA:
md = ctx.get_metadata_results(dev_env_schema)
else:
try:
md = ctx.get_metadata_results(dev_env_schema)
except Exception as e:
# Most engines will raise an error when @set_catalog tries to set a catalog that doesnt exist
# in this case, we just swallow the error. We know this call already worked before in the earlier checks
md = MetadataResults()

assert not md.views
assert not md.tables
assert not md.managed_tables

md = ctx.get_metadata_results(snapshot_schema)
assert not md.views
assert not md.managed_tables
assert sorted(t.split("__")[1].lower() for t in md.tables) == [
"full_model",
"incremental_model",
"seed_model",
]
Loading