Skip to content

Commit cd72235

Browse files
committed
Do not pass all snapshots to _demote_snapshot
1 parent 86b6dc6 commit cd72235

File tree

2 files changed

+61
-62
lines changed

2 files changed

+61
-62
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ def visit_virtual_layer_update_stage(
342342
self._demote_snapshots(
343343
[stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
344344
stage.demoted_environment_naming_info,
345-
snapshots=stage.all_snapshots,
346345
deployability_index=stage.deployability_index,
347346
on_complete=lambda s: self.console.update_promotion_progress(s, False),
348347
)
@@ -386,14 +385,12 @@ def _demote_snapshots(
386385
self,
387386
target_snapshots: t.Iterable[Snapshot],
388387
environment_naming_info: EnvironmentNamingInfo,
389-
snapshots: t.Dict[SnapshotId, Snapshot],
390388
deployability_index: t.Optional[DeployabilityIndex] = None,
391389
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
392390
) -> None:
393391
self.snapshot_evaluator.demote(
394392
target_snapshots,
395393
environment_naming_info,
396-
snapshots=snapshots,
397394
deployability_index=deployability_index,
398395
on_complete=on_complete,
399396
)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 61 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ def demote(
293293
concurrent_apply_to_snapshots(
294294
target_snapshots,
295295
lambda s: self._demote_snapshot(
296-
s, environment_naming_info, snapshots, deployability_index, on_complete
296+
s, environment_naming_info, deployability_index, on_complete
297297
),
298298
self.ddl_concurrent_tasks,
299299
)
@@ -974,42 +974,44 @@ def _promote_snapshot(
974974
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
975975
table_mapping: t.Optional[t.Dict[str, str]] = None,
976976
) -> None:
977-
if snapshot.is_model:
978-
adapter = (
979-
self.get_adapter(snapshot.model_gateway)
980-
if environment_naming_info.gateway_managed
981-
else self.adapter
982-
)
983-
table_name = snapshot.table_name(deployability_index.is_representative(snapshot))
984-
view_name = snapshot.qualified_view_name.for_environment(
985-
environment_naming_info, dialect=adapter.dialect
986-
)
987-
render_kwargs: t.Dict[str, t.Any] = dict(
988-
start=start,
989-
end=end,
990-
execution_time=execution_time,
991-
engine_adapter=adapter,
992-
deployability_index=deployability_index,
993-
table_mapping=table_mapping,
994-
runtime_stage=RuntimeStage.PROMOTING,
995-
)
977+
if not snapshot.is_model:
978+
return
996979

997-
with (
998-
adapter.transaction(),
999-
adapter.session(snapshot.model.render_session_properties(**render_kwargs)),
1000-
):
1001-
_evaluation_strategy(snapshot, adapter).promote(
1002-
table_name=table_name,
1003-
view_name=view_name,
1004-
model=snapshot.model,
1005-
environment=environment_naming_info.name,
1006-
snapshots=snapshots,
1007-
**render_kwargs,
1008-
)
980+
adapter = (
981+
self.get_adapter(snapshot.model_gateway)
982+
if environment_naming_info.gateway_managed
983+
else self.adapter
984+
)
985+
table_name = snapshot.table_name(deployability_index.is_representative(snapshot))
986+
view_name = snapshot.qualified_view_name.for_environment(
987+
environment_naming_info, dialect=adapter.dialect
988+
)
989+
render_kwargs: t.Dict[str, t.Any] = dict(
990+
start=start,
991+
end=end,
992+
execution_time=execution_time,
993+
engine_adapter=adapter,
994+
deployability_index=deployability_index,
995+
table_mapping=table_mapping,
996+
runtime_stage=RuntimeStage.PROMOTING,
997+
)
998+
999+
with (
1000+
adapter.transaction(),
1001+
adapter.session(snapshot.model.render_session_properties(**render_kwargs)),
1002+
):
1003+
_evaluation_strategy(snapshot, adapter).promote(
1004+
table_name=table_name,
1005+
view_name=view_name,
1006+
model=snapshot.model,
1007+
environment=environment_naming_info.name,
1008+
snapshots=snapshots,
1009+
**render_kwargs,
1010+
)
10091011

1010-
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1011-
render_kwargs["snapshots"] = snapshot_by_name
1012-
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
1012+
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1013+
render_kwargs["snapshots"] = snapshot_by_name
1014+
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
10131015

10141016
if on_complete is not None:
10151017
on_complete(snapshot)
@@ -1018,34 +1020,34 @@ def _demote_snapshot(
10181020
self,
10191021
snapshot: Snapshot,
10201022
environment_naming_info: EnvironmentNamingInfo,
1021-
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]],
10221023
deployability_index: t.Optional[DeployabilityIndex],
10231024
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
10241025
) -> None:
1025-
if snapshot.is_model:
1026-
adapter = (
1027-
self.get_adapter(snapshot.model_gateway)
1028-
if environment_naming_info.gateway_managed
1029-
else self.adapter
1030-
)
1031-
view_name = snapshot.qualified_view_name.for_environment(
1032-
environment_naming_info, dialect=adapter.dialect
1033-
)
1034-
with (
1035-
adapter.transaction(),
1036-
adapter.session(
1037-
snapshot.model.render_session_properties(
1038-
engine_adapter=adapter,
1039-
snapshots=snapshots,
1040-
deployability_index=deployability_index,
1041-
runtime_stage=RuntimeStage.DEMOTING,
1042-
)
1043-
),
1044-
):
1045-
_evaluation_strategy(snapshot, adapter).demote(view_name)
1026+
if not snapshot.is_model:
1027+
return
10461028

1047-
if on_complete is not None:
1048-
on_complete(snapshot)
1029+
adapter = (
1030+
self.get_adapter(snapshot.model_gateway)
1031+
if environment_naming_info.gateway_managed
1032+
else self.adapter
1033+
)
1034+
view_name = snapshot.qualified_view_name.for_environment(
1035+
environment_naming_info, dialect=adapter.dialect
1036+
)
1037+
with (
1038+
adapter.transaction(),
1039+
adapter.session(
1040+
snapshot.model.render_session_properties(
1041+
engine_adapter=adapter,
1042+
deployability_index=deployability_index,
1043+
runtime_stage=RuntimeStage.DEMOTING,
1044+
)
1045+
),
1046+
):
1047+
_evaluation_strategy(snapshot, adapter).demote(view_name)
1048+
1049+
if on_complete is not None:
1050+
on_complete(snapshot)
10491051

10501052
def _cleanup_snapshot(
10511053
self,

0 commit comments

Comments
 (0)