From 2498ffbafbb99552371176c231ca07609553f788 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 12 Aug 2025 01:33:58 +0000
Subject: [PATCH 01/10] Fix: Use drop cascade in janitor

---
 sqlmesh/core/engine_adapter/athena.py | 4 +-
 sqlmesh/core/engine_adapter/base.py | 4 +-
 sqlmesh/core/snapshot/evaluator.py | 11 +-
 .../integration/test_integration_postgres.py | 250 ++++++++++++++++++
 tests/core/test_snapshot_evaluator.py | 19 +-
 5 files changed, 274 insertions(+), 14 deletions(-)

diff --git a/sqlmesh/core/engine_adapter/athena.py b/sqlmesh/core/engine_adapter/athena.py
index d549de3f4c..54268f0805 100644
--- a/sqlmesh/core/engine_adapter/athena.py
+++ b/sqlmesh/core/engine_adapter/athena.py
@@ -314,13 +314,13 @@ def _build_table_properties_exp(
 
         return None
 
-    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
+    def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
         table = exp.to_table(table_name)
 
         if self._query_table_type(table) == "hive":
             self._truncate_table(table)
 
-        return super().drop_table(table_name=table, exists=exists)
+        return super().drop_table(table_name=table, exists=exists, **kwargs)
 
     def _truncate_table(self, table_name: TableName) -> None:
         table = exp.to_table(table_name)
diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py
index 24ee99bba5..cef171e2b4 100644
--- a/sqlmesh/core/engine_adapter/base.py
+++ b/sqlmesh/core/engine_adapter/base.py
@@ -1037,14 +1037,14 @@ def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool =
                 f"Can't drop data object '{data_object.to_table().sql(dialect=self.dialect)}' of type '{data_object.type.value}'"
             )
 
-    def drop_table(self, table_name: TableName, exists: bool = True) -> None:
+    def drop_table(self, table_name: TableName, exists: bool = True, **kwargs: t.Any) -> None:
         """Drops a table.
 
         Args:
             table_name: The name of the table to drop.
             exists: If exists, defaults to True.
         """
-        self._drop_object(name=table_name, exists=exists)
+        self._drop_object(name=table_name, exists=exists, **kwargs)
 
     def drop_managed_table(self, table_name: TableName, exists: bool = True) -> None:
         """Drops a managed table.
diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py
index 1531997c1b..559b35ce2f 100644
--- a/sqlmesh/core/snapshot/evaluator.py
+++ b/sqlmesh/core/snapshot/evaluator.py
@@ -1207,6 +1207,10 @@ def _cleanup_snapshot(
                     table_name,
                     is_table_deployable=is_table_deployable,
                     physical_schema=snapshot.physical_schema,
+                    # we need to set cascade=True or we will get a "can't drop because other objects depend on it"-style
+                    # error on engines that enforce referential integrity, such as Postgres
+                    # this situation can happen when a snapshot expires but downstream view snapshots that reference it have not yet expired
+                    cascade=True,
                 )
             except Exception:
                 # Use `get_data_object` to check if the table exists instead of `table_exists` since the former
@@ -1726,7 +1730,7 @@ def migrate(
 
     def delete(self, name: str, **kwargs: t.Any) -> None:
         _check_table_db_is_physical_schema(name, kwargs["physical_schema"])
-        self.adapter.drop_table(name)
+        self.adapter.drop_table(name, cascade=kwargs.pop("cascade", False))
         logger.info("Dropped table '%s'", name)
 
     def _replace_query_for_model(
@@ -2324,15 +2328,16 @@ def migrate(
         )
 
     def delete(self, name: str, **kwargs: t.Any) -> None:
+        cascade = kwargs.pop("cascade", False)
         try:
-            self.adapter.drop_view(name)
+            self.adapter.drop_view(name, cascade=cascade)
         except Exception:
             logger.debug(
                 "Failed to drop view '%s'. Trying to drop the materialized view instead",
                 name,
                 exc_info=True,
             )
-            self.adapter.drop_view(name, materialized=True)
+            self.adapter.drop_view(name, materialized=True, cascade=cascade)
         logger.info("Dropped view '%s'", name)
 
     def _is_materialized_view(self, model: Model) -> bool:
diff --git a/tests/core/engine_adapter/integration/test_integration_postgres.py b/tests/core/engine_adapter/integration/test_integration_postgres.py
index 82172378ae..22d0c68f6f 100644
--- a/tests/core/engine_adapter/integration/test_integration_postgres.py
+++ b/tests/core/engine_adapter/integration/test_integration_postgres.py
@@ -1,13 +1,24 @@
 import typing as t
 
 import pytest
 from pytest import FixtureRequest
+from pathlib import Path
 
 from sqlmesh.core.engine_adapter import PostgresEngineAdapter
+from sqlmesh.core.config import Config, DuckDBConnectionConfig
+from tests.core.engine_adapter.integration import TestContext
+import time_machine
+from datetime import timedelta
+from sqlmesh.utils.date import to_ds
+from sqlglot import exp
+from sqlmesh.core.context import Context
+from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
+from sqlmesh.core.snapshot.definition import SnapshotId
 from tests.core.engine_adapter.integration import (
     TestContext,
     generate_pytest_params,
     ENGINES_BY_NAME,
     IntegrationTestEngine,
+    TEST_SCHEMA,
 )
 
@@ -33,3 +44,242 @@ def test_engine_adapter(ctx: TestContext):
 def test_server_version_psycopg(ctx: TestContext):
     assert isinstance(ctx.engine_adapter, PostgresEngineAdapter)
     assert ctx.engine_adapter.server_version != (0, 0)
+
+
+def test_janitor_drop_cascade(ctx: TestContext, tmp_path: Path) -> None:
+    """
+    Scenario:
+        Ensure that cleaning up expired table snapshots also cleans up any unexpired view snapshots that depend on them
+        - We create A (table) <- B (view)
+        - In dev, we modify A - triggers a new version of A and a dev preview of B that both expire in 7 days
+        - We advance time by 3 days
+        - In dev, we modify B - triggers a new version of B that depends on A but expires 3 days after A
+        - In dev, we create B(view) <- C(view) and B(view) <- D(table)
+        - We advance time by 5 days so that A has reached its expiry but B, C and D have not
+        - We expire dev so that none of these snapshots are promoted and they are thus targets for cleanup
+        - We run the janitor
+
+    Expected outcome:
+        - All the dev versions of A and B should be dropped
+        - C should be dropped as well because it's a view that depends on B, which was dropped
+        - D should not be dropped because, while it depends on B which was dropped, it's a table and so is still valid after B is dropped
+        - We should NOT get an 'ERROR: cannot drop table x because other objects depend on it'
+
+    Note that the references in state to the views that were cascade-dropped by Postgres will still exist; this is considered OK
+    because applying a plan will recreate the physical objects
+    """
+
+    def _all_snapshot_ids(context: Context) -> t.List[SnapshotId]:
+        assert isinstance(context.state_sync, CachingStateSync)
+        assert isinstance(context.state_sync.state_sync, EngineAdapterStateSync)
+
+        return [
+            SnapshotId(name=name, identifier=identifier)
+            for name, identifier in context.state_sync.state_sync.engine_adapter.fetchall(
+                "select name, identifier from sqlmesh._snapshots"
+            )
+        ]
+
+    models_dir = tmp_path / "models"
+    models_dir.mkdir()
+    schema = exp.to_table(ctx.schema(TEST_SCHEMA)).this
+
+    (models_dir / "model_a.sql").write_text(f"""
+    MODEL (
+        name {schema}.model_a,
+        kind FULL
+    );
+    SELECT 1 as a, 2 as b;
+    """)
+
+    (models_dir / "model_b.sql").write_text(f"""
+    MODEL (
+        name {schema}.model_b,
+        kind VIEW
+    );
+    SELECT a from {schema}.model_a;
+    """)
+
+    def _mutate_config(gateway: str, config: Config):
+        config.gateways[gateway].state_connection = DuckDBConnectionConfig(
+            database=str(tmp_path / "state.db")
+        )
+
+    with time_machine.travel("2020-01-01 00:00:00"):
+        sqlmesh = ctx.create_context(
+            path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
+        )
+        sqlmesh.plan(auto_apply=True)
+
+        model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
+        # expiry is last updated + ttl
+        assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_a_snapshot.updated_ts) == "2020-01-01"
+        assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-08"
+
+        model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
+        assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_b_snapshot.updated_ts) == "2020-01-01"
+        assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-08"
+
+    model_a_prod_snapshot = model_a_snapshot
+    model_b_prod_snapshot = model_b_snapshot
+
+    # move forward 1 day
+    # new dev environment - touch models to create new snapshots
+    # model a / b expiry in prod should remain unmodified
+    # model a / b expiry in dev should be as of today
+    with time_machine.travel("2020-01-02 00:00:00"):
+        (models_dir / "model_a.sql").write_text(f"""
+        MODEL (
+            name {schema}.model_a,
+            kind FULL
+        );
+        SELECT 1 as a, 2 as b, 3 as c;
+        """)
+
+        sqlmesh = ctx.create_context(
+            path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
+        )
+        sqlmesh.plan(environment="dev", auto_apply=True)
+
+        # should now have 4 snapshots in state - 2x model a and 2x model b
+        # the new model b is a dev preview because its upstream model changed
+        all_snapshot_ids = _all_snapshot_ids(sqlmesh)
+        assert len(all_snapshot_ids) == 4
+        assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
+        assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 2
+
+        # context just has the two latest
+        assert len(sqlmesh.snapshots) == 2
+
+        # these expire 1 day later than what's in prod
+        model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
+        assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
+        assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
+
+        model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
+        assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_b_snapshot.updated_ts) == "2020-01-02"
+        assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-09"
+
+    # move forward 3 days
+    # touch model b in dev but leave model a
+    # this bumps the model b expiry but model a remains unchanged, so it will expire before model b even though model b depends on it
+    with time_machine.travel("2020-01-05 00:00:00"):
+        (models_dir / "model_b.sql").write_text(f"""
+        MODEL (
+            name {schema}.model_b,
+            kind VIEW
+        );
+        SELECT a, 'b' as b from {schema}.model_a;
+        """)
+
+        (models_dir / "model_c.sql").write_text(f"""
+        MODEL (
+            name {schema}.model_c,
+            kind VIEW
+        );
+        SELECT a, 'c' as c from {schema}.model_b;
+        """)
+
+        (models_dir / "model_d.sql").write_text(f"""
+        MODEL (
+            name {schema}.model_d,
+            kind FULL
+        );
+        SELECT a, 'd' as d from {schema}.model_b;
+        """)
+
+        sqlmesh = ctx.create_context(
+            path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
+        )
+        sqlmesh.plan(environment="dev", auto_apply=True)
+
+        # should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d
+        all_snapshot_ids = _all_snapshot_ids(sqlmesh)
+        assert len(all_snapshot_ids) == 7
+        assert len([s for s in all_snapshot_ids if "model_a" in s.name]) == 2
+        assert len([s for s in all_snapshot_ids if "model_b" in s.name]) == 3
+        assert len([s for s in all_snapshot_ids if "model_c" in s.name]) == 1
+        assert len([s for s in all_snapshot_ids if "model_d" in s.name]) == 1
+
+        # context just has the 4 latest
+        assert len(sqlmesh.snapshots) == 4
+
+        # model a expiry should not have changed
+        model_a_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_a" in n)
+        assert timedelta(milliseconds=model_a_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_a_snapshot.updated_ts) == "2020-01-02"
+        assert to_ds(model_a_snapshot.expiration_ts) == "2020-01-09"
+
+        # model b should now expire well after model a
+        model_b_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_b" in n)
+        assert timedelta(milliseconds=model_b_snapshot.ttl_ms) == timedelta(weeks=1)
+        assert to_ds(model_b_snapshot.updated_ts) == "2020-01-05"
+        assert to_ds(model_b_snapshot.expiration_ts) == "2020-01-12"
+
+        # model c should expire at the same time as model b
+        model_c_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_c" in n)
+        assert to_ds(model_c_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
+        assert to_ds(model_c_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
+
+        # model d should expire at the same time as model b
+        model_d_snapshot = next(s for n, s in sqlmesh.snapshots.items() if "model_d" in n)
+        assert to_ds(model_d_snapshot.updated_ts) == to_ds(model_b_snapshot.updated_ts)
+        assert to_ds(model_d_snapshot.expiration_ts) == to_ds(model_b_snapshot.expiration_ts)
+
+    # move forward to a date after model a has expired but before model b has expired
+    # invalidate dev to trigger cleanups
+    # run janitor
+    # - table model a is expired so it will be cleaned up, and this will cascade to view model b
+    # - view model b is not expired, but because it got cascaded to, this will cascade again to view model c
+    # - table model d is not a view, so even though its parent view model b got dropped, it doesn't need to be dropped
+    with time_machine.travel("2020-01-10 00:00:00"):
+        sqlmesh = ctx.create_context(
+            path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
+        )
+
+        before_snapshot_ids = _all_snapshot_ids(sqlmesh)
+
+        before_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
+        assert set(before_objects.tables) == set(
+            [
+                exp.to_table(s.table_name()).text("this")
+                for s in (model_a_prod_snapshot, model_a_snapshot, model_d_snapshot)
+            ]
+        )
+        assert set(before_objects.views).issuperset(
+            [
+                exp.to_table(s.table_name()).text("this")
+                for s in (model_b_prod_snapshot, model_b_snapshot, model_c_snapshot)
+            ]
+        )
+
+        sqlmesh.invalidate_environment("dev")
+        sqlmesh.run_janitor(ignore_ttl=False)
+
+        after_snapshot_ids = _all_snapshot_ids(sqlmesh)
+
+        assert len(before_snapshot_ids) != len(after_snapshot_ids)
+
+        # Everything should be left in state except the model_a snapshot, which expired
+        assert set(after_snapshot_ids) == set(before_snapshot_ids) - set(
+            [model_a_snapshot.snapshot_id]
+        )
+
+        # In the db, there should be:
+        # - the two original snapshots that were in prod, table model_a and view model_b
+        # - model d, even though it's not promoted in any environment, because it's a table snapshot that hasn't expired yet
+        # the view snapshots that depended on model_a should be gone due to the cascading delete
+        after_objects = ctx.get_metadata_results(f"sqlmesh__{schema}")
+        assert set(after_objects.tables) == set(
+            [
+                exp.to_table(s.table_name()).text("this")
+                for s in (model_a_prod_snapshot, model_d_snapshot)
+            ]
+        )
+        assert after_objects.views == [
+            exp.to_table(model_b_prod_snapshot.table_name()).text("this")
+        ]
diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py
index 60931b1602..073b4ba1ad 100644
--- a/tests/core/test_snapshot_evaluator.py
+++ b/tests/core/test_snapshot_evaluator.py
@@ -434,7 +434,8 @@ def create_and_cleanup(name: str, dev_table_only: bool):
     snapshot = create_and_cleanup("catalog.test_schema.test_model", True)
     adapter_mock.get_data_object.assert_not_called()
     adapter_mock.drop_table.assert_called_once_with(
-        f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
+        f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
+        cascade=True,
     )
 
     adapter_mock.reset_mock()
@@ -443,9 +444,10 @@ def create_and_cleanup(name: str, dev_table_only: bool):
     adapter_mock.drop_table.assert_has_calls(
         [
             call(
-                f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
+                f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
+                cascade=True,
             ),
-            call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"),
+            call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", cascade=True),
         ]
     )
     adapter_mock.reset_mock()
@@ -454,8 +456,11 @@ def create_and_cleanup(name: str, dev_table_only: bool):
     adapter_mock.get_data_object.assert_not_called()
     adapter_mock.drop_table.assert_has_calls(
         [
-            call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
-            call(f"sqlmesh__default.test_model__{snapshot.version}"),
+            call(
f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev", + cascade=True, + ), + call(f"sqlmesh__default.test_model__{snapshot.version}", cascade=True), ] ) @@ -4013,10 +4018,10 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot): # The clean up will happen using the specific gateway the model was created with engine_adapters["default"].drop_table.assert_called_once_with( - f"sqlmesh__db.db__model__{snapshot.version}__dev" + f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True ) engine_adapters["secondary"].drop_table.assert_called_once_with( - f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev" + f"sqlmesh__test_schema.test_schema__test_model__{snapshot_2.version}__dev", cascade=True ) From 37fb0e3107e29cfc8ac5340334742abf2387c29c Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Sun, 17 Aug 2025 23:02:01 +0000 Subject: [PATCH 02/10] mypy --- tests/dbt/test_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbt/test_adapter.py b/tests/dbt/test_adapter.py index 5a41d237d3..2bcb47b759 100644 --- a/tests/dbt/test_adapter.py +++ b/tests/dbt/test_adapter.py @@ -37,7 +37,7 @@ def test_adapter_relation(sushi_test_project: Project, runtime_renderer: t.Calla table_name="foo.another", target_columns_to_types={"col": exp.DataType.build("int")} ) engine_adapter.create_view( - view_name="foo.bar_view", query_or_df=parse_one("select * from foo.bar") + view_name="foo.bar_view", query_or_df=parse_one("select * from foo.bar", into=exp.Query) ) engine_adapter.create_table( table_name="ignored.ignore", target_columns_to_types={"col": exp.DataType.build("int")} From 10b0797428d75af5fe9e8aee93dd1853edf3e3b8 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Mon, 18 Aug 2025 00:05:25 +0000 Subject: [PATCH 03/10] Revert "mypy" This reverts commit 23340ce66c2b12067bd9cc85a123bb8256f0d38b. 
---
 tests/dbt/test_adapter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/dbt/test_adapter.py b/tests/dbt/test_adapter.py
index 2bcb47b759..5a41d237d3 100644
--- a/tests/dbt/test_adapter.py
+++ b/tests/dbt/test_adapter.py
@@ -37,7 +37,7 @@ def test_adapter_relation(sushi_test_project: Project, runtime_renderer: t.Calla
         table_name="foo.another", target_columns_to_types={"col": exp.DataType.build("int")}
     )
     engine_adapter.create_view(
-        view_name="foo.bar_view", query_or_df=parse_one("select * from foo.bar", into=exp.Query)
+        view_name="foo.bar_view", query_or_df=parse_one("select * from foo.bar")
     )
     engine_adapter.create_table(
         table_name="ignored.ignore", target_columns_to_types={"col": exp.DataType.build("int")}

From 046b42e742f0e2e4d07702a105005d8e7fb48f08 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Mon, 18 Aug 2025 00:06:25 +0000
Subject: [PATCH 04/10] Fix test

---
 tests/core/test_snapshot_evaluator.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py
index 073b4ba1ad..3258e4622d 100644
--- a/tests/core/test_snapshot_evaluator.py
+++ b/tests/core/test_snapshot_evaluator.py
@@ -514,7 +514,8 @@ def test_cleanup_skip_missing_table(adapter_mock, make_snapshot):
         f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
     )
     adapter_mock.drop_table.assert_called_once_with(
-        f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
+        f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
+        cascade=True,
     )
 
 

From 057d24c27d5c013d46acd6d83c4af285daafd930 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 19 Aug 2025 04:50:12 +0000
Subject: [PATCH 05/10] Add janitor test across all adapters and fix drop
 cascade in BigQuery

---
 sqlmesh/core/engine_adapter/bigquery.py | 15 ++
 .../engine_adapter/integration/__init__.py | 11 +-
 .../integration/test_integration.py | 141 +++++++++++++++++-
 tests/core/engine_adapter/test_bigquery.py | 27 +++-
 4 files changed, 185 insertions(+), 9 deletions(-)

diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py
index 4fe50fdeef..460bda07db 100644
--- a/sqlmesh/core/engine_adapter/bigquery.py
+++ b/sqlmesh/core/engine_adapter/bigquery.py
@@ -1260,6 +1260,21 @@ def _native_df_to_pandas_df(
 
         return super()._native_df_to_pandas_df(query_or_df)
 
+    def _drop_object(
+        self,
+        name: TableName | SchemaName,
+        exists: bool = True,
+        kind: str = "TABLE",
+        **drop_args: t.Any,
+    ) -> None:
+        if kind.upper() == "TABLE" and "cascade" in drop_args:
+            # BigQuery doesn't support DROP CASCADE for tables
+            # ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement
+            # so set it to False here so SQLGlot doesn't output a CASCADE argument
+            drop_args["cascade"] = False
+
+        super()._drop_object(name=name, exists=exists, kind=kind, **drop_args)
+
     @property
     def _query_data(self) -> t.Any:
         return self._connection_pool.get_attribute("query_data")
diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py
index 8476d992eb..baf45efa9c 100644
--- a/tests/core/engine_adapter/integration/__init__.py
+++ b/tests/core/engine_adapter/integration/__init__.py
@@ -27,7 +27,7 @@
 from _pytest.mark.structures import ParameterSet
 
 if t.TYPE_CHECKING:
-    from sqlmesh.core._typing import TableName
+    from sqlmesh.core._typing import TableName, SchemaName
     from sqlmesh.core.engine_adapter._typing import Query
 
 TEST_SCHEMA = "test_schema"
@@ -222,6 +222,13 @@ def df_type(self) -> t.Optional[str]:
             return self._test_type.split("-", maxsplit=1)[1]
         return None
 
+    @property
+    def engine_type(self) -> str:
+        if self.mark.startswith("gcp_postgres"):
+            return "gcp_postgres"
+
+        return self.mark.split("_")[0]
+
     @property
     def columns_to_types(self):
         if self._columns_to_types is None:
@@ -307,7 +314,7 @@ def default_table_format(self) -> t.Optional[str]:
     def add_test_suffix(self, value: str) -> str:
         return f"{value}_{self.test_id}"
 
-    def get_metadata_results(self, schema: t.Optional[str] = None) -> MetadataResults:
+    def get_metadata_results(self, schema: t.Optional[SchemaName] = None) -> MetadataResults:
         schema = schema if schema else self.schema(TEST_SCHEMA)
         return MetadataResults.from_data_objects(self.engine_adapter.get_data_objects(schema))
 
diff --git a/tests/core/engine_adapter/integration/test_integration.py b/tests/core/engine_adapter/integration/test_integration.py
index d1a9c5afaa..8caafd7052 100644
--- a/tests/core/engine_adapter/integration/test_integration.py
+++ b/tests/core/engine_adapter/integration/test_integration.py
@@ -21,6 +21,7 @@
 from sqlmesh.core.config import load_config_from_paths
 from sqlmesh.core.config.connection import ConnectionConfig
 import sqlmesh.core.dialect as d
+from sqlmesh.core.environment import EnvironmentSuffixTarget
 from sqlmesh.core.dialect import select_from_values
 from sqlmesh.core.model import Model, load_sql_based_model
 from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
@@ -2333,11 +2334,7 @@ def _normalize_snowflake(name: str, prefix_regex: str = "(sqlmesh__)(.*)"):
             k: [_normalize_snowflake(name) for name in v] for k, v in object_names.items()
         }
 
-    if ctx.mark.startswith("gcp_postgres"):
-        engine_type = "gcp_postgres"
-    else:
-        engine_type = ctx.mark.split("_")[0]
-    init_example_project(tmp_path, engine_type, schema_name=schema_name)
+    init_example_project(tmp_path, ctx.engine_type, schema_name=schema_name)
 
     config = load_config_from_paths(
         Config,
@@ -3557,3 +3554,137 @@ def test_identifier_length_limit(ctx: TestContext):
         match=re.escape(match),
     ):
         adapter.create_table(long_table_name, {"col": exp.DataType.build("int")})
+
+
+@pytest.mark.parametrize(
+    "environment_suffix_target",
+    [
+        EnvironmentSuffixTarget.TABLE,
+        EnvironmentSuffixTarget.SCHEMA,
+        EnvironmentSuffixTarget.CATALOG,
+    ],
+)
+def test_janitor(
+    ctx: TestContext, tmp_path: pathlib.Path, environment_suffix_target: EnvironmentSuffixTarget
+):
+    if (
+        environment_suffix_target == EnvironmentSuffixTarget.CATALOG
+        and not ctx.engine_adapter.SUPPORTS_CREATE_DROP_CATALOG
+    ):
+        pytest.skip("Engine does not support catalog-based virtual environments")
+
+    schema = ctx.schema()  # catalog.schema
+    parsed_schema = d.to_schema(schema)
+
+    init_example_project(tmp_path, ctx.engine_type, schema_name=parsed_schema.db)
+
+    def _set_config(_gateway: str, config: Config) -> None:
+        config.environment_suffix_target = environment_suffix_target
+        config.model_defaults.dialect = ctx.dialect
+
+    sqlmesh = ctx.create_context(path=tmp_path, config_mutator=_set_config)
+
+    sqlmesh.plan(auto_apply=True)
+
+    # create a new model in dev
+    (tmp_path / "models" / "new_model.sql").write_text(f"""
+    MODEL (
+        name {schema}.new_model,
+        kind FULL
+    );
+
+    select * from {schema}.full_model
+    """)
+    sqlmesh.load()
+
+    result = sqlmesh.plan(environment="dev", auto_apply=True)
+    assert result.context_diff.is_new_environment
+    assert len(result.context_diff.new_snapshots) == 1
+    new_model = list(result.context_diff.new_snapshots.values())[0]
+    assert "new_model" in new_model.name.lower()
+
+    # check physical objects
+    snapshot_table_name = exp.to_table(new_model.table_name(), dialect=ctx.dialect)
+    snapshot_schema = snapshot_table_name.db
+
+    prod_schema = normalize_identifiers(d.to_schema(schema), dialect=ctx.dialect)
+    dev_env_schema = prod_schema.copy()
+    if environment_suffix_target == EnvironmentSuffixTarget.CATALOG:
+        dev_env_schema.set("catalog", exp.to_identifier(f"{prod_schema.catalog}__dev"))
+    else:
+        dev_env_schema.set("db", exp.to_identifier(f"{prod_schema.db}__dev"))
+    normalize_identifiers(dev_env_schema, dialect=ctx.dialect)
+
+    md = ctx.get_metadata_results(prod_schema)
+    if environment_suffix_target == EnvironmentSuffixTarget.TABLE:
+        assert sorted([v.lower() for v in md.views]) == [
+            "full_model",
+            "incremental_model",
+            "new_model__dev",
+            "seed_model",
+        ]
+    else:
+        assert sorted([v.lower() for v in md.views]) == [
+            "full_model",
+            "incremental_model",
+            "seed_model",
+        ]
+    assert not md.tables
+    assert not md.managed_tables
+
+    if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
+        # note: this is "catalog__dev.schema" for EnvironmentSuffixTarget.CATALOG and "catalog.schema__dev" for EnvironmentSuffixTarget.SCHEMA
+        md = ctx.get_metadata_results(dev_env_schema)
+        assert [v.lower() for v in md.views] == ["new_model"]
+        assert not md.tables
+        assert not md.managed_tables
+
+    md = ctx.get_metadata_results(snapshot_schema)
+    assert not md.views
+    assert not md.managed_tables
+    assert sorted(t.split("__")[1].lower() for t in md.tables) == [
+        "full_model",
+        "incremental_model",
+        "new_model",
+        "seed_model",
+    ]
+
+    # invalidate dev and run the janitor to clean it up
+    sqlmesh.invalidate_environment("dev")
+    assert sqlmesh.run_janitor(
+        ignore_ttl=True
+    )  # ignore_ttl to delete the new_model snapshot even though it hasn't expired yet
+
+    # there should be no dev environment or dev tables / schemas
+    md = ctx.get_metadata_results(prod_schema)
+    assert sorted([v.lower() for v in md.views]) == [
+        "full_model",
+        "incremental_model",
+        "seed_model",
+    ]
+    assert not md.tables
+    assert not md.managed_tables
+
+    if environment_suffix_target != EnvironmentSuffixTarget.TABLE:
+        if environment_suffix_target == EnvironmentSuffixTarget.SCHEMA:
+            md = ctx.get_metadata_results(dev_env_schema)
+        else:
+            try:
+                md = ctx.get_metadata_results(dev_env_schema)
+            except Exception as e:
+                # Most engines will raise an error when @set_catalog tries to set a catalog that doesn't exist
+                # in this case, we just swallow the error.
+                # We know this call already worked before in the earlier checks
+                md = MetadataResults()
+
+        assert not md.views
+        assert not md.tables
+        assert not md.managed_tables
+
+    md = ctx.get_metadata_results(snapshot_schema)
+    assert not md.views
+    assert not md.managed_tables
+    assert sorted(t.split("__")[1].lower() for t in md.tables) == [
+        "full_model",
+        "incremental_model",
+        "seed_model",
+    ]
diff --git a/tests/core/engine_adapter/test_bigquery.py b/tests/core/engine_adapter/test_bigquery.py
index f5a287defb..84822e5fe1 100644
--- a/tests/core/engine_adapter/test_bigquery.py
+++ b/tests/core/engine_adapter/test_bigquery.py
@@ -20,8 +20,10 @@
 
 
 @pytest.fixture
-def adapter(make_mocked_engine_adapter: t.Callable) -> BigQueryEngineAdapter:
-    return make_mocked_engine_adapter(BigQueryEngineAdapter)
+def adapter(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture) -> BigQueryEngineAdapter:
+    mocked_adapter = make_mocked_engine_adapter(BigQueryEngineAdapter)
+    mocker.patch("sqlmesh.core.engine_adapter.bigquery.BigQueryEngineAdapter.execute")
+    return mocked_adapter
 
 
 def test_insert_overwrite_by_time_partition_query(
@@ -575,6 +577,8 @@ def test_begin_end_session(mocker: MockerFixture):
 
 
 def _to_sql_calls(execute_mock: t.Any, identify: bool = True) -> t.List[str]:
+    if isinstance(execute_mock, BigQueryEngineAdapter):
+        execute_mock = execute_mock.execute
     output = []
     for call in execute_mock.call_args_list:
         value = call[0][0]
@@ -1150,3 +1154,22 @@ def test_job_cancellation_on_keyboard_interrupt_job_already_done(mocker: MockerF
     # Verify job status was checked but cancellation was NOT called
     mock_job.done.assert_called_once()
     mock_job.cancel.assert_not_called()
+
+
+def test_drop_cascade(adapter: BigQueryEngineAdapter):
+    adapter.drop_table("foo", cascade=True)
+    adapter.drop_table("foo", cascade=False)
+
+    # BigQuery doesn't support DROP CASCADE for tables
+    # ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement
+    assert _to_sql_calls(adapter) == ["DROP TABLE IF EXISTS `foo`", "DROP TABLE IF EXISTS `foo`"]
+    adapter.execute.reset_mock()  # type: ignore
+
+    # But, it does for schemas
+    adapter.drop_schema("foo", cascade=True)
+    adapter.drop_schema("foo", cascade=False)
+
+    assert _to_sql_calls(adapter) == [
+        "DROP SCHEMA IF EXISTS `foo` CASCADE",
+        "DROP SCHEMA IF EXISTS `foo`",
+    ]

From 1daf71e0feaae684e1a8e5cdfda2bb6495095c17 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 19 Aug 2025 04:50:44 +0000
Subject: [PATCH 06/10] Enable cloud engines

---
 .circleci/continue_config.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml
index 8f8324a2a0..155d095844 100644
--- a/.circleci/continue_config.yml
+++ b/.circleci/continue_config.yml
@@ -310,10 +310,10 @@ workflows:
                 - athena
                 - fabric
                 - gcp-postgres
-          filters:
-            branches:
-              only:
-                - main
+          #filters:
+          #  branches:
+          #    only:
+          #      - main
       - ui_style
       - ui_test
       - vscode_test

From 65fb037ca45271a659db537b883b72053d4117af Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 19 Aug 2025 05:39:51 +0000
Subject: [PATCH 07/10] Add supported drop cascade object indicators

---
 sqlmesh/core/engine_adapter/athena.py | 1 +
 sqlmesh/core/engine_adapter/base.py | 7 +++++++
 sqlmesh/core/engine_adapter/base_postgres.py | 1 +
 sqlmesh/core/engine_adapter/bigquery.py | 16 +---------------
 sqlmesh/core/engine_adapter/clickhouse.py | 2 +-
 sqlmesh/core/engine_adapter/duckdb.py | 1 +
 sqlmesh/core/engine_adapter/snowflake.py | 1 +
 sqlmesh/core/engine_adapter/spark.py | 1 +
 sqlmesh/core/engine_adapter/trino.py | 1 +
 9 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/sqlmesh/core/engine_adapter/athena.py b/sqlmesh/core/engine_adapter/athena.py
index 54268f0805..3ed34067d2 100644
--- a/sqlmesh/core/engine_adapter/athena.py
+++ b/sqlmesh/core/engine_adapter/athena.py
@@ -45,6 +45,7 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
     # >>> self._execute('/* test */ DESCRIBE foo')
     # pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
     ATTACH_CORRELATION_ID = False
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]
 
     def __init__(
         self, *args: t.Any, s3_warehouse_location: t.Optional[str] = None, **kwargs: t.Any
diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py
index cef171e2b4..c521803e8b 100644
--- a/sqlmesh/core/engine_adapter/base.py
+++ b/sqlmesh/core/engine_adapter/base.py
@@ -108,6 +108,7 @@ class EngineAdapter:
     SUPPORTS_CLONING = False
     SUPPORTS_MANAGED_MODELS = False
     SUPPORTS_CREATE_DROP_CATALOG = False
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = []
     SCHEMA_DIFFER = SchemaDiffer()
     SUPPORTS_TUPLE_IN = True
     HAS_VIEW_BINDING = False
@@ -1060,6 +1061,7 @@ def _drop_object(
         name: TableName | SchemaName,
         exists: bool = True,
         kind: str = "TABLE",
+        cascade: bool = False,
         **drop_args: t.Any,
     ) -> None:
         """Drops an object.
@@ -1070,8 +1072,13 @@ def _drop_object(
             name: The name of the table to drop.
             exists: If exists, defaults to True.
             kind: What kind of object to drop. Defaults to TABLE
+            cascade: Whether or not to DROP ... CASCADE.
+                Note that this is ignored for `kind` values that are not present in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS
             **drop_args: Any extra arguments to set on the Drop expression
         """
+        if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
+            drop_args["cascade"] = cascade
+
         self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))
 
     def get_alter_expressions(
diff --git a/sqlmesh/core/engine_adapter/base_postgres.py b/sqlmesh/core/engine_adapter/base_postgres.py
index aa46fba95a..26446aacfd 100644
--- a/sqlmesh/core/engine_adapter/base_postgres.py
+++ b/sqlmesh/core/engine_adapter/base_postgres.py
@@ -24,6 +24,7 @@ class BasePostgresEngineAdapter(EngineAdapter):
     DEFAULT_BATCH_SIZE = 400
     COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
     COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
 
     def columns(
         self, table_name: TableName, include_pseudo_columns: bool = False
diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py
index 460bda07db..8f617df742 100644
--- a/sqlmesh/core/engine_adapter/bigquery.py
+++ b/sqlmesh/core/engine_adapter/bigquery.py
@@ -68,6 +68,7 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row
     SUPPORTS_CLONING = True
     MAX_TABLE_COMMENT_LENGTH = 1024
     MAX_COLUMN_COMMENT_LENGTH = 1024
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
 
     SCHEMA_DIFFER = SchemaDiffer(
         compatible_types={
@@ -1260,21 +1261,6 @@ def _native_df_to_pandas_df(
 
         return super()._native_df_to_pandas_df(query_or_df)
 
-    def _drop_object(
-        self,
-        name: TableName | SchemaName,
-        exists: bool = True,
-        kind: str = "TABLE",
-        **drop_args: t.Any,
-    ) -> None:
-        if kind.upper() == "TABLE" and "cascade" in drop_args:
-            # BigQuery doesn't support DROP CASCADE for tables
-            # ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement
-            # so set it to False here so SQLGlot doesn't output a CASCADE argument
-            drop_args["cascade"] = False
-
-        super()._drop_object(name=name, exists=exists, kind=kind, **drop_args)
-
     @property
     def _query_data(self) -> t.Any:
         return self._connection_pool.get_attribute("query_data")
diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py
index 5ac4e9b152..d8743f7fef 100644
--- a/sqlmesh/core/engine_adapter/clickhouse.py
+++ b/sqlmesh/core/engine_adapter/clickhouse.py
@@ -614,6 +614,7 @@ def _drop_object(
         name: TableName | SchemaName,
         exists: bool = True,
         kind: str = "TABLE",
+        cascade: bool = False,
         **drop_args: t.Any,
     ) -> None:
         """Drops an object.
@@ -626,7 +627,6 @@ def _drop_object(
             kind: What kind of object to drop. Defaults to TABLE
             **drop_args: Any extra arguments to set on the Drop expression
         """
-        drop_args.pop("cascade", None)
         self.execute(
             exp.Drop(
                 this=exp.to_table(name),
diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py
index d90a4ed736..a3bebadbe9 100644
--- a/sqlmesh/core/engine_adapter/duckdb.py
+++ b/sqlmesh/core/engine_adapter/duckdb.py
@@ -37,6 +37,7 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
     COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
     COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
     SUPPORTS_CREATE_DROP_CATALOG = True
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
 
     @property
     def catalog_support(self) -> CatalogSupport:
diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py
index f6fc32cc0a..69ff33b5a8 100644
--- a/sqlmesh/core/engine_adapter/snowflake.py
+++ b/sqlmesh/core/engine_adapter/snowflake.py
@@ -55,6 +55,7 @@ class SnowflakeEngineAdapter(GetCurrentCatalogFromFunctionMixin, ClusteredByMixi
     SUPPORTS_MANAGED_MODELS = True
     CURRENT_CATALOG_EXPRESSION = exp.func("current_database")
     SUPPORTS_CREATE_DROP_CATALOG = True
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA", "TABLE"]
     SCHEMA_DIFFER = SchemaDiffer(
         parameterized_type_defaults={
             exp.DataType.build("BINARY", dialect=DIALECT).this: [(8388608,)],
diff --git a/sqlmesh/core/engine_adapter/spark.py b/sqlmesh/core/engine_adapter/spark.py
index 5e37ba075e..4f6e9a984f 100644
--- a/sqlmesh/core/engine_adapter/spark.py
+++ b/sqlmesh/core/engine_adapter/spark.py
@@ -57,6 +57,7 @@ class SparkEngineAdapter(
     # currently check for storage formats we say we don't support REPLACE TABLE
     SUPPORTS_REPLACE_TABLE = False
     QUOTE_IDENTIFIERS_IN_VIEWS = False
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]
 
     WAP_PREFIX = "wap_"
     BRANCH_PREFIX = "branch_"
diff --git a/sqlmesh/core/engine_adapter/trino.py b/sqlmesh/core/engine_adapter/trino.py
index e16cf2d76c..c62f7bef45 100644
--- a/sqlmesh/core/engine_adapter/trino.py
+++ b/sqlmesh/core/engine_adapter/trino.py
@@ -53,6 +53,7 @@ class TrinoEngineAdapter(
     COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
     COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
     SUPPORTS_REPLACE_TABLE = False
+    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
     DEFAULT_CATALOG_TYPE = "hive"
     QUOTE_IDENTIFIERS_IN_VIEWS = False
     SCHEMA_DIFFER = SchemaDiffer(

From 879417cf5efb0a8a31d1a3849365b5bce9ff451d Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 19 Aug 2025 05:58:08 +0000
Subject: [PATCH 08/10] Fix test

---
 tests/core/engine_adapter/test_base.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/core/engine_adapter/test_base.py b/tests/core/engine_adapter/test_base.py
index 02029ca6f8..615d725712 100644
--- a/tests/core/engine_adapter/test_base.py
+++ b/tests/core/engine_adapter/test_base.py
@@ -3463,6 +3463,7 @@ def test_drop_view(make_mocked_engine_adapter: t.Callable):
 )
 def test_drop_schema(kwargs, expected, make_mocked_engine_adapter: t.Callable):
     adapter = make_mocked_engine_adapter(EngineAdapter)
+    adapter.SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
 
     adapter.drop_schema(**kwargs)
 

From 242d4daec17a0d0e6e914482e9d7ecdceb986760 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Wed, 20 Aug 2025 20:27:28 +0000
Subject: [PATCH 09/10] reinstate branch filter

---
 .circleci/continue_config.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml
index 155d095844..8f8324a2a0 100644
--- a/.circleci/continue_config.yml
+++ b/.circleci/continue_config.yml
@@ -310,10 +310,10 @@ workflows:
                 - athena
                 - fabric
                 - gcp-postgres
-          #filters:
-          #  branches:
-          #    only:
-          #      - main
+          filters:
+            branches:
+              only:
+                - main
       - ui_style
       - ui_test
       - vscode_test

From 3fd21fc2c4832e36c4412652cdde3ab012dc06bf Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Wed, 20 Aug 2025 21:04:38 +0000
Subject: [PATCH 10/10] Fix test

---
 .../engine_adapter/integration/test_integration_postgres.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/core/engine_adapter/integration/test_integration_postgres.py b/tests/core/engine_adapter/integration/test_integration_postgres.py
index 22d0c68f6f..26b8cbda42 100644
--- a/tests/core/engine_adapter/integration/test_integration_postgres.py
+++ b/tests/core/engine_adapter/integration/test_integration_postgres.py
@@ -195,7 +195,10 @@ def _mutate_config(gateway: str, config: Config):
         sqlmesh = ctx.create_context(
             path=tmp_path, config_mutator=_mutate_config, ephemeral_state_connection=False
         )
-        sqlmesh.plan(environment="dev", auto_apply=True)
+        # need run=True to prevent a "start date is greater than end date" error
+        # since dev can't exceed what is in prod, and prod has no cadence runs,
+        # without run=True this plan gets start=2020-01-04 (now) end=2020-01-01 (last prod interval), which fails
+        sqlmesh.plan(environment="dev", auto_apply=True, run=True)
 
         # should now have 7 snapshots in state - 2x model a, 3x model b, 1x model c and 1x model d
        all_snapshot_ids = _all_snapshot_ids(sqlmesh)
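
Editor's note: taken together, patches 05 and 07 converge on one gating rule: `_drop_object` only emits CASCADE for object kinds listed in the adapter's SUPPORTED_DROP_CASCADE_OBJECT_KINDS, which is why BigQuery no longer needs its own `_drop_object` override. Below is a minimal, self-contained sketch of that dispatch. The `EngineAdapterSketch`, `PostgresLike` and `BigQueryLike` classes are illustrative stand-ins, not SQLMesh's real classes (only the kind lists are copied from the patch), and the sketch returns the sqlglot expression instead of executing it:

import typing as t

from sqlglot import exp


class EngineAdapterSketch:
    # Engines opt in per object kind; the base class supports none.
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS: t.List[str] = []

    def _drop_object(
        self,
        name: str,
        exists: bool = True,
        kind: str = "TABLE",
        cascade: bool = False,
        **drop_args: t.Any,
    ) -> exp.Drop:
        # CASCADE is only attached for kinds the engine supports;
        # everything else silently falls back to a plain DROP.
        if cascade and kind.upper() in self.SUPPORTED_DROP_CASCADE_OBJECT_KINDS:
            drop_args["cascade"] = cascade
        return exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args)


class PostgresLike(EngineAdapterSketch):
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]


class BigQueryLike(EngineAdapterSketch):
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]


print(PostgresLike()._drop_object("foo", cascade=True).sql())
# DROP TABLE IF EXISTS foo CASCADE
print(BigQueryLike()._drop_object("foo", cascade=True).sql())
# DROP TABLE IF EXISTS foo -- cascade dropped, matching test_drop_cascade
print(BigQueryLike()._drop_object("foo", kind="SCHEMA", cascade=True).sql())
# DROP SCHEMA IF EXISTS foo CASCADE

This is also why the janitor can pass cascade=True unconditionally from `_cleanup_snapshot`: engines that cannot honor it for a given kind simply ignore it, while Postgres-family engines use it to drop dependent views and avoid the referential-integrity error the series sets out to fix.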