From d1840e762933eca66e1584843b195da069edb7c4 Mon Sep 17 00:00:00 2001 From: geooo109 Date: Fri, 3 Oct 2025 14:13:00 +0300 Subject: [PATCH 1/2] fix: cluster_by as single string with multiple columns --- sqlmesh/dbt/model.py | 8 +++++++- tests/dbt/test_transformation.py | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sqlmesh/dbt/model.py b/sqlmesh/dbt/model.py index d882f94942..05b375237c 100644 --- a/sqlmesh/dbt/model.py +++ b/sqlmesh/dbt/model.py @@ -157,7 +157,6 @@ class ModelConfig(BaseModelConfig): @field_validator( "unique_key", - "cluster_by", "tags", mode="before", ) @@ -165,6 +164,13 @@ class ModelConfig(BaseModelConfig): def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]: return ensure_list(v) + @field_validator("cluster_by", mode="before") + @classmethod + def _validate_cluster_by(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]: + if isinstance(v, str): + return [c.strip() for c in v.split(",")] + return ensure_list(v) + @field_validator("check_cols", mode="before") @classmethod def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]: diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py index 304ac57731..a928259fec 100644 --- a/tests/dbt/test_transformation.py +++ b/tests/dbt/test_transformation.py @@ -2306,6 +2306,20 @@ def test_model_cluster_by(): ) assert model.to_sqlmesh(context).clustered_by == [] + model = ModelConfig( + name="model", + alias="model", + package_name="package", + target_schema="test", + cluster_by="Bar, qux", + sql="SELECT * FROM baz", + materialized=Materialization.TABLE.value, + ) + assert model.to_sqlmesh(context).clustered_by == [ + exp.to_column('"BAR"'), + exp.to_column('"QUX"'), + ] + def test_snowflake_dynamic_table(): context = DbtContext() From da69c839e35b4d57350ced26b0562b95291f7e59 Mon Sep 17 00:00:00 2001 From: geooo109 Date: Thu, 9 Oct 2025 10:06:52 +0300 Subject: [PATCH 2/2] refactor impl --- sqlmesh/dbt/model.py | 16 ++++++------- tests/dbt/test_transformation.py | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/sqlmesh/dbt/model.py b/sqlmesh/dbt/model.py index 05b375237c..fa84824a43 100644 --- a/sqlmesh/dbt/model.py +++ b/sqlmesh/dbt/model.py @@ -157,6 +157,7 @@ class ModelConfig(BaseModelConfig): @field_validator( "unique_key", + "cluster_by", "tags", mode="before", ) @@ -164,13 +165,6 @@ class ModelConfig(BaseModelConfig): def _validate_list(cls, v: t.Union[str, t.List[str]]) -> t.List[str]: return ensure_list(v) - @field_validator("cluster_by", mode="before") - @classmethod - def _validate_cluster_by(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]: - if isinstance(v, str): - return [c.strip() for c in v.split(",")] - return ensure_list(v) - @field_validator("check_cols", mode="before") @classmethod def _validate_check_cols(cls, v: t.Union[str, t.List[str]]) -> t.Union[str, t.List[str]]: @@ -607,7 +601,13 @@ def to_sqlmesh( clustered_by = [] for c in self.cluster_by: try: - clustered_by.append(d.parse_one(c, dialect=model_dialect)) + cluster_expr = exp.maybe_parse( + c, into=exp.Cluster, prefix="CLUSTER BY", dialect=model_dialect + ) + for expr in cluster_expr.expressions: + clustered_by.append( + expr.this if isinstance(expr, exp.Ordered) else expr + ) except SqlglotError as e: raise ConfigError( f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}" diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py index a928259fec..97c5c37e75 100644 --- a/tests/dbt/test_transformation.py +++ b/tests/dbt/test_transformation.py @@ -2320,6 +2320,46 @@ def test_model_cluster_by(): exp.to_column('"QUX"'), ] + model = ModelConfig( + name="model", + alias="model", + package_name="package", + target_schema="test", + cluster_by=['"Bar,qux"'], + sql="SELECT * FROM baz", + materialized=Materialization.TABLE.value, + ) + assert model.to_sqlmesh(context).clustered_by == [ + exp.to_column('"Bar,qux"'), + ] + + model = ModelConfig( + name="model", + alias="model", + package_name="package", + target_schema="test", + cluster_by='"Bar,qux"', + sql="SELECT * FROM baz", + materialized=Materialization.TABLE.value, + ) + assert model.to_sqlmesh(context).clustered_by == [ + exp.to_column('"Bar,qux"'), + ] + + model = ModelConfig( + name="model", + alias="model", + package_name="package", + target_schema="test", + cluster_by=["to_date(Bar),qux"], + sql="SELECT * FROM baz", + materialized=Materialization.TABLE.value, + ) + assert model.to_sqlmesh(context).clustered_by == [ + exp.TsOrDsToDate(this=exp.to_column('"BAR"')), + exp.to_column('"QUX"'), + ] + def test_snowflake_dynamic_table(): context = DbtContext()