From 0d3a1e36dd933d8a82828ed9df02a0eae3f17169 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 25 Aug 2025 13:26:41 -0700 Subject: [PATCH] Fix: Catalog creation / deletion for motherduck --- sqlmesh/core/config/connection.py | 4 ++ sqlmesh/core/engine_adapter/duckdb.py | 34 +++++++++++++---- sqlmesh/core/state_sync/common.py | 48 +++++++++++++----------- tests/core/engine_adapter/test_duckdb.py | 18 +++++++++ 4 files changed, 75 insertions(+), 29 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index b68b83a39a..1678f5d147 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -537,6 +537,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: connection_str += f"{'&' if self.database else '?'}motherduck_token={self.token}" return {"database": connection_str, "config": custom_user_agent_config} + @property + def _extra_engine_config(self) -> t.Dict[str, t.Any]: + return {"is_motherduck": True} + class DuckDBConnectionConfig(BaseDuckDBConnectionConfig): """Configuration for the DuckDB connection.""" diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index 8fbe40a575..4bce813610 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -47,16 +47,30 @@ def set_current_catalog(self, catalog: str) -> None: self.execute(exp.Use(this=exp.to_identifier(catalog))) def _create_catalog(self, catalog_name: exp.Identifier) -> None: - db_filename = f"{catalog_name.output_name}.db" - self.execute( - exp.Attach(this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True) - ) + if not self._is_motherduck: + db_filename = f"{catalog_name.output_name}.db" + self.execute( + exp.Attach( + this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True + ) + ) + else: + self.execute( + exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True) + ) def _drop_catalog(self, catalog_name: exp.Identifier) -> None: - db_file_path = Path(f"{catalog_name.output_name}.db") - self.execute(exp.Detach(this=catalog_name, exists=True)) - if db_file_path.exists(): - db_file_path.unlink() + if not self._is_motherduck: + db_file_path = Path(f"{catalog_name.output_name}.db") + self.execute(exp.Detach(this=catalog_name, exists=True)) + if db_file_path.exists(): + db_file_path.unlink() + else: + self.execute( + exp.Drop( + this=exp.Table(this=catalog_name), kind="DATABASE", cascade=True, exists=True + ) + ) def _df_to_source_queries( self, @@ -198,3 +212,7 @@ def _create_table( expr.sql(dialect=self.dialect) for expr in partitioned_by_exps ) self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});") + + @property + def _is_motherduck(self) -> bool: + return self._extra_config.get("is_motherduck", False) diff --git a/sqlmesh/core/state_sync/common.py b/sqlmesh/core/state_sync/common.py index 12899da82e..cd8c389e33 100644 --- a/sqlmesh/core/state_sync/common.py +++ b/sqlmesh/core/state_sync/common.py @@ -7,6 +7,7 @@ import abc from dataclasses import dataclass +from sqlglot import exp from sqlmesh.core.console import Console from sqlmesh.core.dialect import schema_ @@ -29,7 +30,7 @@ def cleanup_expired_views( warn_on_delete_failure: bool = False, console: t.Optional[Console] = None, ) -> None: - expired_schema_environments = [ + expired_schema_or_catalog_environments = [ environment for environment in environments if environment.suffix_target.is_schema or environment.suffix_target.is_catalog @@ -45,8 +46,9 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin return default_adapter catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set() + schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set() - # Drop the schemas for the expired environments + # Collect schemas and catalogs to drop for engine_adapter, expired_catalog, expired_schema, suffix_target in { ( (engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)), @@ -58,29 +60,16 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin ), environment.suffix_target, ) - for environment in expired_schema_environments + for environment in expired_schema_or_catalog_environments for snapshot in environment.snapshots if snapshot.is_model and not snapshot.is_symbolic }: - schema = schema_(expired_schema, expired_catalog) - try: - engine_adapter.drop_schema( - schema, - ignore_if_not_exists=True, - cascade=True, - ) - - if suffix_target.is_catalog and expired_catalog: + if suffix_target.is_catalog: + if expired_catalog: catalogs_to_drop.add((engine_adapter, expired_catalog)) - - if console: - console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect)) - except Exception as e: - message = f"Failed to drop the expired environment schema '{schema}': {e}" - if warn_on_delete_failure: - logger.warning(message) - else: - raise SQLMeshError(message) from e + else: + schema = schema_(expired_schema, expired_catalog) + schemas_to_drop.add((engine_adapter, schema)) # Drop the views for the expired environments for engine_adapter, expired_view in { @@ -105,6 +94,23 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin else: raise SQLMeshError(message) from e + # Drop the schemas for the expired environments + for engine_adapter, schema in schemas_to_drop: + try: + engine_adapter.drop_schema( + schema, + ignore_if_not_exists=True, + cascade=True, + ) + if console: + console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect)) + except Exception as e: + message = f"Failed to drop the expired environment schema '{schema}': {e}" + if warn_on_delete_failure: + logger.warning(message) + else: + raise SQLMeshError(message) from e + # Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs # catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog' for engine_adapter, catalog in catalogs_to_drop: diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index 7799cefe0c..9fd65a6e66 100644 --- a/tests/core/engine_adapter/test_duckdb.py +++ b/tests/core/engine_adapter/test_duckdb.py @@ -101,6 +101,15 @@ def test_create_catalog(make_mocked_engine_adapter: t.Callable) -> None: assert to_sql_calls(adapter) == ["ATTACH IF NOT EXISTS 'foo.db' AS \"foo\""] +def test_create_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter( + DuckDBEngineAdapter, is_motherduck=True + ) + adapter.create_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "foo"'] + + def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter) adapter.drop_catalog(exp.to_identifier("foo")) @@ -108,6 +117,15 @@ def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"'] +def test_drop_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None: + adapter: DuckDBEngineAdapter = make_mocked_engine_adapter( + DuckDBEngineAdapter, is_motherduck=True + ) + adapter.drop_catalog(exp.to_identifier("foo")) + + assert to_sql_calls(adapter) == ['DROP DATABASE IF EXISTS "foo" CASCADE'] + + def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path): catalog = "a_ducklake_db"