Skip to content

Commit c7c302f

Browse files
committed
Establish session before snapshot promotion/demotion
1 parent 7e983f8 commit c7c302f

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ def visit_virtual_layer_update_stage(
340340
)
341341
if stage.demoted_environment_naming_info:
342342
self._demote_snapshots(
343-
stage.demoted_snapshots,
343+
[stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
344344
stage.demoted_environment_naming_info,
345345
on_complete=lambda s: self.console.update_promotion_progress(s, False),
346346
)
@@ -382,7 +382,7 @@ def _promote_snapshots(
382382

383383
def _demote_snapshots(
384384
self,
385-
target_snapshots: t.Iterable[SnapshotTableInfo],
385+
target_snapshots: t.Iterable[Snapshot],
386386
environment_naming_info: EnvironmentNamingInfo,
387387
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
388388
) -> None:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ def promote(
276276

277277
def demote(
278278
self,
279-
target_snapshots: t.Iterable[SnapshotInfoLike],
279+
target_snapshots: t.Iterable[Snapshot],
280280
environment_naming_info: EnvironmentNamingInfo,
281281
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
282282
) -> None:
@@ -989,25 +989,30 @@ def _promote_snapshot(
989989
table_mapping=table_mapping,
990990
runtime_stage=RuntimeStage.PROMOTING,
991991
)
992-
_evaluation_strategy(snapshot, adapter).promote(
993-
table_name=table_name,
994-
view_name=view_name,
995-
model=snapshot.model,
996-
environment=environment_naming_info.name,
997-
snapshots=snapshots,
998-
**render_kwargs,
999-
)
1000992

1001-
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1002-
render_kwargs["snapshots"] = snapshot_by_name
1003-
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
993+
with (
994+
adapter.transaction(),
995+
adapter.session(snapshot.model.render_session_properties(**render_kwargs)),
996+
):
997+
_evaluation_strategy(snapshot, adapter).promote(
998+
table_name=table_name,
999+
view_name=view_name,
1000+
model=snapshot.model,
1001+
environment=environment_naming_info.name,
1002+
snapshots=snapshots,
1003+
**render_kwargs,
1004+
)
1005+
1006+
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1007+
render_kwargs["snapshots"] = snapshot_by_name
1008+
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
10041009

10051010
if on_complete is not None:
10061011
on_complete(snapshot)
10071012

10081013
def _demote_snapshot(
10091014
self,
1010-
snapshot: SnapshotInfoLike,
1015+
snapshot: Snapshot,
10111016
environment_naming_info: EnvironmentNamingInfo,
10121017
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
10131018
) -> None:
@@ -1019,7 +1024,18 @@ def _demote_snapshot(
10191024
view_name = snapshot.qualified_view_name.for_environment(
10201025
environment_naming_info, dialect=adapter.dialect
10211026
)
1022-
_evaluation_strategy(snapshot, adapter).demote(view_name)
1027+
session_properties = (
1028+
snapshot.model.render_session_properties(
1029+
engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING
1030+
)
1031+
if snapshot.is_model
1032+
else {}
1033+
)
1034+
with (
1035+
adapter.transaction(),
1036+
adapter.session(session_properties),
1037+
):
1038+
_evaluation_strategy(snapshot, adapter).demote(view_name)
10231039

10241040
if on_complete is not None:
10251041
on_complete(snapshot)

0 commit comments

Comments
 (0)