Skip to content

Commit 1b322be

Browse files
Fix: Don't normalize aliases in merge and when matched
1 parent eb4c0b4 commit 1b322be

File tree

5 files changed

+56
-82
lines changed

5 files changed

+56
-82
lines changed

sqlmesh/core/dialect.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,18 +1421,10 @@ def replace_merge_table_aliases(
14211421
"""
14221422
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
14231423

1424-
normalized_merge_source_alias = quote_identifiers(
1425-
normalize_identifiers(exp.to_identifier(MERGE_SOURCE_ALIAS), dialect), dialect=dialect
1426-
)
1427-
1428-
normalized_merge_target_alias = quote_identifiers(
1429-
normalize_identifiers(exp.to_identifier(MERGE_TARGET_ALIAS), dialect), dialect=dialect
1430-
)
1431-
14321424
if isinstance(expression, exp.Column) and (first_part := expression.parts[0]):
14331425
if first_part.this.lower() in ("target", "dbt_internal_dest", "__merge_target__"):
1434-
first_part.replace(normalized_merge_target_alias)
1426+
first_part.replace(exp.to_identifier(MERGE_TARGET_ALIAS, quoted=True))
14351427
elif first_part.this.lower() in ("source", "dbt_internal_source", "__merge_source__"):
1436-
first_part.replace(normalized_merge_source_alias)
1428+
first_part.replace(exp.to_identifier(MERGE_SOURCE_ALIAS, quoted=True))
14371429

14381430
return expression

sqlmesh/core/model/kind.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -478,10 +478,9 @@ def _when_matched_validator(
478478
v = v[1:-1]
479479

480480
v = t.cast(exp.Whens, d.parse_one(v, into=exp.Whens, dialect=dialect))
481-
else:
482-
v = t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
483481

484-
return validate_expression(v, dialect=dialect)
482+
v = validate_expression(v, dialect=dialect)
483+
return t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
485484

486485
@field_validator("merge_filter", mode="before")
487486
def _merge_filter_validator(
@@ -497,10 +496,9 @@ def _merge_filter_validator(
497496
if isinstance(v, str):
498497
v = v.strip()
499498
v = d.parse_one(v, dialect=dialect)
500-
else:
501-
v = v.transform(d.replace_merge_table_aliases, dialect=dialect)
502499

503-
return validate_expression(v, dialect=dialect)
500+
v = validate_expression(v, dialect=dialect)
501+
return v.transform(d.replace_merge_table_aliases, dialect=dialect)
504502

505503
@property
506504
def data_hash_values(self) -> t.List[t.Optional[str]]:

tests/core/test_model.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5480,7 +5480,7 @@ def test_when_matched():
54805480
"""
54815481
)
54825482

5483-
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`))"
5483+
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`))"
54845484

