Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,28 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
except Exception as e:
raise ConfigError(f"Failed to load extension {extension['name']}: {e}")

for field, setting in self.connector_config.items():
try:
cursor.execute(f"SET {field} = '{setting}'")
except Exception as e:
raise ConfigError(f"Failed to set connector config {field} to {setting}: {e}")
if self.connector_config:
option_names = list(self.connector_config)
in_part = ",".join("?" for _ in range(len(option_names)))

cursor.execute(
f"SELECT name, value FROM duckdb_settings() WHERE name IN ({in_part})",
option_names,
)

existing_values = {field: setting for field, setting in cursor.fetchall()}

# only set connector_config items if the values differ from what is already set
# trying to set options like 'temp_directory' even to the same value can throw errors like:
# Not implemented Error: Cannot switch temporary directory after the current one has been used
for field, setting in self.connector_config.items():
if existing_values.get(field) != setting:
try:
cursor.execute(f"SET {field} = '{setting}'")
except Exception as e:
raise ConfigError(
f"Failed to set connector config {field} to {setting}: {e}"
)

if self.secrets:
duckdb_version = duckdb.__version__
Expand Down
33 changes: 33 additions & 0 deletions tests/core/engine_adapter/integration/test_integration_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,36 @@ def _open_connection() -> bool:

for future in as_completed(futures):
assert future.result()


def test_connector_config_from_multiple_connections(tmp_path: Path):
config = DuckDBConnectionConfig(
concurrent_tasks=2,
extensions=["tpch"],
connector_config={"temp_directory": str(tmp_path), "memory_limit": "16mb"},
)

adapter = config.create_engine_adapter()
pool = adapter._connection_pool

assert isinstance(pool, ThreadLocalSharedConnectionPool)

adapter.execute("CALL dbgen(sf = 0.1)")

# check that temporary files exist so that calling "SET temp_directory = 'anything'" will throw an error
assert len(adapter.fetchall("select path from duckdb_temporary_files()")) > 0

def _open_connection() -> bool:
# This triggers cursor_init() which should only SET values if they have changed
pool.get_cursor()
return True

thread_pool = ThreadPoolExecutor(max_workers=4)
futures = []
for _ in range(4):
futures.append(thread_pool.submit(_open_connection))

for future in as_completed(futures):
assert future.result()

pool.close_all()