From 1bf06cf5fc31be1e3781e37a0694b05607f4540a Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Thu, 17 Jul 2025 02:23:44 +0000 Subject: [PATCH 1/2] Fix(duckdb): Only SET connector_config values on cursor init if they have changed --- sqlmesh/core/config/connection.py | 27 ++++++++++++--- .../integration/test_integration_duckdb.py | 33 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 49d49e40e7..15a6373b1b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -348,11 +348,28 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None: except Exception as e: raise ConfigError(f"Failed to load extension {extension['name']}: {e}") - for field, setting in self.connector_config.items(): - try: - cursor.execute(f"SET {field} = '{setting}'") - except Exception as e: - raise ConfigError(f"Failed to set connector config {field} to {setting}: {e}") + if self.connector_config: + option_names = list(self.connector_config) + in_part = ",".join(["?" for _ in range(len(option_names))]) + + cursor.execute( + f"SELECT name, value FROM duckdb_settings() WHERE name IN ({in_part})", + option_names, + ) + + existing_values = {r[0]: r[1] for r in cursor.fetchall()} + + # only set connector_config items if the values differ from what is already set + # trying to set options like 'temp_directory' even to the same value can throw errors like: + # Not implemented Error: Cannot switch temporary directory after the current one has been used + for field, setting in self.connector_config.items(): + if existing_values.get(field) != setting: + try: + cursor.execute(f"SET {field} = '{setting}'") + except Exception as e: + raise ConfigError( + f"Failed to set connector config {field} to {setting}: {e}" + ) if self.secrets: duckdb_version = duckdb.__version__ diff --git a/tests/core/engine_adapter/integration/test_integration_duckdb.py b/tests/core/engine_adapter/integration/test_integration_duckdb.py index ce7fb1b80d..a53c559a55 100644 --- a/tests/core/engine_adapter/integration/test_integration_duckdb.py +++ b/tests/core/engine_adapter/integration/test_integration_duckdb.py @@ -106,3 +106,36 @@ def _open_connection() -> bool: for future in as_completed(futures): assert future.result() + + +def test_connector_config_from_multiple_connections(tmp_path: Path): + config = DuckDBConnectionConfig( + concurrent_tasks=2, + extensions=["tpch"], + connector_config={"temp_directory": str(tmp_path), "memory_limit": "16mb"}, + ) + + adapter = config.create_engine_adapter() + pool = adapter._connection_pool + + assert isinstance(pool, ThreadLocalSharedConnectionPool) + + adapter.execute("CALL dbgen(sf = 0.1)") + + # check that temporary files exist so that calling "SET temp_directory = 'anything'" will throw an error + assert len(adapter.fetchall("select path from duckdb_temporary_files()")) > 0 + + def _open_connection() -> bool: + # This triggers cursor_init() which should only SET values if they have changed + pool.get_cursor() + return True + + thread_pool = ThreadPoolExecutor(max_workers=4) + futures = [] + for _ in range(4): + futures.append(thread_pool.submit(_open_connection)) + + for future in as_completed(futures): + assert future.result() + + pool.close_all() From 5e9b432add0d80727c01f2d1f32a231bcf5ff76c Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Thu, 17 Jul 2025 05:50:31 +0000 Subject: [PATCH 2/2] PR feedback --- sqlmesh/core/config/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 15a6373b1b..415e916365 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -350,14 +350,14 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None: if self.connector_config: option_names = list(self.connector_config) - in_part = ",".join(["?" for _ in range(len(option_names))]) + in_part = ",".join("?" for _ in range(len(option_names))) cursor.execute( f"SELECT name, value FROM duckdb_settings() WHERE name IN ({in_part})", option_names, ) - existing_values = {r[0]: r[1] for r in cursor.fetchall()} + existing_values = {field: setting for field, setting in cursor.fetchall()} # only set connector_config items if the values differ from what is already set # trying to set options like 'temp_directory' even to the same value can throw errors like: