From 950e3b9085125ede4d8397b6ad2ca5cd5ce5569e Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 25 Jul 2025 19:44:07 -0500 Subject: [PATCH 1/2] Add partitioning for ducklake databases --- sqlmesh/core/engine_adapter/duckdb.py | 53 ++++++++++++++++++++++++ tests/core/engine_adapter/test_duckdb.py | 37 ++++++++++++++++- 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/engine_adapter/duckdb.py b/sqlmesh/core/engine_adapter/duckdb.py index 169a7a7f94..00be5f426a 100644 --- a/sqlmesh/core/engine_adapter/duckdb.py +++ b/sqlmesh/core/engine_adapter/duckdb.py @@ -142,3 +142,56 @@ def _normalize_decimal_value(self, col: exp.Expression, precision: int) -> exp.E exp.cast(col, "DOUBLE"), f"DECIMAL(38, {precision})", ) + + def _create_table( + self, + table_name_or_schema: t.Union[exp.Schema, TableName], + expression: t.Optional[exp.Expression], + exists: bool = True, + replace: bool = False, + columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, + table_description: t.Optional[str] = None, + column_descriptions: t.Optional[t.Dict[str, str]] = None, + table_kind: t.Optional[str] = None, + **kwargs: t.Any, + ) -> None: + catalog = self.get_current_catalog() + catalog_type_tuple = self.fetchone( + exp.select("type") + .from_("duckdb_databases()") + .where(exp.column("database_name").eq(catalog)) + ) + catalog_type = catalog_type_tuple[0] if catalog_type_tuple else None + + partitioned_by_exps = None + if catalog_type == "ducklake": + partitioned_by_exps = kwargs.pop("partitioned_by", None) + + super()._create_table( + table_name_or_schema, + expression, + exists, + replace, + columns_to_types, + table_description, + column_descriptions, + table_kind, + **kwargs, + ) + + if partitioned_by_exps: + # Schema object contains column definitions, so we extract Table + table_name = ( + table_name_or_schema.this + if isinstance(table_name_or_schema, exp.Schema) + else table_name_or_schema + ) + table_name_str = ( + table_name.sql(dialect=self.dialect) + if isinstance(table_name, exp.Table) + else table_name + ) + partitioned_by_str = ", ".join( + expr.sql(dialect=self.dialect) for expr in partitioned_by_exps + ) + self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});") diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index 543b2e2f18..fc40036ecd 100644 --- a/tests/core/engine_adapter/test_duckdb.py +++ b/tests/core/engine_adapter/test_duckdb.py @@ -1,10 +1,11 @@ import typing as t +import os import pandas as pd # noqa: TID253 import pytest +from pytest_mock.plugin import MockerFixture from sqlglot import expressions as exp from sqlglot import parse_one - from sqlmesh.core.engine_adapter import DuckDBEngineAdapter, EngineAdapter from tests.core.engine_adapter import to_sql_calls @@ -77,9 +78,12 @@ def test_set_current_catalog(make_mocked_engine_adapter: t.Callable, duck_conn): ] -def test_temporary_table(make_mocked_engine_adapter: t.Callable, duck_conn): +def test_temporary_table(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): adapter = make_mocked_engine_adapter(DuckDBEngineAdapter) + mocker.patch.object(adapter, "get_current_catalog", return_value="test_catalog") + mocker.patch.object(adapter, "fetchone", return_value=("test_catalog",)) + adapter.create_table( "test_table", {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, @@ -103,3 +107,32 @@ def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: adapter.drop_catalog(exp.to_identifier("foo")) assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"'] + + +def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path): + os.chdir(tmp_path) + catalog = "a_ducklake_db" + + duck_conn.install_extension("ducklake") + duck_conn.load_extension("ducklake") + duck_conn.execute(f"ATTACH 'ducklake:{catalog}';") + + # no partitions on catalog creation + partition_info = duck_conn.execute( + f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info" + ).fetchdf() + assert partition_info.empty + + adapter.set_current_catalog(catalog) + adapter.create_schema("test_schema") + adapter.create_table( + "test_schema.test_table", + {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")}, + partitioned_by=[exp.to_column("a"), exp.to_column("b")], + ) + + # 1 partition after table creation + partition_info = duck_conn.execute( + f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info" + ).fetchdf() + assert partition_info.shape[0] == 1 From af86b9f732bdb9243a0c9ebe76e2bbe8a9c3e6d8 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Fri, 25 Jul 2025 20:07:50 -0500 Subject: [PATCH 2/2] Fix ATTACH in test --- tests/core/engine_adapter/test_duckdb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/engine_adapter/test_duckdb.py b/tests/core/engine_adapter/test_duckdb.py index fc40036ecd..6442b1a0b4 100644 --- a/tests/core/engine_adapter/test_duckdb.py +++ b/tests/core/engine_adapter/test_duckdb.py @@ -1,6 +1,5 @@ import typing as t -import os import pandas as pd # noqa: TID253 import pytest from pytest_mock.plugin import MockerFixture @@ -110,12 +109,13 @@ def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None: def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path): - os.chdir(tmp_path) catalog = "a_ducklake_db" duck_conn.install_extension("ducklake") duck_conn.load_extension("ducklake") - duck_conn.execute(f"ATTACH 'ducklake:{catalog}';") + duck_conn.execute( + f"ATTACH 'ducklake:{catalog}.ducklake' AS {catalog} (DATA_PATH '{tmp_path}');" + ) # no partitions on catalog creation partition_info = duck_conn.execute(