3737from sqlmesh .core import dialect as d
3838from sqlmesh .core .audit import Audit , StandaloneAudit
3939from sqlmesh .core .dialect import schema_
40- from sqlmesh .core .engine_adapter .shared import InsertOverwriteStrategy , DataObjectType
40+ from sqlmesh .core .engine_adapter .shared import InsertOverwriteStrategy , DataObjectType , DataObject
4141from sqlmesh .core .macros import RuntimeStage
4242from sqlmesh .core .model import (
4343 AuditResult ,
@@ -422,50 +422,14 @@ def get_snapshots_to_create(
422422 target_snapshots: Target snapshots.
423423 deployability_index: Determines snapshots that are deployable / representative in the context of this creation.
424424 """
425- snapshots_with_table_names = defaultdict (set )
426- tables_by_gateway_and_schema : t .Dict [t .Union [str , None ], t .Dict [exp .Table , set [str ]]] = (
427- defaultdict (lambda : defaultdict (set ))
428- )
429-
425+ existing_data_objects = self ._get_data_objects (target_snapshots , deployability_index )
426+ snapshots_to_create = []
430427 for snapshot in target_snapshots :
431428 if not snapshot .is_model or snapshot .is_symbolic :
432429 continue
433- is_deployable = deployability_index .is_deployable (snapshot )
434- table = exp .to_table (snapshot .table_name (is_deployable ), dialect = snapshot .model .dialect )
435- snapshots_with_table_names [snapshot ].add (table .name )
436- table_schema = d .schema_ (table .db , catalog = table .catalog )
437- tables_by_gateway_and_schema [snapshot .model_gateway ][table_schema ].add (table .name )
438-
439- def _get_data_objects (
440- schema : exp .Table ,
441- object_names : t .Optional [t .Set [str ]] = None ,
442- gateway : t .Optional [str ] = None ,
443- ) -> t .Set [str ]:
444- logger .info ("Listing data objects in schema %s" , schema .sql ())
445- objs = self .get_adapter (gateway ).get_data_objects (schema , object_names )
446- return {obj .name for obj in objs }
447-
448- with self .concurrent_context ():
449- existing_objects : t .Set [str ] = set ()
450- # A schema can be shared across multiple engines, so we need to group tables by both gateway and schema
451- for gateway , tables_by_schema in tables_by_gateway_and_schema .items ():
452- objs_for_gateway = {
453- obj
454- for objs in concurrent_apply_to_values (
455- list (tables_by_schema ),
456- lambda s : _get_data_objects (
457- schema = s , object_names = tables_by_schema .get (s ), gateway = gateway
458- ),
459- self .ddl_concurrent_tasks ,
460- )
461- for obj in objs
462- }
463- existing_objects .update (objs_for_gateway )
464-
465- snapshots_to_create = []
466- for snapshot , table_names in snapshots_with_table_names .items ():
467- missing_tables = table_names - existing_objects
468- if missing_tables or (snapshot .is_seed and not snapshot .intervals ):
430+ if snapshot .snapshot_id not in existing_data_objects or (
431+ snapshot .is_seed and not snapshot .intervals
432+ ):
469433 snapshots_to_create .append (snapshot )
470434
471435 return snapshots_to_create
@@ -514,16 +478,26 @@ def migrate(
514478 allow_additive_snapshots: Set of snapshots that are allowed to have additive schema changes.
515479 deployability_index: Determines snapshots that are deployable in the context of this evaluation.
516480 """
481+ deployability_index = deployability_index or DeployabilityIndex .all_deployable ()
482+ target_data_objects = self ._get_data_objects (target_snapshots , deployability_index )
483+ if not target_data_objects :
484+ return
485+
486+ if not snapshots :
487+ snapshots = {s .snapshot_id : s for s in target_snapshots }
488+
517489 allow_destructive_snapshots = allow_destructive_snapshots or set ()
518490 allow_additive_snapshots = allow_additive_snapshots or set ()
519- deployability_index = deployability_index or DeployabilityIndex .all_deployable ()
520491 snapshots_by_name = {s .name : s for s in snapshots .values ()}
492+ snapshots_with_data_objects = [snapshots [s_id ] for s_id in target_data_objects ]
521493 with self .concurrent_context ():
494+ # Only migrate snapshots for which there's an existing data object
522495 concurrent_apply_to_snapshots (
523- target_snapshots ,
496+ snapshots_with_data_objects ,
524497 lambda s : self ._migrate_snapshot (
525498 s ,
526499 snapshots_by_name ,
500+ target_data_objects [s .snapshot_id ],
527501 allow_destructive_snapshots ,
528502 allow_additive_snapshots ,
529503 self .get_adapter (s .model_gateway ),
@@ -1074,6 +1048,7 @@ def _migrate_snapshot(
10741048 self ,
10751049 snapshot : Snapshot ,
10761050 snapshots : t .Dict [str , Snapshot ],
1051+ target_data_object : t .Optional [DataObject ],
10771052 allow_destructive_snapshots : t .Set [str ],
10781053 allow_additive_snapshots : t .Set [str ],
10791054 adapter : EngineAdapter ,
@@ -1095,7 +1070,6 @@ def _migrate_snapshot(
10951070 adapter .transaction (),
10961071 adapter .session (snapshot .model .render_session_properties (** render_kwargs )),
10971072 ):
1098- target_data_object = adapter .get_data_object (target_table_name )
10991073 table_exists = target_data_object is not None
11001074 if adapter .drop_data_object_on_type_mismatch (
11011075 target_data_object , _snapshot_to_data_object_type (snapshot )
@@ -1447,6 +1421,62 @@ def _can_clone(self, snapshot: Snapshot, deployability_index: DeployabilityIndex
14471421 and not deployability_index .is_deployable (snapshot )
14481422 )
14491423
def _get_data_objects(
    self,
    target_snapshots: t.Iterable[Snapshot],
    deployability_index: DeployabilityIndex,
) -> t.Dict[SnapshotId, DataObject]:
    """Returns a dictionary of snapshot IDs to existing data objects of their physical tables.

    Snapshots that are not models, or that are symbolic, are ignored. The remaining
    snapshots are grouped by gateway and by the schema of their physical table, and each
    schema is listed once through the adapter of the corresponding gateway.

    Args:
        target_snapshots: Target snapshots.
        deployability_index: The deployability index to determine whether to look for a deployable or
            a non-deployable physical table.

    Returns:
        A dictionary of snapshot IDs to existing data objects of their physical tables. If the data object
        for a snapshot is not found, it will not be included in the dictionary.
    """
    # gateway -> schema -> physical table name -> snapshot.
    # Snapshots are tracked per (gateway, schema) rather than by bare table name so that
    # two tables sharing a name in different schemas / gateways can't shadow each other.
    snapshots_by_gateway_and_schema: t.Dict[
        t.Union[str, None], t.Dict[exp.Table, t.Dict[str, Snapshot]]
    ] = defaultdict(lambda: defaultdict(dict))
    for snapshot in target_snapshots:
        if not snapshot.is_model or snapshot.is_symbolic:
            continue
        is_deployable = deployability_index.is_deployable(snapshot)
        table = exp.to_table(snapshot.table_name(is_deployable), dialect=snapshot.model.dialect)
        table_schema = d.schema_(table.db, catalog=table.catalog)
        snapshots_by_gateway_and_schema[snapshot.model_gateway][table_schema][table.name] = (
            snapshot
        )

    def _get_data_objects_in_schema(
        schema: exp.Table,
        object_names: t.Optional[t.Set[str]] = None,
        gateway: t.Optional[str] = None,
    ) -> t.Tuple[exp.Table, t.List[DataObject]]:
        # The schema is returned alongside its objects so the caller can resolve names
        # within the right schema without relying on the ordering of concurrent results.
        logger.info("Listing data objects in schema %s", schema.sql())
        return schema, self.get_adapter(gateway).get_data_objects(schema, object_names)

    data_objects: t.Dict[SnapshotId, DataObject] = {}
    with self.concurrent_context():
        # A schema can be shared across multiple engines, so we need to group tables by both gateway and schema
        for gateway, snapshots_by_schema in snapshots_by_gateway_and_schema.items():
            listed_schemas = concurrent_apply_to_values(
                list(snapshots_by_schema),
                lambda s: _get_data_objects_in_schema(
                    schema=s, object_names=set(snapshots_by_schema[s]), gateway=gateway
                ),
                self.ddl_concurrent_tasks,
            )
            for schema, objs in listed_schemas:
                snapshots_by_name = snapshots_by_schema[schema]
                for obj in objs:
                    # Skip objects whose name doesn't match a requested table instead of
                    # failing with a KeyError; the affected snapshot is then simply
                    # reported as having no existing data object.
                    matched_snapshot = snapshots_by_name.get(obj.name)
                    if matched_snapshot is not None:
                        data_objects[matched_snapshot.snapshot_id] = obj

    return data_objects
1479+
14501480
14511481def _evaluation_strategy (snapshot : SnapshotInfoLike , adapter : EngineAdapter ) -> EvaluationStrategy :
14521482 klass : t .Type
0 commit comments