From f74c5d3221bce461fdbdf75e993e446c2b50b642 Mon Sep 17 00:00:00 2001 From: Vincent Chan Date: Thu, 7 Aug 2025 15:37:41 -0700 Subject: [PATCH 1/4] Revert "Fix: Revert "Fix: Use merge when updating auto restatements (#5016)" (#5108)" This reverts commit 8fbf26c3b52c98285254e61e1457f9e65938d99b. --- sqlmesh/core/state_sync/db/snapshot.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/state_sync/db/snapshot.py b/sqlmesh/core/state_sync/db/snapshot.py index 6064993087..30e0de00f2 100644 --- a/sqlmesh/core/state_sync/db/snapshot.py +++ b/sqlmesh/core/state_sync/db/snapshot.py @@ -372,25 +372,31 @@ def update_auto_restatements( Args: next_auto_restatement_ts: A dictionary of snapshot name version to the next auto restatement timestamp. """ + next_auto_restatement_ts_deleted = [] + next_auto_restatement_ts_filtered = {} + for k, v in next_auto_restatement_ts.items(): + if v is None: + next_auto_restatement_ts_deleted.append(k) + else: + next_auto_restatement_ts_filtered[k] = v + for where in snapshot_name_version_filter( self.engine_adapter, - next_auto_restatement_ts, + next_auto_restatement_ts_deleted, column_prefix="snapshot", alias=None, batch_size=self.SNAPSHOT_BATCH_SIZE, ): self.engine_adapter.delete_from(self.auto_restatements_table, where=where) - next_auto_restatement_ts_filtered = { - k: v for k, v in next_auto_restatement_ts.items() if v is not None - } if not next_auto_restatement_ts_filtered: return - self.engine_adapter.insert_append( + self.engine_adapter.merge( self.auto_restatements_table, _auto_restatements_to_df(next_auto_restatement_ts_filtered), columns_to_types=self._auto_restatement_columns_to_types, + unique_key=(exp.column("snapshot_name"), exp.column("snapshot_version")), ) def count(self) -> int: From 46a19c5842e487fe8f6804680495f8bed5af4c2f Mon Sep 17 00:00:00 2001 From: Vincent Chan Date: Thu, 7 Aug 2025 18:12:28 -0700 Subject: [PATCH 2/4] Feat: Re-introduce merge for updating auto restatements --- sqlmesh/core/engine_adapter/postgres.py | 28 ++++++++++++++++++---- tests/core/engine_adapter/test_postgres.py | 20 ++++++++++++++-- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/sqlmesh/core/engine_adapter/postgres.py b/sqlmesh/core/engine_adapter/postgres.py index 9962c037ac..674cb30175 100644 --- a/sqlmesh/core/engine_adapter/postgres.py +++ b/sqlmesh/core/engine_adapter/postgres.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import re import typing as t from functools import partial from sqlglot import exp @@ -112,11 +113,8 @@ def merge( **kwargs: t.Any, ) -> None: # Merge isn't supported until Postgres 15 - merge_impl = ( - super().merge - if self._connection_pool.get().server_version >= 150000 - else partial(logical_merge, self) - ) + major, minor = self.get_server_version() + merge_impl = super().merge if major >= 15 else partial(logical_merge, self) merge_impl( # type: ignore target_table, source_table, @@ -125,3 +123,23 @@ def merge( when_matched=when_matched, merge_filter=merge_filter, ) + + def get_server_version(self) -> t.Tuple[int, int]: + """Return major and minor server versions of the connection""" + connection = self._connection_pool.get() + connection_module = connection.__class__.__module__ + if connection_module.startswith("pg8000"): + server_version = connection.parameter_statuses.get("server_version") + # pg8000 server version contains version as well as packaging and distribution information + # e.g. 15.13 (Debian 15.13-1.pgdg120+1) + match = re.search(r"(\d+)\.(\d+)", server_version) + if match: + return int(match.group(1)), int(match.group(2)) + elif connection_module.startswith("psycopg"): + # This handles both psycopg and psycopg2 connection objects + server_version = connection.info.server_version + # Since major version 10, PostgreSQL represents the server version with an integer by + # multiplying the server's major version number by 10000 and adding the minor version number + # See https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION + return server_version // 10000, server_version % 100 + return 0, 0 diff --git a/tests/core/engine_adapter/test_postgres.py b/tests/core/engine_adapter/test_postgres.py index f013914c3e..0ea68375f2 100644 --- a/tests/core/engine_adapter/test_postgres.py +++ b/tests/core/engine_adapter/test_postgres.py @@ -94,7 +94,8 @@ def test_create_table_like(make_mocked_engine_adapter: t.Callable): def test_merge_version_gte_15(make_mocked_engine_adapter: t.Callable): adapter = make_mocked_engine_adapter(PostgresEngineAdapter) - adapter._connection_pool.get().server_version = 150000 + adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" + adapter._connection_pool.get().info.server_version = 150000 adapter.merge( target_table="target", @@ -117,7 +118,8 @@ def test_merge_version_lt_15( make_mocked_engine_adapter: t.Callable, make_temp_table_name: t.Callable, mocker: MockerFixture ): adapter = make_mocked_engine_adapter(PostgresEngineAdapter) - adapter._connection_pool.get().server_version = 140000 + adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" + adapter._connection_pool.get().info.server_version = 140000 temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table") table_name = "test" @@ -161,3 +163,17 @@ def table_columns(table_name: str) -> t.Dict[str, exp.DataType]: assert to_sql_calls(adapter) == [ 'ALTER TABLE "test_table" DROP COLUMN "test_column" CASCADE', ] + + +def test_get_server_version(make_mocked_engine_adapter: t.Callable): + adapter = make_mocked_engine_adapter(PostgresEngineAdapter) + + adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" + adapter._connection_pool.get().info.server_version = 150013 + assert adapter.get_server_version() == (15, 13) + + adapter._connection_pool.get().__class__.__module__ = "pg8000.native" + adapter._connection_pool.get().parameter_statuses = { + "server_version": "15.13 (Debian 15.13-1.pgdg120+1)" + } + assert adapter.get_server_version() == (15, 13) From d32ec217fe3c4efdbd0685afd5bb6862595d8ba4 Mon Sep 17 00:00:00 2001 From: Vincent Chan Date: Fri, 8 Aug 2025 01:54:48 -0700 Subject: [PATCH 3/4] Show server_version --- sqlmesh/core/engine_adapter/postgres.py | 29 +++++++--------------- tests/core/engine_adapter/test_postgres.py | 26 +++++++++---------- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/sqlmesh/core/engine_adapter/postgres.py b/sqlmesh/core/engine_adapter/postgres.py index 674cb30175..144dbc24de 100644 --- a/sqlmesh/core/engine_adapter/postgres.py +++ b/sqlmesh/core/engine_adapter/postgres.py @@ -3,7 +3,7 @@ import logging import re import typing as t -from functools import partial +from functools import cached_property, partial from sqlglot import exp from sqlmesh.core.engine_adapter.base_postgres import BasePostgresEngineAdapter @@ -113,7 +113,7 @@ def merge( **kwargs: t.Any, ) -> None: # Merge isn't supported until Postgres 15 - major, minor = self.get_server_version() + major, minor = self.server_version merge_impl = super().merge if major >= 15 else partial(logical_merge, self) merge_impl( # type: ignore target_table, @@ -124,22 +124,11 @@ def merge( merge_filter=merge_filter, ) - def get_server_version(self) -> t.Tuple[int, int]: - """Return major and minor server versions of the connection""" - connection = self._connection_pool.get() - connection_module = connection.__class__.__module__ - if connection_module.startswith("pg8000"): - server_version = connection.parameter_statuses.get("server_version") - # pg8000 server version contains version as well as packaging and distribution information - # e.g. 15.13 (Debian 15.13-1.pgdg120+1) - match = re.search(r"(\d+)\.(\d+)", server_version) - if match: - return int(match.group(1)), int(match.group(2)) - elif connection_module.startswith("psycopg"): - # This handles both psycopg and psycopg2 connection objects - server_version = connection.info.server_version - # Since major version 10, PostgreSQL represents the server version with an integer by - # multiplying the server's major version number by 10000 and adding the minor version number - # See https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION - return server_version // 10000, server_version % 100 + @cached_property + def server_version(self) -> t.Tuple[int, int]: + """Lazily fetch and cache major and minor server version""" + server_version, *_ = self.fetchone("SHOW server_version") + match = re.search(r"(\d+)\.(\d+)", server_version) + if match: + return int(match.group(1)), int(match.group(2)) return 0, 0 diff --git a/tests/core/engine_adapter/test_postgres.py b/tests/core/engine_adapter/test_postgres.py index 0ea68375f2..fd6ce44994 100644 --- a/tests/core/engine_adapter/test_postgres.py +++ b/tests/core/engine_adapter/test_postgres.py @@ -94,8 +94,7 @@ def test_create_table_like(make_mocked_engine_adapter: t.Callable): def test_merge_version_gte_15(make_mocked_engine_adapter: t.Callable): adapter = make_mocked_engine_adapter(PostgresEngineAdapter) - adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" - adapter._connection_pool.get().info.server_version = 150000 + adapter.server_version = (15, 0) adapter.merge( target_table="target", @@ -118,8 +117,7 @@ def test_merge_version_lt_15( make_mocked_engine_adapter: t.Callable, make_temp_table_name: t.Callable, mocker: MockerFixture ): adapter = make_mocked_engine_adapter(PostgresEngineAdapter) - adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" - adapter._connection_pool.get().info.server_version = 140000 + adapter.server_version = (14, 0) temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table") table_name = "test" @@ -165,15 +163,17 @@ def table_columns(table_name: str) -> t.Dict[str, exp.DataType]: ] -def test_get_server_version(make_mocked_engine_adapter: t.Callable): +def test_server_version(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): adapter = make_mocked_engine_adapter(PostgresEngineAdapter) - adapter._connection_pool.get().__class__.__module__ = "psycopg2.extensions" - adapter._connection_pool.get().info.server_version = 150013 - assert adapter.get_server_version() == (15, 13) + fetchone_mock = mocker.patch.object(adapter, "fetchone") + fetchone_mock.return_value = ("14.0",) + assert adapter.server_version == (14, 0) - adapter._connection_pool.get().__class__.__module__ = "pg8000.native" - adapter._connection_pool.get().parameter_statuses = { - "server_version": "15.13 (Debian 15.13-1.pgdg120+1)" - } - assert adapter.get_server_version() == (15, 13) + del adapter.server_version + fetchone_mock.return_value = ("15.8",) + assert adapter.server_version == (15, 8) + + del adapter.server_version + fetchone_mock.return_value = ("15.13 (Debian 15.13-1.pgdg120+1)",) + assert adapter.server_version == (15, 13) From 75633b28c806a03e3dff93bed53260e7fe30fd31 Mon Sep 17 00:00:00 2001 From: Vincent Chan Date: Fri, 8 Aug 2025 10:57:22 -0700 Subject: [PATCH 4/4] Add an integration test --- sqlmesh/core/engine_adapter/postgres.py | 9 +++++---- .../integration/test_integration_postgres.py | 6 +++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/engine_adapter/postgres.py b/sqlmesh/core/engine_adapter/postgres.py index 144dbc24de..a736f5553b 100644 --- a/sqlmesh/core/engine_adapter/postgres.py +++ b/sqlmesh/core/engine_adapter/postgres.py @@ -127,8 +127,9 @@ def merge( @cached_property def server_version(self) -> t.Tuple[int, int]: """Lazily fetch and cache major and minor server version""" - server_version, *_ = self.fetchone("SHOW server_version") - match = re.search(r"(\d+)\.(\d+)", server_version) - if match: - return int(match.group(1)), int(match.group(2)) + if result := self.fetchone("SHOW server_version"): + server_version, *_ = result + match = re.search(r"(\d+)\.(\d+)", server_version) + if match: + return int(match.group(1)), int(match.group(2)) return 0, 0 diff --git a/tests/core/engine_adapter/integration/test_integration_postgres.py b/tests/core/engine_adapter/integration/test_integration_postgres.py index 863aae55a4..82172378ae 100644 --- a/tests/core/engine_adapter/integration/test_integration_postgres.py +++ b/tests/core/engine_adapter/integration/test_integration_postgres.py @@ -2,7 +2,6 @@ import pytest from pytest import FixtureRequest from sqlmesh.core.engine_adapter import PostgresEngineAdapter -from tests.core.engine_adapter.integration import TestContext from tests.core.engine_adapter.integration import ( TestContext, @@ -29,3 +28,8 @@ def engine_adapter(ctx: TestContext) -> PostgresEngineAdapter: def test_engine_adapter(ctx: TestContext): assert isinstance(ctx.engine_adapter, PostgresEngineAdapter) assert ctx.engine_adapter.fetchone("select 1") == (1,) + + +def test_server_version_psycopg(ctx: TestContext): + assert isinstance(ctx.engine_adapter, PostgresEngineAdapter) + assert ctx.engine_adapter.server_version != (0, 0)