From 2e31e4a6489ece1de832450c16a244ad32101b20 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Wed, 3 Sep 2025 21:38:36 -0700 Subject: [PATCH] Fix: Prefetch all relevant data objects during snapshot migration --- sqlmesh/core/snapshot/evaluator.py | 120 +++++---- tests/core/test_snapshot_evaluator.py | 338 +++++++++++++------------- 2 files changed, 242 insertions(+), 216 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 6d6525a771..b96b0b8718 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -37,7 +37,7 @@ from sqlmesh.core import dialect as d from sqlmesh.core.audit import Audit, StandaloneAudit from sqlmesh.core.dialect import schema_ -from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, DataObjectType +from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, DataObjectType, DataObject from sqlmesh.core.macros import RuntimeStage from sqlmesh.core.model import ( AuditResult, @@ -422,50 +422,14 @@ def get_snapshots_to_create( target_snapshots: Target snapshots. deployability_index: Determines snapshots that are deployable / representative in the context of this creation. """ - snapshots_with_table_names = defaultdict(set) - tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = ( - defaultdict(lambda: defaultdict(set)) - ) - + existing_data_objects = self._get_data_objects(target_snapshots, deployability_index) + snapshots_to_create = [] for snapshot in target_snapshots: if not snapshot.is_model or snapshot.is_symbolic: continue - is_deployable = deployability_index.is_deployable(snapshot) - table = exp.to_table(snapshot.table_name(is_deployable), dialect=snapshot.model.dialect) - snapshots_with_table_names[snapshot].add(table.name) - table_schema = d.schema_(table.db, catalog=table.catalog) - tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name) - - def _get_data_objects( - schema: exp.Table, - object_names: t.Optional[t.Set[str]] = None, - gateway: t.Optional[str] = None, - ) -> t.Set[str]: - logger.info("Listing data objects in schema %s", schema.sql()) - objs = self.get_adapter(gateway).get_data_objects(schema, object_names) - return {obj.name for obj in objs} - - with self.concurrent_context(): - existing_objects: t.Set[str] = set() - # A schema can be shared across multiple engines, so we need to group tables by both gateway and schema - for gateway, tables_by_schema in tables_by_gateway_and_schema.items(): - objs_for_gateway = { - obj - for objs in concurrent_apply_to_values( - list(tables_by_schema), - lambda s: _get_data_objects( - schema=s, object_names=tables_by_schema.get(s), gateway=gateway - ), - self.ddl_concurrent_tasks, - ) - for obj in objs - } - existing_objects.update(objs_for_gateway) - - snapshots_to_create = [] - for snapshot, table_names in snapshots_with_table_names.items(): - missing_tables = table_names - existing_objects - if missing_tables or (snapshot.is_seed and not snapshot.intervals): + if snapshot.snapshot_id not in existing_data_objects or ( + snapshot.is_seed and not snapshot.intervals + ): snapshots_to_create.append(snapshot) return snapshots_to_create @@ -514,16 +478,26 @@ def migrate( allow_additive_snapshots: Set of snapshots that are allowed to have additive schema changes. deployability_index: Determines snapshots that are deployable in the context of this evaluation. """ + deployability_index = deployability_index or DeployabilityIndex.all_deployable() + target_data_objects = self._get_data_objects(target_snapshots, deployability_index) + if not target_data_objects: + return + + if not snapshots: + snapshots = {s.snapshot_id: s for s in target_snapshots} + allow_destructive_snapshots = allow_destructive_snapshots or set() allow_additive_snapshots = allow_additive_snapshots or set() - deployability_index = deployability_index or DeployabilityIndex.all_deployable() snapshots_by_name = {s.name: s for s in snapshots.values()} + snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects] with self.concurrent_context(): + # Only migrate snapshots for which there's an existing data object concurrent_apply_to_snapshots( - target_snapshots, + snapshots_with_data_objects, lambda s: self._migrate_snapshot( s, snapshots_by_name, + target_data_objects[s.snapshot_id], allow_destructive_snapshots, allow_additive_snapshots, self.get_adapter(s.model_gateway), @@ -1074,6 +1048,7 @@ def _migrate_snapshot( self, snapshot: Snapshot, snapshots: t.Dict[str, Snapshot], + target_data_object: t.Optional[DataObject], allow_destructive_snapshots: t.Set[str], allow_additive_snapshots: t.Set[str], adapter: EngineAdapter, @@ -1095,7 +1070,6 @@ def _migrate_snapshot( adapter.transaction(), adapter.session(snapshot.model.render_session_properties(**render_kwargs)), ): - target_data_object = adapter.get_data_object(target_table_name) table_exists = target_data_object is not None if adapter.drop_data_object_on_type_mismatch( target_data_object, _snapshot_to_data_object_type(snapshot) @@ -1447,6 +1421,62 @@ def _can_clone(self, snapshot: Snapshot, deployability_index: DeployabilityIndex and not deployability_index.is_deployable(snapshot) ) + def _get_data_objects( + self, + target_snapshots: t.Iterable[Snapshot], + deployability_index: DeployabilityIndex, + ) -> t.Dict[SnapshotId, DataObject]: + """Returns a dictionary of snapshot IDs to existing data objects of their physical tables. + + Args: + target_snapshots: Target snapshots. + deployability_index: The deployability index to determine whether to look for a deployable or + a non-deployable physical table. + + Returns: + A dictionary of snapshot IDs to existing data objects of their physical tables. If the data object + for a snapshot is not found, it will not be included in the dictionary. + """ + tables_by_gateway_and_schema: t.Dict[t.Union[str, None], t.Dict[exp.Table, set[str]]] = ( + defaultdict(lambda: defaultdict(set)) + ) + snapshots_by_table_name: t.Dict[str, Snapshot] = {} + for snapshot in target_snapshots: + if not snapshot.is_model or snapshot.is_symbolic: + continue + is_deployable = deployability_index.is_deployable(snapshot) + table = exp.to_table(snapshot.table_name(is_deployable), dialect=snapshot.model.dialect) + table_schema = d.schema_(table.db, catalog=table.catalog) + tables_by_gateway_and_schema[snapshot.model_gateway][table_schema].add(table.name) + snapshots_by_table_name[table.name] = snapshot + + def _get_data_objects_in_schema( + schema: exp.Table, + object_names: t.Optional[t.Set[str]] = None, + gateway: t.Optional[str] = None, + ) -> t.List[DataObject]: + logger.info("Listing data objects in schema %s", schema.sql()) + return self.get_adapter(gateway).get_data_objects(schema, object_names) + + with self.concurrent_context(): + existing_objects: t.List[DataObject] = [] + # A schema can be shared across multiple engines, so we need to group tables by both gateway and schema + for gateway, tables_by_schema in tables_by_gateway_and_schema.items(): + objs_for_gateway = [ + obj + for objs in concurrent_apply_to_values( + list(tables_by_schema), + lambda s: _get_data_objects_in_schema( + schema=s, object_names=tables_by_schema.get(s), gateway=gateway + ), + self.ddl_concurrent_tasks, + ) + for obj in objs + ] + existing_objects.extend(objs_for_gateway) + + return {snapshots_by_table_name[obj.name].snapshot_id: obj for obj in existing_objects} + def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy: klass: t.Type diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 9b1e81c0f4..6f610a696a 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -880,12 +880,12 @@ def test_create_only_dev_table_exists(mocker: MockerFixture, adapter_mock, make_ adapter_mock.table_exists.return_value = True evaluator = SnapshotEvaluator(adapter_mock) - evaluator.create([snapshot], {}) + evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) adapter_mock.create_view.assert_not_called() adapter_mock.get_data_objects.assert_called_once_with( schema_("sqlmesh__test_schema"), { - f"test_schema__test_model__{snapshot.version}", + f"test_schema__test_model__{snapshot.version}__dev", }, ) @@ -1003,14 +1003,15 @@ def test_create_tables_exist( evaluator = SnapshotEvaluator(adapter_mock) snapshot.categorize_as(category=snapshot_category, forward_only=forward_only) + table_name = ( + f"db__model__{snapshot.version}" + if deployability_index.is_deployable(snapshot) + else f"db__model__{snapshot.version}__dev" + ) + adapter_mock.get_data_objects.return_value = [ DataObject( - name=f"db__model__{snapshot.version}__dev", - schema="sqlmesh__db", - type=DataObjectType.TABLE, - ), - DataObject( - name=f"db__model__{snapshot.version}", + name=table_name, schema="sqlmesh__db", type=DataObjectType.TABLE, ), @@ -1024,11 +1025,7 @@ def test_create_tables_exist( adapter_mock.get_data_objects.assert_called_once_with( schema_("sqlmesh__db"), - { - f"db__model__{snapshot.version}" - if deployability_index.is_deployable(snapshot) - else f"db__model__{snapshot.version}__dev", - }, + {table_name}, ) adapter_mock.create_schema.assert_not_called() adapter_mock.create_table.assert_not_called() @@ -1279,7 +1276,20 @@ def test_migrate(mocker: MockerFixture, make_snapshot, make_mocked_engine_adapte adapter.with_settings = lambda **kwargs: adapter # type: ignore session_spy = mocker.spy(adapter, "session") - current_table = "sqlmesh__test_schema.test_schema__test_model__1" + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind( + time_column="a", on_destructive_change=OnDestructiveChange.ALLOW + ), + storage_format="parquet", + query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model, version="1") + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True + snapshot.previous_versions = snapshot.all_versions + + current_table = snapshot.table_name() def columns(table_name): if table_name == current_table: @@ -1296,32 +1306,27 @@ def columns(table_name): adapter.table_exists = lambda _: True # type: ignore mocker.patch.object( adapter, - "get_data_object", - return_value=DataObject(schema="test_schema", name="test_model", type="table"), + "get_data_objects", + return_value=[ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type="table", + ) + ], ) evaluator = SnapshotEvaluator(adapter) - model = SqlModel( - name="test_schema.test_model", - kind=IncrementalByTimeRangeKind( - time_column="a", on_destructive_change=OnDestructiveChange.ALLOW - ), - storage_format="parquet", - query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), - ) - snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.BREAKING - snapshot.forward_only = True - snapshot.previous_versions = snapshot.all_versions - - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([snapshot], {}) adapter.cursor.execute.assert_has_calls( [ - call('ALTER TABLE "sqlmesh__test_schema"."test_schema__test_model__1" DROP COLUMN "b"'), call( - 'ALTER TABLE "sqlmesh__test_schema"."test_schema__test_model__1" ADD COLUMN "a" INT' + f'ALTER TABLE "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" DROP COLUMN "b"' + ), + call( + f'ALTER TABLE "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" ADD COLUMN "a" INT' ), ] ) @@ -1371,15 +1376,6 @@ def test_migrate_view( change_category: SnapshotChangeCategory, forward_only: bool, ): - adapter = make_mocked_engine_adapter(EngineAdapter) - mocker.patch.object( - adapter, - "get_data_object", - return_value=DataObject(schema="test_schema", name="test_model", type="view"), - ) - - evaluator = SnapshotEvaluator(adapter) - model = SqlModel( name="test_schema.test_model", kind=ViewKind(), @@ -1390,7 +1386,20 @@ def test_migrate_view( snapshot.change_category = change_category snapshot.forward_only = forward_only - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + adapter = make_mocked_engine_adapter(EngineAdapter) + mocker.patch( + "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", + return_value=[ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type="view", + ) + ], + ) + + evaluator = SnapshotEvaluator(adapter) + evaluator.migrate([snapshot], {}) adapter.cursor.execute.assert_not_called() @@ -1400,19 +1409,6 @@ def test_migrate_snapshot_data_object_type_mismatch( make_snapshot, make_mocked_engine_adapter, ): - adapter = make_mocked_engine_adapter(EngineAdapter) - adapter.with_settings = lambda **kwargs: adapter # type: ignore - mocker.patch.object( - adapter, - "get_data_object", - return_value=DataObject( - schema="sqlmesh__test_schema", name="test_schema__test_model__1", type="view" - ), - ) - mocker.patch.object(adapter, "table_exists", return_value=False) - - evaluator = SnapshotEvaluator(adapter) - model = SqlModel( name="test_schema.test_model", kind=FullKind(), @@ -1424,11 +1420,29 @@ def test_migrate_snapshot_data_object_type_mismatch( snapshot.forward_only = True snapshot.previous_versions = snapshot.all_versions - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + adapter = make_mocked_engine_adapter(EngineAdapter) + adapter.with_settings = lambda **kwargs: adapter # type: ignore + mocker.patch( + "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", + return_value=[ + DataObject( + schema="sqlmesh__test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type="view", + ) + ], + ) + mocker.patch.object(adapter, "table_exists", return_value=False) + + evaluator = SnapshotEvaluator(adapter) + + evaluator.migrate([snapshot], {}) adapter.cursor.execute.assert_has_calls( [ - call('DROP VIEW IF EXISTS "sqlmesh__test_schema"."test_schema__test_model__1"'), + call( + f'DROP VIEW IF EXISTS "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}"' + ), ] ) @@ -1639,14 +1653,6 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot) snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions - adapter_mock.get_data_objects.return_value = [ - DataObject( - name=f"test_schema__test_model__{snapshot.version}", - schema="sqlmesh__test_schema", - type=DataObjectType.TABLE, - ), - ] - evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) adapter_mock.create_table.assert_called_once_with( @@ -1663,7 +1669,7 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot) ) adapter_mock.clone_table.assert_called_once_with( - f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__dev", + f"sqlmesh__test_schema.test_schema__test_model__{snapshot.dev_version}__dev", f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", replace=True, rendered_physical_properties={}, @@ -1709,14 +1715,6 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions - adapter_mock.get_data_objects.return_value = [ - DataObject( - name=f"test_schema__test_model__{snapshot.version}", - schema="sqlmesh__test_schema", - type=DataObjectType.TABLE, - ), - ] - with pytest.raises(SnapshotCreationFailedError): evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) @@ -1774,14 +1772,6 @@ def test_create_clone_in_dev_self_referencing( snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions - adapter_mock.get_data_objects.return_value = [ - DataObject( - name=f"test_schema__test_model__{snapshot.version}", - schema="sqlmesh__test_schema", - type=DataObjectType.TABLE, - ), - ] - evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) adapter_mock.create_table.assert_called_once_with( @@ -1815,9 +1805,21 @@ def test_on_destructive_change_runtime_check( make_snapshot, make_mocked_engine_adapter, ): + # SQLMesh default: ERROR + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a"), + query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model, version="1") + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True + snapshot.previous_versions = snapshot.all_versions + adapter = make_mocked_engine_adapter(EngineAdapter) adapter.with_settings = lambda **kwargs: adapter # type: ignore - current_table = "sqlmesh__test_schema.test_schema__test_model__1" + + current_table = snapshot.table_name() def columns(table_name): if table_name == current_table: @@ -1831,27 +1833,21 @@ def columns(table_name): } adapter.columns = columns # type: ignore - mocker.patch.object( - adapter, - "get_data_object", - return_value=DataObject(schema="test_schema", name="test_model", type=DataObjectType.TABLE), + mocker.patch( + "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", + return_value=[ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type=DataObjectType.TABLE, + ) + ], ) evaluator = SnapshotEvaluator(adapter) - # SQLMesh default: ERROR - model = SqlModel( - name="test_schema.test_model", - kind=IncrementalByTimeRangeKind(time_column="a"), - query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), - ) - snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.BREAKING - snapshot.forward_only = True - snapshot.previous_versions = snapshot.all_versions - with pytest.raises(NodeExecutionFailedError) as ex: - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([snapshot], {}) destructive_change_err = ex.value.__cause__ assert isinstance(destructive_change_err, DestructiveChangeError) @@ -1875,7 +1871,7 @@ def columns(table_name): logger = logging.getLogger("sqlmesh.core.snapshot.evaluator") with patch.object(logger, "warning") as mock_logger: - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([snapshot], {}) assert ( mock_logger.call_args[0][0] == "\nPlan requires destructive change to forward-only model '\"test_schema\".\"test_model\"'s schema that drops column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 DROP COLUMN b\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN a INT" @@ -1887,7 +1883,6 @@ def columns(table_name): [snapshot], {}, {'"test_schema"."test_model"'}, - deployability_index=DeployabilityIndex.none_deployable(), ) assert mock_logger.call_count == 0 @@ -1897,9 +1892,20 @@ def test_on_additive_change_runtime_check( make_snapshot, make_mocked_engine_adapter, ): + # SQLMesh default: ERROR + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind(time_column="a", on_additive_change=OnAdditiveChange.ERROR), + query=parse_one("SELECT c, a, b FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model, version="1") + snapshot.change_category = SnapshotChangeCategory.BREAKING + snapshot.forward_only = True + snapshot.previous_versions = snapshot.all_versions + adapter = make_mocked_engine_adapter(EngineAdapter) adapter.with_settings = lambda **kwargs: adapter # type: ignore - current_table = "sqlmesh__test_schema.test_schema__test_model__1" + current_table = snapshot.table_name() def columns(table_name): if table_name == current_table: @@ -1914,27 +1920,21 @@ def columns(table_name): } adapter.columns = columns # type: ignore - mocker.patch.object( - adapter, - "get_data_object", - return_value=DataObject(schema="test_schema", name="test_model", type=DataObjectType.TABLE), + mocker.patch( + "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", + return_value=[ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type=DataObjectType.TABLE, + ) + ], ) evaluator = SnapshotEvaluator(adapter) - # SQLMesh default: ERROR - model = SqlModel( - name="test_schema.test_model", - kind=IncrementalByTimeRangeKind(time_column="a", on_additive_change=OnAdditiveChange.ERROR), - query=parse_one("SELECT c, a, b FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), - ) - snapshot = make_snapshot(model, version="1") - snapshot.change_category = SnapshotChangeCategory.BREAKING - snapshot.forward_only = True - snapshot.previous_versions = snapshot.all_versions - with pytest.raises(NodeExecutionFailedError) as ex: - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([snapshot], {}) additive_change_error = ex.value.__cause__ assert isinstance(additive_change_error, AdditiveChangeError) @@ -1958,7 +1958,7 @@ def columns(table_name): logger = logging.getLogger("sqlmesh.core.snapshot.evaluator") with patch.object(logger, "warning") as mock_logger: - evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([snapshot], {}) assert ( mock_logger.call_args[0][0] == "\nPlan requires additive change to forward-only model '\"test_schema\".\"test_model\"'s schema that adds column 'b'.\n\nSchema changes:\n ALTER TABLE sqlmesh__test_schema.test_schema__test_model__1 ADD COLUMN b INT" @@ -2804,11 +2804,6 @@ def test_create_seed_no_intervals(mocker: MockerFixture, adapter_mock, make_snap schema="sqlmesh__db", type=DataObjectType.TABLE, ), - DataObject( - name=f"db__seed__{snapshot.version}__dev", - schema="sqlmesh__db", - type=DataObjectType.TABLE, - ), ] evaluator = SnapshotEvaluator(adapter_mock) @@ -3763,14 +3758,6 @@ def test_create_managed_forward_only_with_previous_version_doesnt_clone_for_dev_ ), ) - adapter_mock.get_data_objects.return_value = [ - DataObject( - name=f"test_schema__test_model__{snapshot.version}", - schema="sqlmesh__test_schema", - type=DataObjectType.MANAGED_TABLE, - ), - ] - evaluator.create( target_snapshots=[snapshot], snapshots={}, @@ -3806,12 +3793,16 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc assert new_snapshot.table_name() == snapshot.table_name() - adapter_mock.get_data_object.return_value = DataObject( - schema="test_schema", name="test_model", type=DataObjectType.TABLE - ) + adapter_mock.get_data_objects.return_value = [ + DataObject( + schema="test_schema", + name=f"db__model__{new_snapshot.version}", + type=DataObjectType.TABLE, + ) + ] adapter_mock.drop_data_object_on_type_mismatch.return_value = False - evaluator.migrate([new_snapshot], {}, deployability_index=DeployabilityIndex.none_deployable()) + evaluator.migrate([new_snapshot], {}) common_kwargs: t.Dict[str, t.Any] = dict( table_format=None, @@ -3883,9 +3874,13 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True) snapshot.previous_versions = snapshot.all_versions - adapter_mock.get_data_object.return_value = DataObject( - schema="test_schema", name="test_model", type=DataObjectType.MANAGED_TABLE - ) + adapter_mock.get_data_objects.return_value = [ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot.version}", + type=DataObjectType.MANAGED_TABLE, + ) + ] adapter_mock.drop_data_object_on_type_mismatch.return_value = False # no schema changes - no-op @@ -3893,7 +3888,6 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): evaluator.migrate( target_snapshots=[snapshot], snapshots={}, - deployability_index=DeployabilityIndex.none_deployable(), ) adapter_mock.create_table.assert_not_called() @@ -3908,7 +3902,6 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): evaluator.migrate( target_snapshots=[snapshot], snapshots={}, - deployability_index=DeployabilityIndex.none_deployable(), ) sqlmesh_err = ex.value.__cause__ @@ -4065,34 +4058,6 @@ def test_multiple_engine_migration( adapter_two.with_settings.return_value = adapter_two engine_adapters = {"one": adapter_one, "two": adapter_two} - current_table = "sqlmesh__test_schema.test_schema__test_model__1" - - def columns(table_name): - if table_name == current_table: - return { - "c": exp.DataType.build("int"), - "b": exp.DataType.build("int"), - } - return { - "c": exp.DataType.build("int"), - "a": exp.DataType.build("int"), - } - - adapter_two.columns.side_effect = columns - adapter_two.get_data_object.return_value = DataObject( - schema="test_schema", name="test_model_2", type=DataObjectType.TABLE - ) - adapter_two.drop_data_object_on_type_mismatch.return_value = False - - mocker.patch.object(adapter_one, "columns", side_effect=columns) - mocker.patch.object( - adapter_one, - "get_data_object", - return_value=DataObject(schema="test_schema", name="test_model", type=DataObjectType.TABLE), - ) - - evaluator = SnapshotEvaluator(engine_adapters) - model = SqlModel( name="test_schema.test_model", kind=IncrementalByTimeRangeKind( @@ -4116,10 +4081,41 @@ def columns(table_name): snapshot_2.change_category = SnapshotChangeCategory.BREAKING snapshot_2.forward_only = True snapshot_2.previous_versions = snapshot_2.all_versions - evaluator.migrate( - [snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable() + + def columns(table_name): + if table_name == snapshot_1.table_name(): + return { + "c": exp.DataType.build("int"), + "b": exp.DataType.build("int"), + } + return { + "c": exp.DataType.build("int"), + "a": exp.DataType.build("int"), + } + + adapter_two.columns.side_effect = columns + adapter_two.drop_data_object_on_type_mismatch.return_value = False + + mocker.patch.object(adapter_one, "columns", side_effect=columns) + mocker.patch( + "sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects", + return_value=[ + DataObject( + schema="test_schema", + name=f"test_schema__test_model__{snapshot_1.version}", + type=DataObjectType.TABLE, + ), + DataObject( + schema="test_schema", + name=f"test_schema__test_model_2__{snapshot_2.version}", + type=DataObjectType.TABLE, + ), + ], ) + evaluator = SnapshotEvaluator(engine_adapters) + evaluator.migrate([snapshot_1, snapshot_2], {}) + adapter_one.cursor.execute.assert_has_calls( [ call('ALTER TABLE "sqlmesh__test_schema"."test_schema__test_model__1" DROP COLUMN "b"'),