Skip to content

Commit e02064c

Browse files
committed
address comments
1 parent 5bdb2c6 commit e02064c

File tree

2 files changed

+40
-24
lines changed

2 files changed

+40
-24
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,17 +1073,19 @@ def _cleanup_snapshot(
10731073

10741074
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
10751075
for is_table_deployable, table_name in table_names:
1076-
# Use `get_data_object` to check if the table exists instead of `table_exists` since the former
1077-
# is based on `INFORMATION_SCHEMA` and avoids touching the table directly.
1078-
# This is important when the table name is malformed for some reason and running any statement
1079-
# that touches the table would result in an error.
1080-
if adapter.get_data_object(table_name) is not None:
1076+
try:
10811077
evaluation_strategy.delete(
10821078
table_name,
10831079
is_table_deployable=is_table_deployable,
10841080
physical_schema=snapshot.physical_schema,
10851081
)
1086-
else:
1082+
except Exception:
1083+
# Use `get_data_object` to check if the table exists instead of `table_exists` since the former
1084+
# is based on `INFORMATION_SCHEMA` and avoids touching the table directly.
1085+
# This is important when the table name is malformed for some reason and running any statement
1086+
# that touches the table would result in an error.
1087+
if adapter.get_data_object(table_name) is not None:
1088+
raise
10871089
logger.warning(
10881090
"Skipping cleanup of table '%s' because it does not exist", table_name
10891091
)

tests/core/test_snapshot_evaluator.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -438,23 +438,14 @@ def create_and_cleanup(name: str, dev_table_only: bool):
438438
return snapshot
439439

440440
snapshot = create_and_cleanup("catalog.test_schema.test_model", True)
441-
adapter_mock.get_data_object.assert_called_once_with(
442-
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
443-
)
441+
adapter_mock.get_data_object.assert_not_called()
444442
adapter_mock.drop_table.assert_called_once_with(
445443
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
446444
)
447445
adapter_mock.reset_mock()
448446

449447
snapshot = create_and_cleanup("test_schema.test_model", False)
450-
adapter_mock.get_data_object.assert_has_calls(
451-
[
452-
call(
453-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
454-
),
455-
call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"),
456-
]
457-
)
448+
adapter_mock.get_data_object.assert_not_called()
458449
adapter_mock.drop_table.assert_has_calls(
459450
[
460451
call(
@@ -466,12 +457,7 @@ def create_and_cleanup(name: str, dev_table_only: bool):
466457
adapter_mock.reset_mock()
467458

468459
snapshot = create_and_cleanup("test_model", False)
469-
adapter_mock.get_data_object.assert_has_calls(
470-
[
471-
call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
472-
call(f"sqlmesh__default.test_model__{snapshot.version}"),
473-
]
474-
)
460+
adapter_mock.get_data_object.assert_not_called()
475461
adapter_mock.drop_table.assert_has_calls(
476462
[
477463
call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"),
@@ -480,8 +466,34 @@ def create_and_cleanup(name: str, dev_table_only: bool):
480466
)
481467

482468

469+
def test_cleanup_fails(adapter_mock, make_snapshot):
470+
adapter_mock.drop_table.side_effect = RuntimeError("test_error")
471+
472+
evaluator = SnapshotEvaluator(adapter_mock)
473+
474+
model = SqlModel(
475+
name="catalog.test_schema.test_model",
476+
kind=IncrementalByTimeRangeKind(time_column="a"),
477+
storage_format="parquet",
478+
query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"),
479+
)
480+
481+
snapshot = make_snapshot(model)
482+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True)
483+
snapshot.version = "test_version"
484+
485+
evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env"))
486+
with pytest.raises(NodeExecutionFailedError) as exc_info:
487+
evaluator.cleanup(
488+
[SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)]
489+
)
490+
491+
assert str(exc_info.value.__cause__) == "test_error"
492+
493+
483494
def test_cleanup_skip_missing_table(adapter_mock, make_snapshot):
484495
adapter_mock.get_data_object.return_value = None
496+
adapter_mock.drop_table.side_effect = RuntimeError("fail")
485497

486498
evaluator = SnapshotEvaluator(adapter_mock)
487499

@@ -502,7 +514,9 @@ def test_cleanup_skip_missing_table(adapter_mock, make_snapshot):
502514
adapter_mock.get_data_object.assert_called_once_with(
503515
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
504516
)
505-
adapter_mock.drop_table.assert_not_called()
517+
adapter_mock.drop_table.assert_called_once_with(
518+
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev"
519+
)
506520

507521

508522
def test_cleanup_external_model(mocker: MockerFixture, adapter_mock, make_snapshot):

0 commit comments

Comments
 (0)