Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
connection_str += f"{'&' if self.database else '?'}motherduck_token={self.token}"
return {"database": connection_str, "config": custom_user_agent_config}

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {"is_motherduck": True}


class DuckDBConnectionConfig(BaseDuckDBConnectionConfig):
"""Configuration for the DuckDB connection."""
Expand Down
34 changes: 26 additions & 8 deletions sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,30 @@ def set_current_catalog(self, catalog: str) -> None:
self.execute(exp.Use(this=exp.to_identifier(catalog)))

def _create_catalog(self, catalog_name: exp.Identifier) -> None:
    """Create a catalog.

    MotherDuck catalogs are created server-side via ``CREATE DATABASE``;
    a local DuckDB catalog is backed by a ``<name>.db`` file that we
    ``ATTACH`` under the catalog name.
    """
    if self._is_motherduck:
        self.execute(
            exp.Create(this=exp.Table(this=catalog_name), kind="DATABASE", exists=True)
        )
    else:
        # Local DuckDB: the catalog is a database file named after the catalog.
        db_filename = f"{catalog_name.output_name}.db"
        self.execute(
            exp.Attach(
                this=exp.alias_(exp.Literal.string(db_filename), catalog_name), exists=True
            )
        )

def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
    """Drop a catalog.

    MotherDuck catalogs are dropped server-side via ``DROP DATABASE ... CASCADE``;
    a local DuckDB catalog is ``DETACH``ed and its backing ``<name>.db`` file
    is deleted from disk.
    """
    if self._is_motherduck:
        self.execute(
            exp.Drop(
                this=exp.Table(this=catalog_name), kind="DATABASE", cascade=True, exists=True
            )
        )
    else:
        db_file_path = Path(f"{catalog_name.output_name}.db")
        self.execute(exp.Detach(this=catalog_name, exists=True))
        # missing_ok avoids the exists()/unlink() race if the file vanished
        # between the detach and the cleanup.
        db_file_path.unlink(missing_ok=True)

def _df_to_source_queries(
self,
Expand Down Expand Up @@ -198,3 +212,7 @@ def _create_table(
expr.sql(dialect=self.dialect) for expr in partitioned_by_exps
)
self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});")

@property
def _is_motherduck(self) -> bool:
return self._extra_config.get("is_motherduck", False)
48 changes: 27 additions & 21 deletions sqlmesh/core/state_sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import abc

from dataclasses import dataclass
from sqlglot import exp

from sqlmesh.core.console import Console
from sqlmesh.core.dialect import schema_
Expand All @@ -29,7 +30,7 @@ def cleanup_expired_views(
warn_on_delete_failure: bool = False,
console: t.Optional[Console] = None,
) -> None:
expired_schema_environments = [
expired_schema_or_catalog_environments = [
environment
for environment in environments
if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
Expand All @@ -45,8 +46,9 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
return default_adapter

catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()

# Drop the schemas for the expired environments
# Collect schemas and catalogs to drop
for engine_adapter, expired_catalog, expired_schema, suffix_target in {
(
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
Expand All @@ -58,29 +60,16 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
),
environment.suffix_target,
)
for environment in expired_schema_environments
for environment in expired_schema_or_catalog_environments
for snapshot in environment.snapshots
if snapshot.is_model and not snapshot.is_symbolic
}:
schema = schema_(expired_schema, expired_catalog)
try:
engine_adapter.drop_schema(
schema,
ignore_if_not_exists=True,
cascade=True,
)

if suffix_target.is_catalog and expired_catalog:
if suffix_target.is_catalog:
if expired_catalog:
catalogs_to_drop.add((engine_adapter, expired_catalog))

if console:
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
except Exception as e:
message = f"Failed to drop the expired environment schema '{schema}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
else:
schema = schema_(expired_schema, expired_catalog)
schemas_to_drop.add((engine_adapter, schema))

# Drop the views for the expired environments
for engine_adapter, expired_view in {
Expand All @@ -105,6 +94,23 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
else:
raise SQLMeshError(message) from e

# Drop the schemas for the expired environments
for engine_adapter, schema in schemas_to_drop:
try:
engine_adapter.drop_schema(
schema,
ignore_if_not_exists=True,
cascade=True,
)
if console:
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
except Exception as e:
message = f"Failed to drop the expired environment schema '{schema}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e

# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
for engine_adapter, catalog in catalogs_to_drop:
Expand Down
18 changes: 18 additions & 0 deletions tests/core/engine_adapter/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,31 @@ def test_create_catalog(make_mocked_engine_adapter: t.Callable) -> None:
assert to_sql_calls(adapter) == ["ATTACH IF NOT EXISTS 'foo.db' AS \"foo\""]


def test_create_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None:
    """A MotherDuck-flagged adapter creates catalogs with CREATE DATABASE, not ATTACH."""
    md_adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(
        DuckDBEngineAdapter, is_motherduck=True
    )
    md_adapter.create_catalog(exp.to_identifier("foo"))

    expected_sql = ['CREATE DATABASE IF NOT EXISTS "foo"']
    assert to_sql_calls(md_adapter) == expected_sql


def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None:
    """A local DuckDB adapter drops a catalog by detaching its database."""
    local_adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
    local_adapter.drop_catalog(exp.to_identifier("foo"))

    expected_sql = ['DETACH DATABASE IF EXISTS "foo"']
    assert to_sql_calls(local_adapter) == expected_sql


def test_drop_catalog_motherduck(make_mocked_engine_adapter: t.Callable) -> None:
    """A MotherDuck-flagged adapter drops catalogs with DROP DATABASE, not DETACH."""
    md_adapter: DuckDBEngineAdapter = make_mocked_engine_adapter(
        DuckDBEngineAdapter, is_motherduck=True
    )
    md_adapter.drop_catalog(exp.to_identifier("foo"))

    expected_sql = ['DROP DATABASE IF EXISTS "foo" CASCADE']
    assert to_sql_calls(md_adapter) == expected_sql


def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path):
catalog = "a_ducklake_db"

Expand Down