Skip to content

Commit 950e3b9

Browse files
committed
Add partitioning for ducklake databases
1 parent 57d2e9f commit 950e3b9

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,56 @@ def _normalize_decimal_value(self, col: exp.Expression, precision: int) -> exp.E
142142
exp.cast(col, "DOUBLE"),
143143
f"DECIMAL(38, {precision})",
144144
)
145+
146+
def _create_table(
147+
self,
148+
table_name_or_schema: t.Union[exp.Schema, TableName],
149+
expression: t.Optional[exp.Expression],
150+
exists: bool = True,
151+
replace: bool = False,
152+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
153+
table_description: t.Optional[str] = None,
154+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
155+
table_kind: t.Optional[str] = None,
156+
**kwargs: t.Any,
157+
) -> None:
158+
catalog = self.get_current_catalog()
159+
catalog_type_tuple = self.fetchone(
160+
exp.select("type")
161+
.from_("duckdb_databases()")
162+
.where(exp.column("database_name").eq(catalog))
163+
)
164+
catalog_type = catalog_type_tuple[0] if catalog_type_tuple else None
165+
166+
partitioned_by_exps = None
167+
if catalog_type == "ducklake":
168+
partitioned_by_exps = kwargs.pop("partitioned_by", None)
169+
170+
super()._create_table(
171+
table_name_or_schema,
172+
expression,
173+
exists,
174+
replace,
175+
columns_to_types,
176+
table_description,
177+
column_descriptions,
178+
table_kind,
179+
**kwargs,
180+
)
181+
182+
if partitioned_by_exps:
183+
# Schema object contains column definitions, so we extract Table
184+
table_name = (
185+
table_name_or_schema.this
186+
if isinstance(table_name_or_schema, exp.Schema)
187+
else table_name_or_schema
188+
)
189+
table_name_str = (
190+
table_name.sql(dialect=self.dialect)
191+
if isinstance(table_name, exp.Table)
192+
else table_name
193+
)
194+
partitioned_by_str = ", ".join(
195+
expr.sql(dialect=self.dialect) for expr in partitioned_by_exps
196+
)
197+
self.execute(f"ALTER TABLE {table_name_str} SET PARTITIONED BY ({partitioned_by_str});")

tests/core/engine_adapter/test_duckdb.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import typing as t
22

3+
import os
34
import pandas as pd # noqa: TID253
45
import pytest
6+
from pytest_mock.plugin import MockerFixture
57
from sqlglot import expressions as exp
68
from sqlglot import parse_one
7-
89
from sqlmesh.core.engine_adapter import DuckDBEngineAdapter, EngineAdapter
910
from tests.core.engine_adapter import to_sql_calls
1011

@@ -77,9 +78,12 @@ def test_set_current_catalog(make_mocked_engine_adapter: t.Callable, duck_conn):
7778
]
7879

7980

80-
def test_temporary_table(make_mocked_engine_adapter: t.Callable, duck_conn):
81+
def test_temporary_table(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
8182
adapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
8283

84+
mocker.patch.object(adapter, "get_current_catalog", return_value="test_catalog")
85+
mocker.patch.object(adapter, "fetchone", return_value=("test_catalog",))
86+
8387
adapter.create_table(
8488
"test_table",
8589
{"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
@@ -103,3 +107,32 @@ def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None:
103107
adapter.drop_catalog(exp.to_identifier("foo"))
104108

105109
assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"']
110+
111+
112+
def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path):
113+
os.chdir(tmp_path)
114+
catalog = "a_ducklake_db"
115+
116+
duck_conn.install_extension("ducklake")
117+
duck_conn.load_extension("ducklake")
118+
duck_conn.execute(f"ATTACH 'ducklake:{catalog}';")
119+
120+
# no partitions on catalog creation
121+
partition_info = duck_conn.execute(
122+
f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info"
123+
).fetchdf()
124+
assert partition_info.empty
125+
126+
adapter.set_current_catalog(catalog)
127+
adapter.create_schema("test_schema")
128+
adapter.create_table(
129+
"test_schema.test_table",
130+
{"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
131+
partitioned_by=[exp.to_column("a"), exp.to_column("b")],
132+
)
133+
134+
# 1 partition after table creation
135+
partition_info = duck_conn.execute(
136+
f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info"
137+
).fetchdf()
138+
assert partition_info.shape[0] == 1

0 commit comments

Comments
 (0)