Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,13 @@ def replace_query(
target_table,
source_queries,
target_columns_to_types,
**kwargs,
)
return self._insert_overwrite_by_condition(
target_table,
source_queries,
target_columns_to_types,
**kwargs,
)

def create_index(
Expand Down Expand Up @@ -1614,7 +1616,7 @@ def _insert_overwrite_by_time_partition(
**kwargs: t.Any,
) -> None:
return self._insert_overwrite_by_condition(
table_name, source_queries, target_columns_to_types, where
table_name, source_queries, target_columns_to_types, where, **kwargs
)

def _values_to_sql(
Expand Down
6 changes: 4 additions & 2 deletions sqlmesh/core/engine_adapter/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ def _insert_overwrite_by_condition(
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
**kwargs: t.Any,
) -> None:
if not where or where == exp.true():
# note that this is passed as table_properties here rather than physical_properties
use_merge_strategy = kwargs.get("table_properties", {}).get("mssql_merge_exists")
if (not where or where == exp.true()) and not use_merge_strategy:
# this is a full table replacement, call the base strategy to do DELETE+INSERT
# which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
return EngineAdapter._insert_overwrite_by_condition(
Expand All @@ -436,7 +438,7 @@ def _insert_overwrite_by_condition(
**kwargs,
)

# For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
# For conditional overwrites or when mssql_merge_exists is set use MERGE
return super()._insert_overwrite_by_condition(
table_name=table_name,
source_queries=source_queries,
Expand Down
96 changes: 91 additions & 5 deletions tests/core/engine_adapter/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from sqlglot import expressions as exp
from sqlglot import parse_one

from pathlib import Path
from sqlmesh import model
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory, Snapshot
from sqlmesh.core.model import load_sql_based_model
from sqlmesh.core.model.kind import SCDType2ByTimeKind
from sqlmesh.core import dialect as d
from sqlmesh.core.engine_adapter.shared import (
DataObject,
DataObjectType,
)
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType, SourceQuery
from sqlmesh.utils.date import to_ds
from tests.core.engine_adapter import to_sql_calls

Expand Down Expand Up @@ -916,3 +916,89 @@ def test_replace_query_strategy(adapter: MSSQLEngineAdapter, mocker: MockerFixtu
"TRUNCATE TABLE [test_table];",
"INSERT INTO [test_table] ([a], [b]) SELECT [a] AS [a], [b] AS [b] FROM [db].[upstream_table] AS [upstream_table];",
]


def test_mssql_merge_exists_switches_strategy_from_truncate_to_merge(
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
):
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)

query = exp.select("*").from_("source")
source_queries = [SourceQuery(query_factory=lambda: query)]

# Test WITHOUT mssql_merge_exists, should use DELETE+INSERT strategy
base_insert_overwrite = mocker.patch(
"sqlmesh.core.engine_adapter.base.EngineAdapter._insert_overwrite_by_condition"
)

adapter._insert_overwrite_by_condition(
table_name="target",
source_queries=source_queries,
target_columns_to_types={
"id": exp.DataType.build("INT"),
"value": exp.DataType.build("VARCHAR"),
},
where=None,
)

# Should call base DELETE+INSERT strategy
assert base_insert_overwrite.called
base_insert_overwrite.reset_mock()

# Test WITH mssql_merge_exists uses MERGE strategy
super_insert_overwrite = mocker.patch(
"sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport._insert_overwrite_by_condition"
)

adapter._insert_overwrite_by_condition(
table_name="target",
source_queries=source_queries,
target_columns_to_types={
"id": exp.DataType.build("INT"),
"value": exp.DataType.build("VARCHAR"),
},
where=None,
table_properties={"mssql_merge_exists": True},
)

# Should call super's MERGE strategy, not base DELETE+INSERT
assert super_insert_overwrite.called
assert not base_insert_overwrite.called


def test_python_scd2_model_preserves_physical_properties(make_snapshot):
@model(
"test_schema.python_scd2_with_mssql_merge",
kind=SCDType2ByTimeKind(
unique_key=["id"],
valid_from_name="valid_from",
valid_to_name="valid_to",
updated_at_name="updated_at",
),
columns={
"id": "INT",
"value": "VARCHAR",
"updated_at": "TIMESTAMP",
"valid_from": "TIMESTAMP",
"valid_to": "TIMESTAMP",
},
physical_properties={"mssql_merge_exists": True},
)
def python_scd2_model(context, **kwargs):
import pandas as pd

return pd.DataFrame(
{"id": [1, 2], "value": ["a", "b"], "updated_at": ["2024-01-01", "2024-01-02"]}
)

m = model.get_registry()["test_schema.python_scd2_with_mssql_merge"].model(
module_path=Path("."),
path=Path("."),
dialect="tsql",
)

# verify model has physical_properties that trigger merge strategy
assert "mssql_merge_exists" in m.physical_properties
snapshot: Snapshot = make_snapshot(m)
assert snapshot.node.physical_properties == m.physical_properties
assert snapshot.node.physical_properties.get("mssql_merge_exists")