From 4501a2fef3979af45f0d0e571ca9107edbf82272 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 3 Jul 2025 17:07:17 -0500 Subject: [PATCH 1/7] Establish session before snapshot promotion/demotion --- sqlmesh/core/plan/evaluator.py | 4 +-- sqlmesh/core/snapshot/evaluator.py | 44 ++++++++++++++++++++---------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 545a5e5494..28a0e6eb71 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -340,7 +340,7 @@ def visit_virtual_layer_update_stage( ) if stage.demoted_environment_naming_info: self._demote_snapshots( - stage.demoted_snapshots, + [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots], stage.demoted_environment_naming_info, on_complete=lambda s: self.console.update_promotion_progress(s, False), ) @@ -382,7 +382,7 @@ def _promote_snapshots( def _demote_snapshots( self, - target_snapshots: t.Iterable[SnapshotTableInfo], + target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 641f216699..2c3a56f63c 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -276,7 +276,7 @@ def promote( def demote( self, - target_snapshots: t.Iterable[SnapshotInfoLike], + target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: @@ -989,25 +989,30 @@ def _promote_snapshot( table_mapping=table_mapping, runtime_stage=RuntimeStage.PROMOTING, ) - _evaluation_strategy(snapshot, adapter).promote( - table_name=table_name, - view_name=view_name, - model=snapshot.model, - environment=environment_naming_info.name, - snapshots=snapshots, - **render_kwargs, - ) - snapshot_by_name = {s.name: s for s in (snapshots or {}).values()} - render_kwargs["snapshots"] = snapshot_by_name - adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs)) + with ( + adapter.transaction(), + adapter.session(snapshot.model.render_session_properties(**render_kwargs)), + ): + _evaluation_strategy(snapshot, adapter).promote( + table_name=table_name, + view_name=view_name, + model=snapshot.model, + environment=environment_naming_info.name, + snapshots=snapshots, + **render_kwargs, + ) + + snapshot_by_name = {s.name: s for s in (snapshots or {}).values()} + render_kwargs["snapshots"] = snapshot_by_name + adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs)) if on_complete is not None: on_complete(snapshot) def _demote_snapshot( self, - snapshot: SnapshotInfoLike, + snapshot: Snapshot, environment_naming_info: EnvironmentNamingInfo, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], ) -> None: @@ -1019,7 +1024,18 @@ def _demote_snapshot( view_name = snapshot.qualified_view_name.for_environment( environment_naming_info, dialect=adapter.dialect ) - _evaluation_strategy(snapshot, adapter).demote(view_name) + session_properties = ( + snapshot.model.render_session_properties( + engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING + ) + if snapshot.is_model + else {} + ) + with ( + adapter.transaction(), + adapter.session(session_properties), + ): + _evaluation_strategy(snapshot, adapter).demote(view_name) if on_complete is not None: on_complete(snapshot) From 0b45a2872192f19ee54292912a03711838812670 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 3 Jul 2025 17:29:29 -0500 Subject: [PATCH 2/7] Ensure we have full snapshots for all demoted snapshot table infos --- sqlmesh/core/plan/stages.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 9913a87bd0..194177b0cf 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -361,11 +361,14 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: # Otherwise, unpause right after updatig the environment record. stages.append(UnpauseStage(promoted_snapshots=promoted_snapshots)) + full_demoted_snapshots = self.state_reader.get_snapshots( + s.snapshot_id for s in demoted_snapshots if s.snapshot_id not in snapshots + ) virtual_layer_update_stage = self._get_virtual_layer_update_stage( promoted_snapshots, demoted_snapshots, demoted_environment_naming_info, - snapshots, + snapshots | full_demoted_snapshots, deployability_index, ) if virtual_layer_update_stage: From 94c0d7484de68e28db9656e2f524956a71c45b17 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 3 Jul 2025 17:59:38 -0500 Subject: [PATCH 3/7] Add tests --- sqlmesh/core/snapshot/evaluator.py | 40 +++++++++++++-------------- tests/core/test_snapshot_evaluator.py | 4 +++ 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 2c3a56f63c..69502bec36 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1016,29 +1016,27 @@ def _demote_snapshot( environment_naming_info: EnvironmentNamingInfo, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], ) -> None: - adapter = ( - self.get_adapter(snapshot.model_gateway) - if environment_naming_info.gateway_managed - else self.adapter - ) - view_name = snapshot.qualified_view_name.for_environment( - environment_naming_info, dialect=adapter.dialect - ) - session_properties = ( - snapshot.model.render_session_properties( - engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING + if snapshot.is_model: + adapter = ( + self.get_adapter(snapshot.model_gateway) + if environment_naming_info.gateway_managed + else self.adapter ) - if snapshot.is_model - else {} - ) - with ( - adapter.transaction(), - adapter.session(session_properties), - ): - _evaluation_strategy(snapshot, adapter).demote(view_name) + view_name = snapshot.qualified_view_name.for_environment( + environment_naming_info, dialect=adapter.dialect + ) + with ( + adapter.transaction(), + adapter.session( + snapshot.model.render_session_properties( + engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING + ) + ), + ): + _evaluation_strategy(snapshot, adapter).demote(view_name) - if on_complete is not None: - on_complete(snapshot) + if on_complete is not None: + on_complete(snapshot) def _cleanup_snapshot( self, diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 2888511ba1..93cef90daf 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -294,6 +294,8 @@ def test_promote(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env")) + adapter_mock.transaction.assert_called() + adapter_mock.session.assert_called() adapter_mock.create_schema.assert_called_once_with(to_schema("test_schema__test_env")) adapter_mock.create_view.assert_called_once_with( "test_schema__test_env.test_model", @@ -320,6 +322,8 @@ def test_demote(mocker: MockerFixture, adapter_mock, make_snapshot): evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env")) + adapter_mock.transaction.assert_called() + adapter_mock.session.assert_called() adapter_mock.drop_view.assert_called_once_with( "test_schema__test_env.test_model", cascade=False, From 5fc33270038d1bb6b0def58b9d3172c0920e6c93 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 3 Jul 2025 18:26:41 -0500 Subject: [PATCH 4/7] Pass snapshots and deployability_index to session properties renderer --- sqlmesh/core/plan/evaluator.py | 10 +++++++++- sqlmesh/core/snapshot/evaluator.py | 13 +++++++++++-- tests/core/test_snapshot_evaluator.py | 12 ++++++------ 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 28a0e6eb71..01789dec8c 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -342,6 +342,8 @@ def visit_virtual_layer_update_stage( self._demote_snapshots( [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots], stage.demoted_environment_naming_info, + snapshots=stage.all_snapshots, + deployability_index=stage.deployability_index, on_complete=lambda s: self.console.update_promotion_progress(s, False), ) @@ -384,10 +386,16 @@ def _demote_snapshots( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, + snapshots: t.Dict[SnapshotId, Snapshot], + deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: self.snapshot_evaluator.demote( - target_snapshots, environment_naming_info, on_complete=on_complete + target_snapshots, + environment_naming_info, + snapshots=snapshots, + deployability_index=deployability_index, + on_complete=on_complete, ) def _restatement_intervals_across_all_environments( diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 69502bec36..b416533516 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -278,6 +278,8 @@ def demote( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, + snapshots: t.Dict[SnapshotId, Snapshot], + deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: """Demotes the given collection of snapshots in the target environment by removing its view. @@ -290,7 +292,9 @@ def demote( with self.concurrent_context(): concurrent_apply_to_snapshots( target_snapshots, - lambda s: self._demote_snapshot(s, environment_naming_info, on_complete), + lambda s: self._demote_snapshot( + s, environment_naming_info, snapshots, deployability_index, on_complete + ), self.ddl_concurrent_tasks, ) @@ -1014,6 +1018,8 @@ def _demote_snapshot( self, snapshot: Snapshot, environment_naming_info: EnvironmentNamingInfo, + snapshots: t.Dict[SnapshotId, Snapshot], + deployability_index: t.Optional[DeployabilityIndex], on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], ) -> None: if snapshot.is_model: @@ -1029,7 +1035,10 @@ def _demote_snapshot( adapter.transaction(), adapter.session( snapshot.model.render_session_properties( - engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING + engine_adapter=adapter, + snapshots=snapshots, + deployability_index=deployability_index, + runtime_stage=RuntimeStage.PROMOTING, ) ), ): diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 93cef90daf..f3e4919138 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -320,7 +320,7 @@ def test_demote(mocker: MockerFixture, adapter_mock, make_snapshot): snapshot = make_snapshot(model) snapshot.categorize_as(SnapshotChangeCategory.BREAKING) - evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env")) + evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env"), snapshots={}) adapter_mock.transaction.assert_called() adapter_mock.session.assert_called() @@ -2694,7 +2694,7 @@ def test_standalone_audit(mocker: MockerFixture, adapter_mock, make_snapshot): adapter_mock.session.assert_not_called() # Demote - evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env")) + evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env"), snapshots={}) adapter_mock.assert_not_called() adapter_mock.transaction.assert_not_called() @@ -3837,7 +3837,7 @@ def test_multiple_engine_creation(snapshot: Snapshot, adapters, make_snapshot): assert len(post_calls) == 1 assert post_calls[0].sql(dialect="postgres") == expected_call - evaluator.demote([snapshot_2], EnvironmentNamingInfo(name="test_env")) + evaluator.demote([snapshot_2], EnvironmentNamingInfo(name="test_env"), snapshots={}) engine_adapters["secondary"].drop_view.assert_not_called() engine_adapters["default"].drop_view.assert_called_once_with( "test_schema__test_env.test_model", @@ -4083,7 +4083,7 @@ def model_with_statements(context, **kwargs): engine_adapters["default"].get_catalog_type.assert_not_called() assert len(engine_adapters["secondary"].get_catalog_type.call_args_list) == 2 - evaluator.demote([snapshot], environment_naming_info) + evaluator.demote([snapshot], environment_naming_info, snapshots={}) engine_adapters["default"].drop_view.assert_called_once_with( "db__test_env.multi_engine_test_model", cascade=False, @@ -4097,7 +4097,7 @@ def model_with_statements(context, **kwargs): assert view_args[0][0][0] == "db__test_env.multi_engine_test_model" # Similarly for demotion - evaluator.demote([snapshot], environment_naming_info_gw) + evaluator.demote([snapshot], environment_naming_info_gw, snapshots={}) engine_adapters["secondary"].drop_view.assert_called_once_with( "db__test_env.multi_engine_test_model", cascade=False, @@ -4155,7 +4155,7 @@ def test_multiple_engine_virtual_layer(snapshot: Snapshot, adapters, make_snapsh assert view_args_secondary[0][0][0] == "test_schema__test_env.test_model" # Demotion will follow with the same pattern - evaluator.demote([snapshot_2, snapshot], environment_naming_info) + evaluator.demote([snapshot_2, snapshot], environment_naming_info, snapshots={}) engine_adapters["default"].drop_view.assert_called_once_with( "db__test_env.model", cascade=False, From 0ec84ff0992475473920ba618fead313fa0391f0 Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Thu, 3 Jul 2025 18:43:06 -0500 Subject: [PATCH 5/7] Add DEMOTING runtime stage --- docs/concepts/macros/macro_variables.md | 3 ++- sqlmesh/core/macros.py | 1 + sqlmesh/core/snapshot/evaluator.py | 6 +++--- tests/core/test_snapshot_evaluator.py | 12 ++++++------ 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/concepts/macros/macro_variables.md b/docs/concepts/macros/macro_variables.md index 858bf9f19d..a184f7d99f 100644 --- a/docs/concepts/macros/macro_variables.md +++ b/docs/concepts/macros/macro_variables.md @@ -132,7 +132,8 @@ SQLMesh provides additional predefined variables used to modify model behavior b * 'loading' - The project is being loaded into SQLMesh's runtime context. * 'creating' - The model tables are being created. * 'evaluating' - The model query logic is being evaluated. - * 'promoting' - The model is being promoted in the target environment (virtual layer update). + * 'promoting' - The model is being promoted in the target environment (view created during virtual layer update). + * 'demoting' - The model is being demoted in the target environment (view dropped during virtual layer update). * 'auditing' - The audit is being run. * 'testing' - The model query logic is being evaluated in the context of a unit test. * @gateway - A string value containing the name of the current [gateway](../../guides/connections.md). diff --git a/sqlmesh/core/macros.py b/sqlmesh/core/macros.py index 2c5ef3b2e8..ec5b2567f4 100644 --- a/sqlmesh/core/macros.py +++ b/sqlmesh/core/macros.py @@ -67,6 +67,7 @@ class RuntimeStage(Enum): CREATING = "creating" EVALUATING = "evaluating" PROMOTING = "promoting" + DEMOTING = "demoting" AUDITING = "auditing" TESTING = "testing" BEFORE_ALL = "before_all" diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index b416533516..dd69c14418 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -278,7 +278,7 @@ def demote( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, - snapshots: t.Dict[SnapshotId, Snapshot], + snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None, deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: @@ -1018,7 +1018,7 @@ def _demote_snapshot( self, snapshot: Snapshot, environment_naming_info: EnvironmentNamingInfo, - snapshots: t.Dict[SnapshotId, Snapshot], + snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]], deployability_index: t.Optional[DeployabilityIndex], on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], ) -> None: @@ -1038,7 +1038,7 @@ def _demote_snapshot( engine_adapter=adapter, snapshots=snapshots, deployability_index=deployability_index, - runtime_stage=RuntimeStage.PROMOTING, + runtime_stage=RuntimeStage.DEMOTING, ) ), ): diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index f3e4919138..93cef90daf 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -320,7 +320,7 @@ def test_demote(mocker: MockerFixture, adapter_mock, make_snapshot): snapshot = make_snapshot(model) snapshot.categorize_as(SnapshotChangeCategory.BREAKING) - evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env"), snapshots={}) + evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env")) adapter_mock.transaction.assert_called() adapter_mock.session.assert_called() @@ -2694,7 +2694,7 @@ def test_standalone_audit(mocker: MockerFixture, adapter_mock, make_snapshot): adapter_mock.session.assert_not_called() # Demote - evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env"), snapshots={}) + evaluator.demote([snapshot], EnvironmentNamingInfo(name="test_env")) adapter_mock.assert_not_called() adapter_mock.transaction.assert_not_called() @@ -3837,7 +3837,7 @@ def test_multiple_engine_creation(snapshot: Snapshot, adapters, make_snapshot): assert len(post_calls) == 1 assert post_calls[0].sql(dialect="postgres") == expected_call - evaluator.demote([snapshot_2], EnvironmentNamingInfo(name="test_env"), snapshots={}) + evaluator.demote([snapshot_2], EnvironmentNamingInfo(name="test_env")) engine_adapters["secondary"].drop_view.assert_not_called() engine_adapters["default"].drop_view.assert_called_once_with( "test_schema__test_env.test_model", @@ -4083,7 +4083,7 @@ def model_with_statements(context, **kwargs): engine_adapters["default"].get_catalog_type.assert_not_called() assert len(engine_adapters["secondary"].get_catalog_type.call_args_list) == 2 - evaluator.demote([snapshot], environment_naming_info, snapshots={}) + evaluator.demote([snapshot], environment_naming_info) engine_adapters["default"].drop_view.assert_called_once_with( "db__test_env.multi_engine_test_model", cascade=False, @@ -4097,7 +4097,7 @@ def model_with_statements(context, **kwargs): assert view_args[0][0][0] == "db__test_env.multi_engine_test_model" # Similarly for demotion - evaluator.demote([snapshot], environment_naming_info_gw, snapshots={}) + evaluator.demote([snapshot], environment_naming_info_gw) engine_adapters["secondary"].drop_view.assert_called_once_with( "db__test_env.multi_engine_test_model", cascade=False, @@ -4155,7 +4155,7 @@ def test_multiple_engine_virtual_layer(snapshot: Snapshot, adapters, make_snapsh assert view_args_secondary[0][0][0] == "test_schema__test_env.test_model" # Demotion will follow with the same pattern - evaluator.demote([snapshot_2, snapshot], environment_naming_info, snapshots={}) + evaluator.demote([snapshot_2, snapshot], environment_naming_info) engine_adapters["default"].drop_view.assert_called_once_with( "db__test_env.model", cascade=False, From f366b9114a38236ddf9fcef27eedfa2e5a04897d Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Mon, 7 Jul 2025 13:20:53 -0500 Subject: [PATCH 6/7] Do not pass all snapshots to _demote_snapshot --- sqlmesh/core/plan/evaluator.py | 3 - sqlmesh/core/snapshot/evaluator.py | 120 +++++++++++++++-------------- 2 files changed, 61 insertions(+), 62 deletions(-) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 01789dec8c..417a0f51e5 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -342,7 +342,6 @@ def visit_virtual_layer_update_stage( self._demote_snapshots( [stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots], stage.demoted_environment_naming_info, - snapshots=stage.all_snapshots, deployability_index=stage.deployability_index, on_complete=lambda s: self.console.update_promotion_progress(s, False), ) @@ -386,14 +385,12 @@ def _demote_snapshots( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, - snapshots: t.Dict[SnapshotId, Snapshot], deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: self.snapshot_evaluator.demote( target_snapshots, environment_naming_info, - snapshots=snapshots, deployability_index=deployability_index, on_complete=on_complete, ) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index dd69c14418..0f29a3dcf0 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -293,7 +293,7 @@ def demote( concurrent_apply_to_snapshots( target_snapshots, lambda s: self._demote_snapshot( - s, environment_naming_info, snapshots, deployability_index, on_complete + s, environment_naming_info, deployability_index, on_complete ), self.ddl_concurrent_tasks, ) @@ -974,42 +974,44 @@ def _promote_snapshot( snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None, table_mapping: t.Optional[t.Dict[str, str]] = None, ) -> None: - if snapshot.is_model: - adapter = ( - self.get_adapter(snapshot.model_gateway) - if environment_naming_info.gateway_managed - else self.adapter - ) - table_name = snapshot.table_name(deployability_index.is_representative(snapshot)) - view_name = snapshot.qualified_view_name.for_environment( - environment_naming_info, dialect=adapter.dialect - ) - render_kwargs: t.Dict[str, t.Any] = dict( - start=start, - end=end, - execution_time=execution_time, - engine_adapter=adapter, - deployability_index=deployability_index, - table_mapping=table_mapping, - runtime_stage=RuntimeStage.PROMOTING, - ) + if not snapshot.is_model: + return - with ( - adapter.transaction(), - adapter.session(snapshot.model.render_session_properties(**render_kwargs)), - ): - _evaluation_strategy(snapshot, adapter).promote( - table_name=table_name, - view_name=view_name, - model=snapshot.model, - environment=environment_naming_info.name, - snapshots=snapshots, - **render_kwargs, - ) + adapter = ( + self.get_adapter(snapshot.model_gateway) + if environment_naming_info.gateway_managed + else self.adapter + ) + table_name = snapshot.table_name(deployability_index.is_representative(snapshot)) + view_name = snapshot.qualified_view_name.for_environment( + environment_naming_info, dialect=adapter.dialect + ) + render_kwargs: t.Dict[str, t.Any] = dict( + start=start, + end=end, + execution_time=execution_time, + engine_adapter=adapter, + deployability_index=deployability_index, + table_mapping=table_mapping, + runtime_stage=RuntimeStage.PROMOTING, + ) + + with ( + adapter.transaction(), + adapter.session(snapshot.model.render_session_properties(**render_kwargs)), + ): + _evaluation_strategy(snapshot, adapter).promote( + table_name=table_name, + view_name=view_name, + model=snapshot.model, + environment=environment_naming_info.name, + snapshots=snapshots, + **render_kwargs, + ) - snapshot_by_name = {s.name: s for s in (snapshots or {}).values()} - render_kwargs["snapshots"] = snapshot_by_name - adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs)) + snapshot_by_name = {s.name: s for s in (snapshots or {}).values()} + render_kwargs["snapshots"] = snapshot_by_name + adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs)) if on_complete is not None: on_complete(snapshot) @@ -1018,34 +1020,34 @@ def _demote_snapshot( self, snapshot: Snapshot, environment_naming_info: EnvironmentNamingInfo, - snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]], deployability_index: t.Optional[DeployabilityIndex], on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], ) -> None: - if snapshot.is_model: - adapter = ( - self.get_adapter(snapshot.model_gateway) - if environment_naming_info.gateway_managed - else self.adapter - ) - view_name = snapshot.qualified_view_name.for_environment( - environment_naming_info, dialect=adapter.dialect - ) - with ( - adapter.transaction(), - adapter.session( - snapshot.model.render_session_properties( - engine_adapter=adapter, - snapshots=snapshots, - deployability_index=deployability_index, - runtime_stage=RuntimeStage.DEMOTING, - ) - ), - ): - _evaluation_strategy(snapshot, adapter).demote(view_name) + if not snapshot.is_model: + return - if on_complete is not None: - on_complete(snapshot) + adapter = ( + self.get_adapter(snapshot.model_gateway) + if environment_naming_info.gateway_managed + else self.adapter + ) + view_name = snapshot.qualified_view_name.for_environment( + environment_naming_info, dialect=adapter.dialect + ) + with ( + adapter.transaction(), + adapter.session( + snapshot.model.render_session_properties( + engine_adapter=adapter, + deployability_index=deployability_index, + runtime_stage=RuntimeStage.DEMOTING, + ) + ), + ): + _evaluation_strategy(snapshot, adapter).demote(view_name) + + if on_complete is not None: + on_complete(snapshot) def _cleanup_snapshot( self, From db950e5e94253140f3e1766d73cab27bbe0c9a1e Mon Sep 17 00:00:00 2001 From: Trey Spiller Date: Mon, 7 Jul 2025 14:33:00 -0500 Subject: [PATCH 7/7] Compute table_mapping in plan evaluator --- sqlmesh/core/plan/evaluator.py | 8 ++++++++ sqlmesh/core/snapshot/evaluator.py | 10 ++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index 417a0f51e5..9488b9bc91 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -344,6 +344,7 @@ def visit_virtual_layer_update_stage( stage.demoted_environment_naming_info, deployability_index=stage.deployability_index, on_complete=lambda s: self.console.update_promotion_progress(s, False), + snapshots=stage.all_snapshots, ) completed = True @@ -385,12 +386,19 @@ def _demote_snapshots( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, + snapshots: t.Dict[SnapshotId, Snapshot], deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: self.snapshot_evaluator.demote( target_snapshots, environment_naming_info, + table_mapping=to_view_mapping( + snapshots.values(), + environment_naming_info, + default_catalog=self.default_catalog, + dialect=self.snapshot_evaluator.adapter.dialect, + ), deployability_index=deployability_index, on_complete=on_complete, ) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 0f29a3dcf0..993860b527 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -278,7 +278,7 @@ def demote( self, target_snapshots: t.Iterable[Snapshot], environment_naming_info: EnvironmentNamingInfo, - snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None, + table_mapping: t.Optional[t.Dict[str, str]] = None, deployability_index: t.Optional[DeployabilityIndex] = None, on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, ) -> None: @@ -293,7 +293,11 @@ def demote( concurrent_apply_to_snapshots( target_snapshots, lambda s: self._demote_snapshot( - s, environment_naming_info, deployability_index, on_complete + s, + environment_naming_info, + deployability_index=deployability_index, + on_complete=on_complete, + table_mapping=table_mapping, ), self.ddl_concurrent_tasks, ) @@ -1022,6 +1026,7 @@ def _demote_snapshot( environment_naming_info: EnvironmentNamingInfo, deployability_index: t.Optional[DeployabilityIndex], on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]], + table_mapping: t.Optional[t.Dict[str, str]] = None, ) -> None: if not snapshot.is_model: return @@ -1040,6 +1045,7 @@ def _demote_snapshot( snapshot.model.render_session_properties( engine_adapter=adapter, deployability_index=deployability_index, + table_mapping=table_mapping, runtime_stage=RuntimeStage.DEMOTING, ) ),