Skip to content

Commit 2b702ba

Browse files
authored
Fix(duckdb): Only SET connector_config values on cursor init if they are different (#4981)
1 parent 18017c4 commit 2b702ba

File tree

2 files changed

+55
-5
lines changed

2 files changed

+55
-5
lines changed

sqlmesh/core/config/connection.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,28 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
348348
except Exception as e:
349349
raise ConfigError(f"Failed to load extension {extension['name']}: {e}")
350350

351-
for field, setting in self.connector_config.items():
352-
try:
353-
cursor.execute(f"SET {field} = '{setting}'")
354-
except Exception as e:
355-
raise ConfigError(f"Failed to set connector config {field} to {setting}: {e}")
351+
if self.connector_config:
352+
option_names = list(self.connector_config)
353+
in_part = ",".join("?" for _ in range(len(option_names)))
354+
355+
cursor.execute(
356+
f"SELECT name, value FROM duckdb_settings() WHERE name IN ({in_part})",
357+
option_names,
358+
)
359+
360+
existing_values = {field: setting for field, setting in cursor.fetchall()}
361+
362+
# only set connector_config items if the values differ from what is already set
363+
# trying to set options like 'temp_directory' even to the same value can throw errors like:
364+
# Not implemented Error: Cannot switch temporary directory after the current one has been used
365+
for field, setting in self.connector_config.items():
366+
if existing_values.get(field) != setting:
367+
try:
368+
cursor.execute(f"SET {field} = '{setting}'")
369+
except Exception as e:
370+
raise ConfigError(
371+
f"Failed to set connector config {field} to {setting}: {e}"
372+
)
356373

357374
if self.secrets:
358375
duckdb_version = duckdb.__version__

tests/core/engine_adapter/integration/test_integration_duckdb.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,36 @@ def _open_connection() -> bool:
106106

107107
for future in as_completed(futures):
108108
assert future.result()
109+
110+
111+
def test_connector_config_from_multiple_connections(tmp_path: Path):
112+
config = DuckDBConnectionConfig(
113+
concurrent_tasks=2,
114+
extensions=["tpch"],
115+
connector_config={"temp_directory": str(tmp_path), "memory_limit": "16mb"},
116+
)
117+
118+
adapter = config.create_engine_adapter()
119+
pool = adapter._connection_pool
120+
121+
assert isinstance(pool, ThreadLocalSharedConnectionPool)
122+
123+
adapter.execute("CALL dbgen(sf = 0.1)")
124+
125+
# check that temporary files exist so that calling "SET temp_directory = 'anything'" will throw an error
126+
assert len(adapter.fetchall("select path from duckdb_temporary_files()")) > 0
127+
128+
def _open_connection() -> bool:
129+
# This triggers cursor_init() which should only SET values if they have changed
130+
pool.get_cursor()
131+
return True
132+
133+
thread_pool = ThreadPoolExecutor(max_workers=4)
134+
futures = []
135+
for _ in range(4):
136+
futures.append(thread_pool.submit(_open_connection))
137+
138+
for future in as_completed(futures):
139+
assert future.result()
140+
141+
pool.close_all()

0 commit comments

Comments
 (0)