54855485
model = load_sql_based_model(expressions, dialect="hive")
54865486
assert model.kind.when_matched.sql(dialect="hive") == expected_when_matched
@@ -5514,9 +5514,9 @@ def test_when_matched():
55145514
kind INCREMENTAL_BY_UNIQUE_KEY (
55155515
unique_key ("purchase_order_id"),
55165516
when_matched (
5517-
WHEN MATCHED AND "__merge_source__"."_operation" = 1 THEN DELETE
5518-
WHEN MATCHED AND "__merge_source__"."_operation" <> 1 THEN UPDATE SET
5519-
"__merge_target__"."purchase_order_id" = 1
5517+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" = 1 THEN DELETE
5518+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" <> 1 THEN UPDATE SET
5519+
"__MERGE_TARGET__"."purchase_order_id" = 1
55205520
),
55215521
batch_concurrency 1,
55225522
forward_only FALSE,
@@ -5567,7 +5567,7 @@ def fingerprint_merge(
55675567
kind INCREMENTAL_BY_UNIQUE_KEY (
55685568
unique_key ("purchase_order_id"),
55695569
when_matched (
5570-
WHEN MATCHED AND "__merge_source__"."salary" <> "__merge_target__"."salary" THEN UPDATE SET
5570+
WHEN MATCHED AND "__MERGE_SOURCE__"."salary" <> "__MERGE_TARGET__"."salary" THEN UPDATE SET
55715571
ARRAY('target.update_datetime = source.update_datetime', 'target.salary = source.salary')
55725572
),
55735573
batch_concurrency 1,
@@ -5601,8 +5601,8 @@ def test_when_matched_multiple():
56015601
)
56025602

56035603
expected_when_matched = [
5604-
"WHEN MATCHED AND `__merge_source__`.`x` = 1 THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5605-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5604+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`x` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
5605+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
56065606
]
56075607

56085608
model = load_sql_based_model(expressions, dialect="hive", variables={"schema": "db"})
@@ -5643,13 +5643,13 @@ def test_when_matched_merge_filter_multi_part_columns():
56435643
)
56445644

56455645
expected_when_matched = [
5646-
"WHEN MATCHED AND `__merge_source__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5647-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5646+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5647+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
56485648
]
56495649

56505650
expected_merge_filter = (
5651-
"`__merge_source__`.`record`.`nested_record`.`field` < `__merge_target__`.`record`.`nested_record`.`field` AND "
5652-
"`__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
5651+
"`__MERGE_SOURCE__`.`record`.`nested_record`.`field` < `__MERGE_TARGET__`.`record`.`nested_record`.`field` AND "
5652+
"`__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
56535653
)
56545654

56555655
model = load_sql_based_model(expressions, dialect="bigquery", variables={"schema": "db"})
@@ -6679,7 +6679,7 @@ def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
66796679
assert model.unique_key[0] == exp.column("a", quoted=True)
66806680
assert (
66816681
t.cast(exp.Expression, model.merge_filter).sql()
6682-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6682+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
66836683
)
66846684

66856685

@@ -6775,7 +6775,7 @@ def model_with_macros(evaluator, **kwargs):
67756775
assert python_sql_model.unique_key[0] == exp.column("a", quoted=True)
67766776
assert (
67776777
python_sql_model.merge_filter.sql()
6778-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6778+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
67796779
)
67806780

67816781

@@ -7862,7 +7862,7 @@ def test_model_kind_to_expression():
78627862
.sql()
78637863
== """INCREMENTAL_BY_UNIQUE_KEY (
78647864
unique_key ("a"),
7865-
when_matched (WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7865+
when_matched (WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
78667866
batch_concurrency 1,
78677867
forward_only FALSE,
78687868
disable_restatement FALSE,
@@ -7890,7 +7890,7 @@ def test_model_kind_to_expression():
78907890
.sql()
78917891
== """INCREMENTAL_BY_UNIQUE_KEY (
78927892
unique_key ("a"),
7893-
when_matched (WHEN MATCHED AND "__merge_source__"."x" = 1 THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b") WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7893+
when_matched (WHEN MATCHED AND "__MERGE_SOURCE__"."x" = 1 THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b") WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
78947894
batch_concurrency 1,
78957895
forward_only FALSE,
78967896
disable_restatement FALSE,
@@ -8151,7 +8151,7 @@ def test_merge_filter():
81518151
"""
81528152
)
81538153

8154-
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS.lower()}`.`salary` > 0"
8154+
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS}`.`salary` > 0"
81558155

81568156
model = load_sql_based_model(expressions, dialect="hive")
81578157
assert model.kind.merge_filter.sql(dialect="hive") == expected_incremental_predicate
@@ -8194,19 +8194,19 @@ def test_merge_filter():
81948194
kind INCREMENTAL_BY_UNIQUE_KEY (
81958195
unique_key ("purchase_order_id"),
81968196
when_matched (
8197-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" = 1 THEN DELETE
8198-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1 THEN UPDATE SET
8199-
"{MERGE_TARGET_ALIAS.lower()}"."purchase_order_id" = 1
8197+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" = 1 THEN DELETE
8198+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1 THEN UPDATE SET
8199+
"{MERGE_TARGET_ALIAS}"."purchase_order_id" = 1
82008200
),
82018201
merge_filter (
8202-
"{MERGE_SOURCE_ALIAS.lower()}"."ds" > (
8202+
"{MERGE_SOURCE_ALIAS}"."ds" > (
82038203
SELECT
82048204
MAX("ds")
82058205
FROM "db"."test"
82068206
)
8207-
AND "{MERGE_SOURCE_ALIAS.lower()}"."ds" > @start_ds
8208-
AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1
8209-
AND "{MERGE_TARGET_ALIAS.lower()}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
8207+
AND "{MERGE_SOURCE_ALIAS}"."ds" > @start_ds
8208+
AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1
8209+
AND "{MERGE_TARGET_ALIAS}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
82108210
),
82118211
batch_concurrency 1,
82128212
forward_only FALSE,
@@ -8224,7 +8224,7 @@ def test_merge_filter():
82248224
rendered_merge_filters = model.render_merge_filter(start="2023-01-01", end="2023-01-02")
82258225
assert (
82268226
rendered_merge_filters.sql(dialect="hive")
8227-
== "(`__merge_source__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__merge_source__`.`ds` > '2023-01-01' AND `__merge_source__`.`_operation` <> 1 AND `__merge_target__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
8227+
== "(`__MERGE_SOURCE__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__MERGE_SOURCE__`.`ds` > '2023-01-01' AND `__MERGE_SOURCE__`.`_operation` <> 1 AND `__MERGE_TARGET__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
82288228
)
82298229

82308230

tests/core/test_snapshot_evaluator.py

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2200,18 +2200,14 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22002200
source=False,
22012201
then=exp.Update(
22022202
expressions=[
2203-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2204-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2203+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2204+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22052205
),
2206-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2206+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22072207
exp.Coalesce(
2208-
this=exp.column(
2209-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2210-
),
2208+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22112209
expressions=[
2212-
exp.column(
2213-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2214-
)
2210+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22152211
],
22162212
)
22172213
),
@@ -2269,23 +2265,19 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22692265
expressions=[
22702266
exp.When(
22712267
matched=True,
2272-
condition=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True).eq(
2268+
condition=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True).eq(
22732269
exp.Literal.number(1)
22742270
),
22752271
then=exp.Update(
22762272
expressions=[
2277-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2278-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2273+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2274+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22792275
),
2280-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2276+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22812277
exp.Coalesce(
2282-
this=exp.column(
2283-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2284-
),
2278+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22852279
expressions=[
2286-
exp.column(
2287-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2288-
)
2280+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22892281
],
22902282
)
22912283
),
@@ -2297,18 +2289,14 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22972289
source=False,
22982290
then=exp.Update(
22992291
expressions=[
2300-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2301-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2292+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2293+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
23022294
),
2303-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2295+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
23042296
exp.Coalesce(
2305-
this=exp.column(
2306-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2307-
),
2297+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
23082298
expressions=[
2309-
exp.column(
2310-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2311-
)
2299+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
23122300
],
23132301
)
23142302
),
@@ -2395,16 +2383,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
23952383
assert model.merge_filter == exp.And(
23962384
this=exp.And(
23972385
this=exp.GT(
2398-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2386+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
23992387
expression=exp.Literal(this="0", is_string=False),
24002388
),
24012389
expression=exp.LT(
2402-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2390+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24032391
expression=d.MacroVar(this="end_ds"),
24042392
),
24052393
),
24062394
expression=exp.GT(
2407-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2395+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24082396
expression=d.MacroVar(this="start_ds"),
24092397
),
24102398
)
@@ -2436,15 +2424,11 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24362424
matched=True,
24372425
then=exp.Update(
24382426
expressions=[
2439-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2427+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
24402428
exp.Coalesce(
2441-
this=exp.column(
2442-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2443-
),
2429+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24442430
expressions=[
2445-
exp.column(
2446-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2447-
)
2431+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
24482432
],
24492433
)
24502434
),
@@ -2456,16 +2440,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24562440
merge_filter=exp.And(
24572441
this=exp.And(
24582442
this=exp.GT(
2459-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2443+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
24602444
expression=exp.Literal(this="0", is_string=False),
24612445
),
24622446
expression=exp.LT(
2463-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2447+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24642448
expression=exp.Literal(this="2020-01-02", is_string=True),
24652449
),
24662450
),
24672451
expression=exp.GT(
2468-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2452+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24692453
expression=exp.Literal(this="2020-01-01", is_string=True),
24702454
),
24712455
),

tests/dbt/test_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_model_to_sqlmesh_fields():
135135
assert kind.on_destructive_change == OnDestructiveChange.ALLOW
136136
assert (
137137
kind.merge_filter.sql(dialect=model.dialect)
138-
== """55 > "__merge_source__"."b" AND "__merge_target__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
138+
== """55 > "__MERGE_SOURCE__"."b" AND "__MERGE_TARGET__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
139139
)
140140

141141
model = model_config.update_with({"dialect": "snowflake"}).to_sqlmesh(context)

0 commit comments

Comments
 (0)