Skip to content

Commit fc811ba

Browse files
Fix: Don't normalize aliases in merge and when matched
1 parent 1097cef commit fc811ba

File tree

5 files changed

+56
-82
lines changed

5 files changed

+56
-82
lines changed

sqlmesh/core/dialect.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,18 +1421,10 @@ def replace_merge_table_aliases(
14211421
"""
14221422
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
14231423

1424-
normalized_merge_source_alias = quote_identifiers(
1425-
normalize_identifiers(exp.to_identifier(MERGE_SOURCE_ALIAS), dialect), dialect=dialect
1426-
)
1427-
1428-
normalized_merge_target_alias = quote_identifiers(
1429-
normalize_identifiers(exp.to_identifier(MERGE_TARGET_ALIAS), dialect), dialect=dialect
1430-
)
1431-
14321424
if isinstance(expression, exp.Column) and (first_part := expression.parts[0]):
14331425
if first_part.this.lower() in ("target", "dbt_internal_dest", "__merge_target__"):
1434-
first_part.replace(normalized_merge_target_alias)
1426+
first_part.replace(exp.to_identifier(MERGE_TARGET_ALIAS, quoted=True))
14351427
elif first_part.this.lower() in ("source", "dbt_internal_source", "__merge_source__"):
1436-
first_part.replace(normalized_merge_source_alias)
1428+
first_part.replace(exp.to_identifier(MERGE_SOURCE_ALIAS, quoted=True))
14371429

14381430
return expression

sqlmesh/core/model/kind.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -478,10 +478,9 @@ def _when_matched_validator(
478478
v = v[1:-1]
479479

480480
v = t.cast(exp.Whens, d.parse_one(v, into=exp.Whens, dialect=dialect))
481-
else:
482-
v = t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
483481

484-
return validate_expression(v, dialect=dialect)
482+
v = validate_expression(v, dialect=dialect)
483+
return t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
485484

486485
@field_validator("merge_filter", mode="before")
487486
def _merge_filter_validator(
@@ -497,10 +496,9 @@ def _merge_filter_validator(
497496
if isinstance(v, str):
498497
v = v.strip()
499498
v = d.parse_one(v, dialect=dialect)
500-
else:
501-
v = v.transform(d.replace_merge_table_aliases, dialect=dialect)
502499

503-
return validate_expression(v, dialect=dialect)
500+
v = validate_expression(v, dialect=dialect)
501+
return v.transform(d.replace_merge_table_aliases, dialect=dialect)
504502

505503
@property
506504
def data_hash_values(self) -> t.List[t.Optional[str]]:

tests/core/test_model.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5479,7 +5479,7 @@ def test_when_matched():
54795479
"""
54805480
)
54815481

5482-
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`))"
5482+
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`))"
54835483

