Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,7 @@ class SparkConnectionConfig(ConnectionConfig):
config_dir: t.Optional[str] = None
catalog: t.Optional[str] = None
config: t.Dict[str, t.Any] = {}
wap_enabled: bool = False

concurrent_tasks: int = 4
register_comments: bool = True
Expand Down Expand Up @@ -1801,6 +1802,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
.getOrCreate(),
}

@property
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
return {"wap_enabled": self.wap_enabled}


class TrinoAuthenticationMethod(str, Enum):
NO_AUTH = "no-auth"
Expand Down
5 changes: 5 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,11 @@ def fetch_pyspark_df(
"""Fetches a PySpark DataFrame from the cursor"""
raise NotImplementedError(f"Engine does not support PySpark DataFrames: {type(self)}")

@property
def wap_enabled(self) -> bool:
    """Returns whether WAP is enabled for this engine.

    Looks up the ``wap_enabled`` key in the adapter's extra config and
    falls back to ``False`` when the key is not present.
    """
    if "wap_enabled" in self._extra_config:
        return self._extra_config["wap_enabled"]
    return False

def wap_supported(self, table_name: TableName) -> bool:
"""Returns whether WAP for the target table is supported."""
return False
Expand Down
14 changes: 8 additions & 6 deletions sqlmesh/core/engine_adapter/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,14 @@ def _create_table(
if wap_id.startswith(f"{self.BRANCH_PREFIX}{self.WAP_PREFIX}"):
table_name.set("this", table_name.this.this)

wap_supported = (
kwargs.get("storage_format") or ""
).lower() == "iceberg" or self.wap_supported(table_name)
do_dummy_insert = (
False if not wap_supported or not exists else not self.table_exists(table_name)
)
do_dummy_insert = False
if self.wap_enabled:
Comment on lines +460 to +461
Copy link

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable do_dummy_insert is declared but never used in this diff. This appears to be dead code that should be removed or the logic that uses this variable is missing from the implementation.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh boy

wap_supported = (
kwargs.get("storage_format") or ""
).lower() == "iceberg" or self.wap_supported(table_name)
do_dummy_insert = (
False if not wap_supported or not exists else not self.table_exists(table_name)
)
super()._create_table(
table_name_or_schema,
expression,
Expand Down
1 change: 1 addition & 0 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ def _evaluate_snapshot(
if (
snapshot.is_materialized
and target_table_exists
and adapter.wap_enabled
and (model.wap_supported or adapter.wap_supported(target_table_name))
):
wap_id = random_id()[0:8]
Expand Down
28 changes: 19 additions & 9 deletions tests/core/engine_adapter/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ def test_create_table_properties(make_mocked_engine_adapter: t.Callable):
)


@pytest.mark.parametrize("wap_enabled", [True, False])
def test_replace_query_table_properties_not_exists(
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable, wap_enabled: bool
):
mocker.patch(
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
return_value=False,
)
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=wap_enabled)

columns_to_types = {
"cola": exp.DataType.build("INT"),
Expand All @@ -89,10 +90,13 @@ def test_replace_query_table_properties_not_exists(
table_properties={"a": exp.convert(1)},
)

assert to_sql_calls(adapter) == [
expected_sql_calls = [
"CREATE TABLE IF NOT EXISTS `test_table` USING ICEBERG PARTITIONED BY (`colb`) TBLPROPERTIES ('a'=1) AS SELECT CAST(`cola` AS INT) AS `cola`, CAST(`colb` AS STRING) AS `colb`, CAST(`colc` AS STRING) AS `colc` FROM (SELECT 1 AS `cola`, '2' AS `colb`, '3' AS `colc`) AS `_subquery`",
"INSERT INTO `test_table` SELECT * FROM `test_table`",
]
if wap_enabled:
expected_sql_calls.append("INSERT INTO `test_table` SELECT * FROM `test_table`")

assert to_sql_calls(adapter) == expected_sql_calls


def test_replace_query_table_properties_exists(
Expand Down Expand Up @@ -825,13 +829,16 @@ def test_wap_publish(make_mocked_engine_adapter: t.Callable, mocker: MockerFixtu
)


def test_create_table_iceberg(mocker: MockerFixture, make_mocked_engine_adapter: t.Callable):
@pytest.mark.parametrize("wap_enabled", [True, False])
def test_create_table_iceberg(
mocker: MockerFixture, make_mocked_engine_adapter: t.Callable, wap_enabled: bool
):
mocker.patch(
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
return_value=False,
)

adapter = make_mocked_engine_adapter(SparkEngineAdapter)
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=wap_enabled)

columns_to_types = {
"cola": exp.DataType.build("INT"),
Expand All @@ -846,10 +853,13 @@ def test_create_table_iceberg(mocker: MockerFixture, make_mocked_engine_adapter:
storage_format="ICEBERG",
)

assert to_sql_calls(adapter) == [
expected_sql_calls = [
"CREATE TABLE IF NOT EXISTS `test_table` (`cola` INT, `colb` STRING, `colc` STRING) USING ICEBERG PARTITIONED BY (`colb`)",
"INSERT INTO `test_table` SELECT * FROM `test_table`",
]
if wap_enabled:
expected_sql_calls.append("INSERT INTO `test_table` SELECT * FROM `test_table`")

assert to_sql_calls(adapter) == expected_sql_calls


def test_comments_hive(mocker: MockerFixture, make_mocked_engine_adapter: t.Callable):
Expand Down Expand Up @@ -973,7 +983,7 @@ def test_create_table_with_wap(make_mocked_engine_adapter: t.Callable, mocker: M
"sqlmesh.core.engine_adapter.spark.SparkEngineAdapter.table_exists",
return_value=False,
)
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
adapter = make_mocked_engine_adapter(SparkEngineAdapter, wap_enabled=True)

adapter.create_table(
"catalog.schema.table.branch_wap_12345",
Expand Down