From 52589151794176b31759d3b14255daf3ba8b3c9e Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Wed, 13 Aug 2025 11:55:40 -0700 Subject: [PATCH 1/3] Fix: Skip cleanup of missing physical tables --- sqlmesh/core/snapshot/evaluator.py | 15 ++++++---- tests/core/test_snapshot_evaluator.py | 42 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index a6dca27d35..0059292354 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1073,11 +1073,16 @@ def _cleanup_snapshot( evaluation_strategy = _evaluation_strategy(snapshot, adapter) for is_table_deployable, table_name in table_names: - evaluation_strategy.delete( - table_name, - is_table_deployable=is_table_deployable, - physical_schema=snapshot.physical_schema, - ) + # Use `get_data_object` to check if the table exists instead of `table_exists` since the former + # is based on `INFORMATION_SCHEMA` and avoids touching the table directly. + # This is important when the table name is malformed for some reason and running any statement + # that touches the table would result in an error. + if adapter.get_data_object(table_name) is not None: + evaluation_strategy.delete( + table_name, + is_table_deployable=is_table_deployable, + physical_schema=snapshot.physical_schema, + ) if on_complete is not None: on_complete(table_name) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 6c0763892e..84b054a189 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -438,12 +438,23 @@ def create_and_cleanup(name: str, dev_table_only: bool): return snapshot snapshot = create_and_cleanup("catalog.test_schema.test_model", True) + adapter_mock.get_data_object.assert_called_once_with( + f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ) adapter_mock.drop_table.assert_called_once_with( f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" ) adapter_mock.reset_mock() snapshot = create_and_cleanup("test_schema.test_model", False) + adapter_mock.get_data_object.assert_has_calls( + [ + call( + f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ), + call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"), + ] + ) adapter_mock.drop_table.assert_has_calls( [ call( @@ -455,6 +466,12 @@ def create_and_cleanup(name: str, dev_table_only: bool): adapter_mock.reset_mock() snapshot = create_and_cleanup("test_model", False) + adapter_mock.get_data_object.assert_has_calls( + [ + call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"), + call(f"sqlmesh__default.test_model__{snapshot.version}"), + ] + ) adapter_mock.drop_table.assert_has_calls( [ call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"), @@ -463,6 +480,31 @@ def create_and_cleanup(name: str, dev_table_only: bool): ) +def test_cleanup_skip_missing_table(adapter_mock, make_snapshot): + adapter_mock.get_data_object.return_value = None + + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="catalog.test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) + snapshot.version = "test_version" + + evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + evaluator.cleanup([SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)]) + + adapter_mock.get_data_object.assert_called_once_with( + f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ) + adapter_mock.drop_table.assert_not_called() + + def test_cleanup_external_model(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator = SnapshotEvaluator(adapter_mock) From 5bdb2c6ba33641568b8f2119a271eda74df96614 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Wed, 13 Aug 2025 13:37:27 -0700 Subject: [PATCH 2/3] add warning --- sqlmesh/core/snapshot/evaluator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 0059292354..6bb42a59dd 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1083,6 +1083,10 @@ def _cleanup_snapshot( is_table_deployable=is_table_deployable, physical_schema=snapshot.physical_schema, ) + else: + logger.warning( + "Skipping cleanup of table '%s' because it does not exist", table_name + ) if on_complete is not None: on_complete(table_name) From e02064cc1bc30ccc326e0bf3d89231e253bbe428 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Wed, 13 Aug 2025 15:59:24 -0700 Subject: [PATCH 3/3] address comments --- sqlmesh/core/snapshot/evaluator.py | 14 ++++---- tests/core/test_snapshot_evaluator.py | 50 +++++++++++++++++---------- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 6bb42a59dd..3937f37fba 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1073,17 +1073,19 @@ def _cleanup_snapshot( evaluation_strategy = _evaluation_strategy(snapshot, adapter) for is_table_deployable, table_name in table_names: - # Use `get_data_object` to check if the table exists instead of `table_exists` since the former - # is based on `INFORMATION_SCHEMA` and avoids touching the table directly. - # This is important when the table name is malformed for some reason and running any statement - # that touches the table would result in an error. - if adapter.get_data_object(table_name) is not None: + try: evaluation_strategy.delete( table_name, is_table_deployable=is_table_deployable, physical_schema=snapshot.physical_schema, ) - else: + except Exception: + # Use `get_data_object` to check if the table exists instead of `table_exists` since the former + # is based on `INFORMATION_SCHEMA` and avoids touching the table directly. + # This is important when the table name is malformed for some reason and running any statement + # that touches the table would result in an error. + if adapter.get_data_object(table_name) is not None: + raise logger.warning( "Skipping cleanup of table '%s' because it does not exist", table_name ) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 84b054a189..d474159b7c 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -438,23 +438,14 @@ def create_and_cleanup(name: str, dev_table_only: bool): return snapshot snapshot = create_and_cleanup("catalog.test_schema.test_model", True) - adapter_mock.get_data_object.assert_called_once_with( - f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" - ) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_called_once_with( f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" ) adapter_mock.reset_mock() snapshot = create_and_cleanup("test_schema.test_model", False) - adapter_mock.get_data_object.assert_has_calls( - [ - call( - f"sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" - ), - call(f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}"), - ] - ) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_has_calls( [ call( @@ -466,12 +457,7 @@ def create_and_cleanup(name: str, dev_table_only: bool): adapter_mock.reset_mock() snapshot = create_and_cleanup("test_model", False) - adapter_mock.get_data_object.assert_has_calls( - [ - call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"), - call(f"sqlmesh__default.test_model__{snapshot.version}"), - ] - ) + adapter_mock.get_data_object.assert_not_called() adapter_mock.drop_table.assert_has_calls( [ call(f"sqlmesh__default.test_model__{snapshot.fingerprint.to_version()}__dev"), @@ -480,8 +466,34 @@ def create_and_cleanup(name: str, dev_table_only: bool): ) +def test_cleanup_fails(adapter_mock, make_snapshot): + adapter_mock.drop_table.side_effect = RuntimeError("test_error") + + evaluator = SnapshotEvaluator(adapter_mock) + + model = SqlModel( + name="catalog.test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + storage_format="parquet", + query=parse_one("SELECT a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + + snapshot = make_snapshot(model) + snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) + snapshot.version = "test_version" + + evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + with pytest.raises(NodeExecutionFailedError) as exc_info: + evaluator.cleanup( + [SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)] + ) + + assert str(exc_info.value.__cause__) == "test_error" + + def test_cleanup_skip_missing_table(adapter_mock, make_snapshot): adapter_mock.get_data_object.return_value = None + adapter_mock.drop_table.side_effect = RuntimeError("fail") evaluator = SnapshotEvaluator(adapter_mock) @@ -502,7 +514,9 @@ def test_cleanup_skip_missing_table(adapter_mock, make_snapshot): adapter_mock.get_data_object.assert_called_once_with( f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" ) - adapter_mock.drop_table.assert_not_called() + adapter_mock.drop_table.assert_called_once_with( + f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev" + ) def test_cleanup_external_model(mocker: MockerFixture, adapter_mock, make_snapshot):