54845484
model = load_sql_based_model(expressions, dialect="hive")
54855485
assert model.kind.when_matched.sql(dialect="hive") == expected_when_matched
@@ -5513,9 +5513,9 @@ def test_when_matched():
55135513
kind INCREMENTAL_BY_UNIQUE_KEY (
55145514
unique_key ("purchase_order_id"),
55155515
when_matched (
5516-
WHEN MATCHED AND "__merge_source__"."_operation" = 1 THEN DELETE
5517-
WHEN MATCHED AND "__merge_source__"."_operation" <> 1 THEN UPDATE SET
5518-
"__merge_target__"."purchase_order_id" = 1
5516+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" = 1 THEN DELETE
5517+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" <> 1 THEN UPDATE SET
5518+
"__MERGE_TARGET__"."purchase_order_id" = 1
55195519
),
55205520
batch_concurrency 1,
55215521
forward_only FALSE,
@@ -5566,7 +5566,7 @@ def fingerprint_merge(
55665566
kind INCREMENTAL_BY_UNIQUE_KEY (
55675567
unique_key ("purchase_order_id"),
55685568
when_matched (
5569-
WHEN MATCHED AND "__merge_source__"."salary" <> "__merge_target__"."salary" THEN UPDATE SET
5569+
WHEN MATCHED AND "__MERGE_SOURCE__"."salary" <> "__MERGE_TARGET__"."salary" THEN UPDATE SET
55705570
ARRAY('target.update_datetime = source.update_datetime', 'target.salary = source.salary')
55715571
),
55725572
batch_concurrency 1,
@@ -5600,8 +5600,8 @@ def test_when_matched_multiple():
56005600
)
56015601

56025602
expected_when_matched = [
5603-
"WHEN MATCHED AND `__merge_source__`.`x` = 1 THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5604-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5603+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`x` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
5604+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
56055605
]
56065606

56075607
model = load_sql_based_model(expressions, dialect="hive", variables={"schema": "db"})
@@ -5642,13 +5642,13 @@ def test_when_matched_merge_filter_multi_part_columns():
56425642
)
56435643

56445644
expected_when_matched = [
5645-
"WHEN MATCHED AND `__merge_source__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5646-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5645+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5646+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
56475647
]
56485648

56495649
expected_merge_filter = (
5650-
"`__merge_source__`.`record`.`nested_record`.`field` < `__merge_target__`.`record`.`nested_record`.`field` AND "
5651-
"`__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
5650+
"`__MERGE_SOURCE__`.`record`.`nested_record`.`field` < `__MERGE_TARGET__`.`record`.`nested_record`.`field` AND "
5651+
"`__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
56525652
)
56535653

56545654
model = load_sql_based_model(expressions, dialect="bigquery", variables={"schema": "db"})
@@ -6678,7 +6678,7 @@ def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
66786678
assert model.unique_key[0] == exp.column("a", quoted=True)
66796679
assert (
66806680
t.cast(exp.Expression, model.merge_filter).sql()
6681-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6681+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
66826682
)
66836683

66846684

@@ -6774,7 +6774,7 @@ def model_with_macros(evaluator, **kwargs):
67746774
assert python_sql_model.unique_key[0] == exp.column("a", quoted=True)
67756775
assert (
67766776
python_sql_model.merge_filter.sql()
6777-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6777+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
67786778
)
67796779

67806780

@@ -7861,7 +7861,7 @@ def test_model_kind_to_expression():
78617861
.sql()
78627862
== """INCREMENTAL_BY_UNIQUE_KEY (
78637863
unique_key ("a"),
7864-
when_matched (WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7864+
when_matched (WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
78657865
batch_concurrency 1,
78667866
forward_only FALSE,
78677867
disable_restatement FALSE,
@@ -7889,7 +7889,7 @@ def test_model_kind_to_expression():
78897889
.sql()
78907890
== """INCREMENTAL_BY_UNIQUE_KEY (
78917891
unique_key ("a"),
7892-
when_matched (WHEN MATCHED AND "__merge_source__"."x" = 1 THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b") WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7892+
when_matched (WHEN MATCHED AND "__MERGE_SOURCE__"."x" = 1 THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b") WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
78937893
batch_concurrency 1,
78947894
forward_only FALSE,
78957895
disable_restatement FALSE,
@@ -8150,7 +8150,7 @@ def test_merge_filter():
81508150
"""
81518151
)
81528152

8153-
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS.lower()}`.`salary` > 0"
8153+
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS}`.`salary` > 0"
81548154

81558155
model = load_sql_based_model(expressions, dialect="hive")
81568156
assert model.kind.merge_filter.sql(dialect="hive") == expected_incremental_predicate
@@ -8193,19 +8193,19 @@ def test_merge_filter():
81938193
kind INCREMENTAL_BY_UNIQUE_KEY (
81948194
unique_key ("purchase_order_id"),
81958195
when_matched (
8196-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" = 1 THEN DELETE
8197-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1 THEN UPDATE SET
8198-
"{MERGE_TARGET_ALIAS.lower()}"."purchase_order_id" = 1
8196+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" = 1 THEN DELETE
8197+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1 THEN UPDATE SET
8198+
"{MERGE_TARGET_ALIAS}"."purchase_order_id" = 1
81998199
),
82008200
merge_filter (
8201-
"{MERGE_SOURCE_ALIAS.lower()}"."ds" > (
8201+
"{MERGE_SOURCE_ALIAS}"."ds" > (
82028202
SELECT
82038203
MAX("ds")
82048204
FROM "db"."test"
82058205
)
8206-
AND "{MERGE_SOURCE_ALIAS.lower()}"."ds" > @start_ds
8207-
AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1
8208-
AND "{MERGE_TARGET_ALIAS.lower()}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
8206+
AND "{MERGE_SOURCE_ALIAS}"."ds" > @start_ds
8207+
AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1
8208+
AND "{MERGE_TARGET_ALIAS}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
82098209
),
82108210
batch_concurrency 1,
82118211
forward_only FALSE,
@@ -8223,7 +8223,7 @@ def test_merge_filter():
82238223
rendered_merge_filters = model.render_merge_filter(start="2023-01-01", end="2023-01-02")
82248224
assert (
82258225
rendered_merge_filters.sql(dialect="hive")
8226-
== "(`__merge_source__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__merge_source__`.`ds` > '2023-01-01' AND `__merge_source__`.`_operation` <> 1 AND `__merge_target__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
8226+
== "(`__MERGE_SOURCE__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__MERGE_SOURCE__`.`ds` > '2023-01-01' AND `__MERGE_SOURCE__`.`_operation` <> 1 AND `__MERGE_TARGET__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
82278227
)
82288228

82298229

tests/core/test_snapshot_evaluator.py

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2200,18 +2200,14 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22002200
source=False,
22012201
then=exp.Update(
22022202
expressions=[
2203-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2204-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2203+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2204+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22052205
),
2206-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2206+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22072207
exp.Coalesce(
2208-
this=exp.column(
2209-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2210-
),
2208+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22112209
expressions=[
2212-
exp.column(
2213-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2214-
)
2210+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22152211
],
22162212
)
22172213
),
@@ -2269,23 +2265,19 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22692265
expressions=[
22702266
exp.When(
22712267
matched=True,
2272-
condition=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True).eq(
2268+
condition=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True).eq(
22732269
exp.Literal.number(1)
22742270
),
22752271
then=exp.Update(
22762272
expressions=[
2277-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2278-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2273+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2274+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22792275
),
2280-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2276+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22812277
exp.Coalesce(
2282-
this=exp.column(
2283-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2284-
),
2278+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22852279
expressions=[
2286-
exp.column(
2287-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2288-
)
2280+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22892281
],
22902282
)
22912283
),
@@ -2297,18 +2289,14 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22972289
source=False,
22982290
then=exp.Update(
22992291
expressions=[
2300-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2301-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2292+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2293+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
23022294
),
2303-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2295+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
23042296
exp.Coalesce(
2305-
this=exp.column(
2306-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2307-
),
2297+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
23082298
expressions=[
2309-
exp.column(
2310-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2311-
)
2299+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
23122300
],
23132301
)
23142302
),
@@ -2395,16 +2383,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
23952383
assert model.merge_filter == exp.And(
23962384
this=exp.And(
23972385
this=exp.GT(
2398-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2386+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
23992387
expression=exp.Literal(this="0", is_string=False),
24002388
),
24012389
expression=exp.LT(
2402-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2390+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24032391
expression=d.MacroVar(this="end_ds"),
24042392
),
24052393
),
24062394
expression=exp.GT(
2407-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2395+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24082396
expression=d.MacroVar(this="start_ds"),
24092397
),
24102398
)
@@ -2436,15 +2424,11 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24362424
matched=True,
24372425
then=exp.Update(
24382426
expressions=[
2439-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2427+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
24402428
exp.Coalesce(
2441-
this=exp.column(
2442-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2443-
),
2429+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24442430
expressions=[
2445-
exp.column(
2446-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2447-
)
2431+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
24482432
],
24492433
)
24502434
),
@@ -2456,16 +2440,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24562440
merge_filter=exp.And(
24572441
this=exp.And(
24582442
this=exp.GT(
2459-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2443+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
24602444
expression=exp.Literal(this="0", is_string=False),
24612445
),
24622446
expression=exp.LT(
2463-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2447+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24642448
expression=exp.Literal(this="2020-01-02", is_string=True),
24652449
),
24662450
),
24672451
expression=exp.GT(
2468-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2452+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24692453
expression=exp.Literal(this="2020-01-01", is_string=True),
24702454
),
24712455
),

tests/dbt/test_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_model_to_sqlmesh_fields():
135135
assert kind.on_destructive_change == OnDestructiveChange.ALLOW
136136
assert (
137137
kind.merge_filter.sql(dialect=model.dialect)
138-
== """55 > "__merge_source__"."b" AND "__merge_target__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
138+
== """55 > "__MERGE_SOURCE__"."b" AND "__MERGE_TARGET__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
139139
)
140140

141141
model = model_config.update_with({"dialect": "snowflake"}).to_sqlmesh(context)

0 commit comments

Comments (0)