Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions sqlmesh/core/engine_adapter/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,56 @@ def _normalize_decimal_value(self, col: exp.Expression, precision: int) -> exp.E
exp.cast(col, "DOUBLE"),
f"DECIMAL(38, {precision})",
)

def _create_table(
    self,
    table_name_or_schema: t.Union[exp.Schema, TableName],
    expression: t.Optional[exp.Expression],
    exists: bool = True,
    replace: bool = False,
    columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    column_descriptions: t.Optional[t.Dict[str, str]] = None,
    table_kind: t.Optional[str] = None,
    **kwargs: t.Any,
) -> None:
    """Create a table and, for DuckLake catalogs, apply its partitioning.

    The type of the current catalog is read from ``duckdb_databases()``.
    When that type is ``"ducklake"``, any ``partitioned_by`` entry is removed
    from ``kwargs`` (so it is not forwarded to the parent implementation) and
    applied afterwards with ``ALTER TABLE ... SET PARTITIONED BY``. For any
    other catalog type ``kwargs`` is forwarded untouched.

    Args:
        table_name_or_schema: The target table, or a schema wrapping it.
        expression: Forwarded unchanged to the parent implementation.
        exists: Forwarded unchanged to the parent implementation.
        replace: Forwarded unchanged to the parent implementation.
        columns_to_types: Forwarded unchanged to the parent implementation.
        table_description: Forwarded unchanged to the parent implementation.
        column_descriptions: Forwarded unchanged to the parent implementation.
        table_kind: Forwarded unchanged to the parent implementation.
        **kwargs: Extra creation options; ``partitioned_by`` is consumed here
            when the current catalog is a DuckLake catalog.
    """
    # NOTE(review): the type lookup uses the *current* catalog, not a catalog
    # that may be embedded in `table_name_or_schema` - confirm this is the
    # intended behavior for fully qualified table names.
    current_catalog = self.get_current_catalog()
    catalog_type_query = (
        exp.select("type")
        .from_("duckdb_databases()")
        .where(exp.column("database_name").eq(current_catalog))
    )
    type_row = self.fetchone(catalog_type_query)
    is_ducklake = (type_row[0] if type_row else None) == "ducklake"

    # Only strip `partitioned_by` from kwargs when it will be handled below.
    partition_exprs = kwargs.pop("partitioned_by", None) if is_ducklake else None

    super()._create_table(
        table_name_or_schema,
        expression,
        exists,
        replace,
        columns_to_types,
        table_description,
        column_descriptions,
        table_kind,
        **kwargs,
    )

    if not partition_exprs:
        return

    # A Schema carries the column definitions; the table itself is its `this`.
    if isinstance(table_name_or_schema, exp.Schema):
        target = table_name_or_schema.this
    else:
        target = table_name_or_schema

    # Non-Table targets are interpolated into the statement as they are.
    if isinstance(target, exp.Table):
        target_sql = target.sql(dialect=self.dialect)
    else:
        target_sql = target

    partition_sql = ", ".join(
        [partition_expr.sql(dialect=self.dialect) for partition_expr in partition_exprs]
    )
    self.execute(f"ALTER TABLE {target_sql} SET PARTITIONED BY ({partition_sql});")
37 changes: 35 additions & 2 deletions tests/core/engine_adapter/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import pandas as pd # noqa: TID253
import pytest
from pytest_mock.plugin import MockerFixture
from sqlglot import expressions as exp
from sqlglot import parse_one

from sqlmesh.core.engine_adapter import DuckDBEngineAdapter, EngineAdapter
from tests.core.engine_adapter import to_sql_calls

Expand Down Expand Up @@ -77,9 +77,12 @@ def test_set_current_catalog(make_mocked_engine_adapter: t.Callable, duck_conn):
]


def test_temporary_table(make_mocked_engine_adapter: t.Callable, duck_conn):
def test_temporary_table(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
adapter = make_mocked_engine_adapter(DuckDBEngineAdapter)

mocker.patch.object(adapter, "get_current_catalog", return_value="test_catalog")
mocker.patch.object(adapter, "fetchone", return_value=("test_catalog",))

adapter.create_table(
"test_table",
{"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
Expand All @@ -103,3 +106,33 @@ def test_drop_catalog(make_mocked_engine_adapter: t.Callable) -> None:
adapter.drop_catalog(exp.to_identifier("foo"))

assert to_sql_calls(adapter) == ['DETACH DATABASE IF EXISTS "foo"']


def test_ducklake_partitioning(adapter: EngineAdapter, duck_conn, tmp_path):
    """Check that creating a partitioned table in a DuckLake catalog records partitioning.

    Attaches a fresh DuckLake catalog, verifies its partition metadata table is
    empty, creates a table with ``partitioned_by`` through the adapter, and then
    expects exactly one row in the partition metadata table.
    """
    catalog = "a_ducklake_db"
    # Keep the DuckLake metadata database under tmp_path, next to the data
    # files. A bare relative path would create it in the working directory,
    # where it outlives the test and makes a second run start from a catalog
    # that already contains partition metadata.
    metadata_path = tmp_path / f"{catalog}.ducklake"
    partition_info_query = (
        f"SELECT * FROM __ducklake_metadata_{catalog}.main.ducklake_partition_info"
    )

    duck_conn.install_extension("ducklake")
    duck_conn.load_extension("ducklake")
    duck_conn.execute(
        f"ATTACH 'ducklake:{metadata_path}' AS {catalog} (DATA_PATH '{tmp_path}');"
    )

    # no partitions on catalog creation
    partition_info = duck_conn.execute(partition_info_query).fetchdf()
    assert partition_info.empty

    adapter.set_current_catalog(catalog)
    adapter.create_schema("test_schema")
    adapter.create_table(
        "test_schema.test_table",
        {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")},
        partitioned_by=[exp.to_column("a"), exp.to_column("b")],
    )

    # 1 partition after table creation
    partition_info = duck_conn.execute(partition_info_query).fetchdf()
    assert partition_info.shape[0] == 1