diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 49d49e40e7..415e916365 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -348,11 +348,28 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None: except Exception as e: raise ConfigError(f"Failed to load extension {extension['name']}: {e}") - for field, setting in self.connector_config.items(): - try: - cursor.execute(f"SET {field} = '{setting}'") - except Exception as e: - raise ConfigError(f"Failed to set connector config {field} to {setting}: {e}") + if self.connector_config: + option_names = list(self.connector_config) + in_part = ",".join("?" for _ in range(len(option_names))) + + cursor.execute( + f"SELECT name, value FROM duckdb_settings() WHERE name IN ({in_part})", + option_names, + ) + + existing_values = {field: setting for field, setting in cursor.fetchall()} + + # only set connector_config items if the values differ from what is already set + # trying to set options like 'temp_directory' even to the same value can throw errors like: + # Not implemented Error: Cannot switch temporary directory after the current one has been used + for field, setting in self.connector_config.items(): + if existing_values.get(field) != setting: + try: + cursor.execute(f"SET {field} = '{setting}'") + except Exception as e: + raise ConfigError( + f"Failed to set connector config {field} to {setting}: {e}" + ) if self.secrets: duckdb_version = duckdb.__version__ diff --git a/tests/core/engine_adapter/integration/test_integration_duckdb.py b/tests/core/engine_adapter/integration/test_integration_duckdb.py index ce7fb1b80d..a53c559a55 100644 --- a/tests/core/engine_adapter/integration/test_integration_duckdb.py +++ b/tests/core/engine_adapter/integration/test_integration_duckdb.py @@ -106,3 +106,36 @@ def _open_connection() -> bool: for future in as_completed(futures): assert future.result() + + +def test_connector_config_from_multiple_connections(tmp_path: Path): + config = DuckDBConnectionConfig( + concurrent_tasks=2, + extensions=["tpch"], + connector_config={"temp_directory": str(tmp_path), "memory_limit": "16mb"}, + ) + + adapter = config.create_engine_adapter() + pool = adapter._connection_pool + + assert isinstance(pool, ThreadLocalSharedConnectionPool) + + adapter.execute("CALL dbgen(sf = 0.1)") + + # check that temporary files exist so that calling "SET temp_directory = 'anything'" will throw an error + assert len(adapter.fetchall("select path from duckdb_temporary_files()")) > 0 + + def _open_connection() -> bool: + # This triggers cursor_init() which should only SET values if they have changed + pool.get_cursor() + return True + + thread_pool = ThreadPoolExecutor(max_workers=4) + futures = [] + for _ in range(4): + futures.append(thread_pool.submit(_open_connection)) + + for future in as_completed(futures): + assert future.result() + + pool.close_all()