diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index 169a7a7f94..00be5f426a 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -142,3 +142,56 @@ def _normalize_decimal_value(self, col: exp.Expression, precision: int) -> exp.E exp.cast(col, "DOUBLE"), f"DECIMAL(38, {precision})", ) + + def _create_table( + self, + table_name_or_schema: t.Union[exp.Schema, TableName], + expression: t.Optional[exp.Expression], + exists: bool = True, + replace: bool = False, + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + table_kind: t.Optional[str] = None, + **kwargs: t.Any, + ) -> None: + catalog = self.get_current_catalog() + catalog_type_tuple = self.fetchone( + exp.select("type") + .from_("duckdb_databases()") + .where(exp.column("database_name").eq(catalog)) + ) + catalog_type = catalog_type_tuple[0] if catalog_type_tuple else None + + partitioned_by_exps = None + if catalog_type == "ducklake": + partitioned_by_exps = kwargs.pop("partitioned_by", None) + + super()._create_table( + table_name_or_schema, + expression, + exists, + replace, + columns_to_types, + table_description, + column_descriptions, + table_kind, + **kwargs, + ) + + if partitioned_by_exps: + # Schema object contains column definitions, so we extract Table + table_name = ( + table_name_or_schema.this + if isinstance(table_name_or_schema, exp.Schema) + else table_name_or_schema + ) + table_name_str = ( + table_name.sql(dialect=self.dialect) + if isinstance(table_name, exp.Table) + else table_name + ) + partitioned_by_str = ", ".join( + expr.sql(dialect=self.dialect) for expr in partitioned_by_exps + ) + self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});") diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index 
def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path):
    """Partitioning on a DuckLake table is applied after creation via ALTER TABLE."""
    catalog = "a_ducklake_db"

    duck_conn.install_extension("ducklake")
    duck_conn.load_extension("ducklake")
    duck_conn.execute(
        f"ATTACH 'ducklake:{catalog}.ducklake' AS {catalog} (DATA_PATH '{tmp_path}');"
    )

    partition_query = (
        f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info"
    )

    # A freshly attached DuckLake catalog starts with no partition metadata.
    assert duck_conn.execute(partition_query).fetchdf().empty

    adapter.set_current_catalog(catalog)
    adapter.create_schema("test_schema")
    adapter.create_table(
        "test_schema.test_table",
        {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
        partitioned_by=[exp.to_column("a"), exp.to_column("b")],
    )

    # Creating the partitioned table should register exactly one partition entry.
    assert duck_conn.execute(partition_query).fetchdf().shape[0] == 1