Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,7 +2185,13 @@ def _get_target_and_source_columns(
if model.on_destructive_change.is_ignore or model.on_additive_change.is_ignore:
# We need to identify the columns that are only in the source so we create an empty table with
# the user query to determine that
with self.adapter.temp_table(model.ctas_query(**render_kwargs)) as temp_table:
temp_table_name = exp.table_(
"diff",
db=model.physical_schema,
)
with self.adapter.temp_table(
model.ctas_query(**render_kwargs), name=temp_table_name
) as temp_table:
source_columns = list(self.adapter.columns(temp_table))
else:
source_columns = None
Expand Down
63 changes: 63 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from typing_extensions import Self
from unittest.mock import call, patch, Mock
import contextlib
import re
import logging
import pytest
Expand Down Expand Up @@ -2077,6 +2078,68 @@ def columns(table_name):
)


def test_temp_table_includes_schema_for_ignore_changes(
mocker: MockerFixture,
make_snapshot,
make_mocked_engine_adapter,
):
"""Test that temp table creation includes the physical schema when on_destructive_change or on_additive_change is IGNORE."""
# Create a model with on_destructive_change=IGNORE to trigger temp table creation
model = SqlModel(
name="test_schema.test_model",
kind=IncrementalByTimeRangeKind(
time_column="a", on_destructive_change=OnDestructiveChange.IGNORE
),
query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"),
)
snapshot = make_snapshot(model, version="1")
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

# Set up the mocked adapter
adapter = make_mocked_engine_adapter(EngineAdapter)
adapter.with_settings = lambda **kwargs: adapter # type: ignore
adapter.table_exists = lambda _: True # type: ignore

# Mock columns method to return existing columns
def columns(table_name):
return {
"c": exp.DataType.build("int"),
"a": exp.DataType.build("int"),
}

adapter.columns = columns # type: ignore

# Create a mock for the temp_table context manager
temp_table_name_captured = None

@contextlib.contextmanager
def mock_temp_table(query_or_df, name="diff", **kwargs):
nonlocal temp_table_name_captured
temp_table_name_captured = exp.to_table(name)
# Return a table that temp_table would normally return
yield exp.table_("__temp_diff_12345", db=temp_table_name_captured.db)

adapter.temp_table = mock_temp_table # type: ignore
adapter.insert_append = lambda *args, **kwargs: None # type: ignore

evaluator = SnapshotEvaluator(adapter)

# Call the append method which will trigger _get_target_and_source_columns
evaluator.evaluate(
snapshot,
start="2020-01-01",
end="2020-01-02",
execution_time="2020-01-02",
snapshots={},
)

# Verify that temp_table was called with a name that includes the schema
assert temp_table_name_captured is not None
assert temp_table_name_captured.name == "diff"
assert temp_table_name_captured.db == model.physical_schema
assert str(temp_table_name_captured.db) == "sqlmesh__test_schema"


def test_forward_only_snapshot_for_added_model(mocker: MockerFixture, adapter_mock, make_snapshot):
adapter_mock.SUPPORTS_CLONING = False
evaluator = SnapshotEvaluator(adapter_mock)
Expand Down