From 6dfadb561a36a1053d5ffefe0681c243d48e1d00 Mon Sep 17 00:00:00 2001 From: Basanth Jenu H B Date: Fri, 27 Feb 2026 20:39:59 +0530 Subject: [PATCH 1/2] feat: add support for max lifetime and idle timeout --- .../storage/postgres_connection_pool.hpp | 37 +++- src/postgres_extension.cpp | 40 +++++ src/storage/postgres_catalog.cpp | 8 + src/storage/postgres_connection_pool.cpp | 158 +++++++++++++++--- .../storage/attach_connection_lifetime.test | 111 ++++++++++++ 5 files changed, 326 insertions(+), 28 deletions(-) create mode 100644 test/sql/storage/attach_connection_lifetime.test diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 0cb71413e..6e84852a2 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -12,15 +12,29 @@ #include "duckdb/common/mutex.hpp" #include "duckdb/common/optional_ptr.hpp" #include "postgres_connection.hpp" +#include +#include +#include +#include namespace duckdb { class PostgresCatalog; class PostgresConnectionPool; +using steady_clock = std::chrono::steady_clock; +using steady_time_point = steady_clock::time_point; + +struct CachedConnection { + PostgresConnection connection; + steady_time_point created_at; + steady_time_point returned_at; +}; + class PostgresPoolConnection { public: PostgresPoolConnection(); - PostgresPoolConnection(optional_ptr pool, PostgresConnection connection); + PostgresPoolConnection(optional_ptr pool, PostgresConnection connection, + steady_time_point created_at); ~PostgresPoolConnection(); // disable copy constructors PostgresPoolConnection(const PostgresPoolConnection &other) = delete; @@ -31,10 +45,12 @@ class PostgresPoolConnection { bool HasConnection(); PostgresConnection &GetConnection(); + steady_time_point GetCreatedAt() const; private: optional_ptr pool; PostgresConnection connection; + steady_time_point created_at; }; class PostgresConnectionPool { @@ -42,14 +58,17 @@ class PostgresConnectionPool { static constexpr const idx_t DEFAULT_MAX_CONNECTIONS = 64; PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DEFAULT_MAX_CONNECTIONS); + ~PostgresConnectionPool(); public: bool TryGetConnection(PostgresPoolConnection &connection); PostgresPoolConnection GetConnection(); //! Always returns a connection - even if the connection slots are exhausted PostgresPoolConnection ForceGetConnection(); - void ReturnConnection(PostgresConnection connection); + void ReturnConnection(PostgresConnection connection, steady_time_point created_at); void SetMaximumConnections(idx_t new_max); + void SetMaxLifetime(idx_t seconds); + void SetIdleTimeout(idx_t seconds); static void PostgresSetConnectionCache(ClientContext &context, SetScope scope, Value ¶meter); @@ -58,7 +77,19 @@ class PostgresConnectionPool { mutex connection_lock; idx_t active_connections; idx_t maximum_connections; - vector connection_cache; + vector connection_cache; + + idx_t max_lifetime_seconds = 0; + idx_t idle_timeout_seconds = 0; + + std::thread reaper_thread; + std::condition_variable reaper_cv; + std::atomic shutdown {false}; + + bool IsExpired(const CachedConnection &entry, steady_time_point now) const; + void ReaperLoop(); + void StartReaperIfNeeded(unique_lock &lock); + void StopReaper(unique_lock &lock); private: PostgresPoolConnection GetConnectionInternal(unique_lock &lock); diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index dce65b121..ce7c8a987 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -70,6 +70,40 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V config.SetOption("pg_connection_limit", parameter); } +static void SetPostgresMaxLifetime(ClientContext &context, SetScope scope, Value ¶meter) { + if (scope == SetScope::LOCAL) { + throw InvalidInputException("pg_connection_max_lifetime can only be set globally"); + } + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = *db_ref; + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "postgres") { + continue; + } + catalog.Cast().GetConnectionPool().SetMaxLifetime(UBigIntValue::Get(parameter)); + } + auto &config = DBConfig::GetConfig(context); + config.SetOption("pg_connection_max_lifetime", parameter); +} + +static void SetPostgresIdleTimeout(ClientContext &context, SetScope scope, Value ¶meter) { + if (scope == SetScope::LOCAL) { + throw InvalidInputException("pg_connection_idle_timeout can only be set globally"); + } + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = *db_ref; + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "postgres") { + continue; + } + catalog.Cast().GetConnectionPool().SetIdleTimeout(UBigIntValue::Get(parameter)); + } + auto &config = DBConfig::GetConfig(context); + config.SetOption("pg_connection_idle_timeout", parameter); +} + static void SetPostgresDebugQueryPrint(ClientContext &context, SetScope scope, Value ¶meter) { PostgresConnection::DebugSetPrintQueries(BooleanValue::Get(parameter)); } @@ -172,6 +206,12 @@ static void LoadInternal(ExtensionLoader &loader) { config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS), SetPostgresConnectionLimit); + config.AddExtensionOption("pg_connection_max_lifetime", + "Maximum lifetime of a pooled connection in seconds (0 = disabled)", + LogicalType::UBIGINT, Value::UBIGINT(0), SetPostgresMaxLifetime); + config.AddExtensionOption("pg_connection_idle_timeout", + "Maximum idle time of a pooled connection in seconds before it is closed (0 = disabled)", + LogicalType::UBIGINT, Value::UBIGINT(0), SetPostgresIdleTimeout); config.AddExtensionOption( "pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting); diff --git a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index 888463b27..47254d9e0 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -23,6 +23,14 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) { connection_pool.SetMaximumConnections(UBigIntValue::Get(connection_limit)); } + Value max_lifetime; + if (db_instance.TryGetCurrentSetting("pg_connection_max_lifetime", max_lifetime)) { + connection_pool.SetMaxLifetime(UBigIntValue::Get(max_lifetime)); + } + Value idle_timeout; + if (db_instance.TryGetCurrentSetting("pg_connection_idle_timeout", idle_timeout)) { + connection_pool.SetIdleTimeout(UBigIntValue::Get(idle_timeout)); + } auto connection = connection_pool.GetConnection(); this->version = connection.GetConnection().GetPostgresVersion(context); diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index a46c190b0..165e02a6c 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -1,31 +1,36 @@ #include "storage/postgres_connection_pool.hpp" #include "storage/postgres_catalog.hpp" +#include namespace duckdb { static bool pg_use_connection_cache = true; -PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr) { +// --- PostgresPoolConnection --- + +PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr), created_at() { } PostgresPoolConnection::PostgresPoolConnection(optional_ptr pool, - PostgresConnection connection_p) - : pool(pool), connection(std::move(connection_p)) { + PostgresConnection connection_p, steady_time_point created_at_p) + : pool(pool), connection(std::move(connection_p)), created_at(created_at_p) { } PostgresPoolConnection::~PostgresPoolConnection() { if (pool) { - pool->ReturnConnection(std::move(connection)); + pool->ReturnConnection(std::move(connection), created_at); } } PostgresPoolConnection::PostgresPoolConnection(PostgresPoolConnection &&other) noexcept { std::swap(pool, other.pool); std::swap(connection, other.connection); + std::swap(created_at, other.created_at); } PostgresPoolConnection &PostgresPoolConnection::operator=(PostgresPoolConnection &&other) noexcept { std::swap(pool, other.pool); std::swap(connection, other.connection); + std::swap(created_at, other.created_at); return *this; } @@ -40,22 +45,101 @@ PostgresConnection &PostgresPoolConnection::GetConnection() { return connection; } +steady_time_point PostgresPoolConnection::GetCreatedAt() const { + return created_at; +} + +// --- PostgresConnectionPool --- + PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections_p) : postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) { } +PostgresConnectionPool::~PostgresConnectionPool() { + unique_lock l(connection_lock); + StopReaper(l); +} + +bool PostgresConnectionPool::IsExpired(const CachedConnection &entry, steady_time_point now) const { + if (max_lifetime_seconds > 0) { + auto age = std::chrono::duration_cast(now - entry.created_at).count(); + if (static_cast(age) >= max_lifetime_seconds) { + return true; + } + } + if (idle_timeout_seconds > 0) { + auto idle = std::chrono::duration_cast(now - entry.returned_at).count(); + if (static_cast(idle) >= idle_timeout_seconds) { + return true; + } + } + return false; +} + +void PostgresConnectionPool::ReaperLoop() { + unique_lock l(connection_lock); + while (!shutdown.load()) { + idx_t sleep_seconds = 30; + if (max_lifetime_seconds > 0 && idle_timeout_seconds > 0) { + sleep_seconds = std::min(max_lifetime_seconds, idle_timeout_seconds); + } else if (max_lifetime_seconds > 0) { + sleep_seconds = max_lifetime_seconds; + } else if (idle_timeout_seconds > 0) { + sleep_seconds = idle_timeout_seconds; + } + sleep_seconds = std::max(1, sleep_seconds / 2); + sleep_seconds = std::min(60, sleep_seconds); + + reaper_cv.wait_for(l, std::chrono::seconds(sleep_seconds), [this]() { return shutdown.load(); }); + + if (shutdown.load()) { + break; + } + + auto now = steady_clock::now(); + auto it = std::remove_if(connection_cache.begin(), connection_cache.end(), + [this, now](const CachedConnection &entry) { return IsExpired(entry, now); }); + connection_cache.erase(it, connection_cache.end()); + } +} + +void PostgresConnectionPool::StartReaperIfNeeded(unique_lock &lock) { + if (max_lifetime_seconds == 0 && idle_timeout_seconds == 0) { + return; + } + if (reaper_thread.joinable()) { + return; + } + shutdown.store(false); + reaper_thread = std::thread(&PostgresConnectionPool::ReaperLoop, this); +} + +void PostgresConnectionPool::StopReaper(unique_lock &lock) { + if (!reaper_thread.joinable()) { + return; + } + shutdown.store(true); + reaper_cv.notify_all(); + lock.unlock(); + reaper_thread.join(); + lock.lock(); +} + PostgresPoolConnection PostgresConnectionPool::GetConnectionInternal(unique_lock &lock) { active_connections++; - // check if we have any cached connections left - if (!connection_cache.empty()) { - auto connection = PostgresPoolConnection(this, std::move(connection_cache.back())); + auto now = steady_clock::now(); + while (!connection_cache.empty()) { + auto cached = std::move(connection_cache.back()); connection_cache.pop_back(); - return connection; + if (IsExpired(cached, now)) { + continue; + } + return PostgresPoolConnection(this, std::move(cached.connection), cached.created_at); } - // no cached connections left but there is space to open a new one - open it after releasing the cache lock lock.unlock(); + auto created = steady_clock::now(); return PostgresPoolConnection( - this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path)); + this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path), created); } PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() { @@ -89,54 +173,56 @@ PostgresPoolConnection PostgresConnectionPool::GetConnection() { return result; } -void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { +void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, steady_time_point created_at) { unique_lock l(connection_lock); if (active_connections <= 0) { throw InternalException("PostgresConnectionPool::ReturnConnection called but active_connections is 0"); } if (!pg_use_connection_cache) { - // not caching - just return active_connections--; return; } - // we want to cache the connection - // check if the underlying connection is still usable - // avoid holding the lock while doing this + + // check if the connection has exceeded its max lifetime before doing anything else + if (max_lifetime_seconds > 0) { + auto age = std::chrono::duration_cast(steady_clock::now() - created_at).count(); + if (static_cast(age) >= max_lifetime_seconds) { + active_connections--; + return; + } + } + l.unlock(); bool connection_is_bad = false; auto pg_con = connection.GetConn(); if (PQstatus(connection.GetConn()) != CONNECTION_OK) { - // CONNECTION_BAD! try to reset it PQreset(pg_con); if (PQstatus(connection.GetConn()) != CONNECTION_OK) { - // still bad - just abandon this one connection_is_bad = true; } } if (!connection_is_bad && PQtransactionStatus(pg_con) != PQTRANS_IDLE) { connection_is_bad = true; } - // lock and return the connection + l.lock(); active_connections--; if (connection_is_bad) { - // if the connection is bad we cannot cache it return; } if (active_connections >= maximum_connections) { - // if the maximum number of connections has been decreased by the user we might need to reclaim the connection - // immediately return; } - connection_cache.push_back(std::move(connection)); + CachedConnection cached; + cached.connection = std::move(connection); + cached.created_at = created_at; + cached.returned_at = steady_clock::now(); + connection_cache.push_back(std::move(cached)); } void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { lock_guard l(connection_lock); if (new_max < maximum_connections) { - // potentially close connections - // note that we can only close connections in the connection cache - // we will have to wait for connections to be returned auto total_open_connections = active_connections + connection_cache.size(); while (!connection_cache.empty() && total_open_connections > new_max) { total_open_connections--; @@ -146,4 +232,26 @@ void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { maximum_connections = new_max; } +void PostgresConnectionPool::SetMaxLifetime(idx_t seconds) { + unique_lock l(connection_lock); + max_lifetime_seconds = seconds; + if (seconds == 0 && idle_timeout_seconds == 0) { + StopReaper(l); + } else { + StartReaperIfNeeded(l); + reaper_cv.notify_all(); + } +} + +void PostgresConnectionPool::SetIdleTimeout(idx_t seconds) { + unique_lock l(connection_lock); + idle_timeout_seconds = seconds; + if (seconds == 0 && max_lifetime_seconds == 0) { + StopReaper(l); + } else { + StartReaperIfNeeded(l); + reaper_cv.notify_all(); + } +} + } // namespace duckdb diff --git a/test/sql/storage/attach_connection_lifetime.test b/test/sql/storage/attach_connection_lifetime.test new file mode 100644 index 000000000..d86bfe874 --- /dev/null +++ b/test/sql/storage/attach_connection_lifetime.test @@ -0,0 +1,111 @@ +# name: test/sql/storage/attach_connection_lifetime.test +# description: Test connection pool max lifetime and idle timeout settings +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +PRAGMA enable_verification + +statement ok +SET pg_connection_cache=true + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +USE s + +statement ok +CREATE OR REPLACE TABLE connection_lifetime_test(i INTEGER); + +statement ok +INSERT INTO connection_lifetime_test FROM range(100) + +# verify queries work before enabling lifetime settings +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# enable max lifetime with a short duration (2 seconds) +statement ok +SET pg_connection_max_lifetime=2 + +# queries should still succeed +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# wait for connections to expire +statement ok +SELECT pg_sleep(3) + +# queries should still succeed — pool creates new connections as needed +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable max lifetime +statement ok +SET pg_connection_max_lifetime=0 + +# enable idle timeout with a short duration (2 seconds) +statement ok +SET pg_connection_idle_timeout=2 + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# wait for idle connections to be reaped +statement ok +SELECT pg_sleep(3) + +# queries should still succeed after idle connections are reaped +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable idle timeout +statement ok +SET pg_connection_idle_timeout=0 + +# enable both settings simultaneously +statement ok +SET pg_connection_max_lifetime=2 + +statement ok +SET pg_connection_idle_timeout=1 + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +statement ok +SELECT pg_sleep(3) + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable both +statement ok +SET pg_connection_max_lifetime=0 + +statement ok +SET pg_connection_idle_timeout=0 + +# final sanity check +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 From dcc4024a5185fe812325538b2fed3d0085dde585 Mon Sep 17 00:00:00 2001 From: Basanth Jenu H B Date: Fri, 27 Feb 2026 21:04:09 +0530 Subject: [PATCH 2/2] feat: minor refactor --- .../storage/postgres_connection_pool.hpp | 27 ++++---- src/postgres_extension.cpp | 35 ++++++----- src/storage/postgres_connection_pool.cpp | 61 +++++++++++-------- .../storage/attach_connection_lifetime.test | 4 ++ 4 files changed, 72 insertions(+), 55 deletions(-) diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 6e84852a2..6d2193299 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -21,20 +21,13 @@ namespace duckdb { class PostgresCatalog; class PostgresConnectionPool; -using steady_clock = std::chrono::steady_clock; -using steady_time_point = steady_clock::time_point; - -struct CachedConnection { - PostgresConnection connection; - steady_time_point created_at; - steady_time_point returned_at; -}; - class PostgresPoolConnection { public: + using time_point_t = std::chrono::steady_clock::time_point; + PostgresPoolConnection(); PostgresPoolConnection(optional_ptr pool, PostgresConnection connection, - steady_time_point created_at); + time_point_t created_at); ~PostgresPoolConnection(); // disable copy constructors PostgresPoolConnection(const PostgresPoolConnection &other) = delete; @@ -45,16 +38,18 @@ class PostgresPoolConnection { bool HasConnection(); PostgresConnection &GetConnection(); - steady_time_point GetCreatedAt() const; private: optional_ptr pool; PostgresConnection connection; - steady_time_point created_at; + time_point_t created_at; }; class PostgresConnectionPool { public: + using steady_clock = std::chrono::steady_clock; + using steady_time_point = steady_clock::time_point; + static constexpr const idx_t DEFAULT_MAX_CONNECTIONS = 64; PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DEFAULT_MAX_CONNECTIONS); @@ -73,6 +68,12 @@ class PostgresConnectionPool { static void PostgresSetConnectionCache(ClientContext &context, SetScope scope, Value ¶meter); private: + struct CachedConnection { + PostgresConnection connection; + steady_time_point created_at; + steady_time_point returned_at; + }; + PostgresCatalog &postgres_catalog; mutex connection_lock; idx_t active_connections; @@ -90,8 +91,8 @@ class PostgresConnectionPool { void ReaperLoop(); void StartReaperIfNeeded(unique_lock &lock); void StopReaper(unique_lock &lock); + void UpdateTimeoutSetting(idx_t &field, idx_t seconds); -private: PostgresPoolConnection GetConnectionInternal(unique_lock &lock); }; diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index ce7c8a987..d788d57bb 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -70,9 +70,15 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V config.SetOption("pg_connection_limit", parameter); } -static void SetPostgresMaxLifetime(ClientContext &context, SetScope scope, Value ¶meter) { +using PoolTimeoutSetter = void (PostgresConnectionPool::*)(idx_t); + +static void SetPostgresPoolTimeout(ClientContext &context, SetScope scope, Value ¶meter, + const char *option_name, PoolTimeoutSetter setter) { + if (parameter.IsNull()) { + throw BinderException("Cannot be set to NULL"); + } if (scope == SetScope::LOCAL) { - throw InvalidInputException("pg_connection_max_lifetime can only be set globally"); + throw InvalidInputException("%s can only be set globally", option_name); } auto databases = DatabaseManager::Get(context).GetDatabases(context); for (auto &db_ref : databases) { @@ -81,27 +87,20 @@ static void SetPostgresMaxLifetime(ClientContext &context, SetScope scope, Value if (catalog.GetCatalogType() != "postgres") { continue; } - catalog.Cast().GetConnectionPool().SetMaxLifetime(UBigIntValue::Get(parameter)); + (catalog.Cast().GetConnectionPool().*setter)(UBigIntValue::Get(parameter)); } auto &config = DBConfig::GetConfig(context); - config.SetOption("pg_connection_max_lifetime", parameter); + config.SetOption(option_name, parameter); +} + +static void SetPostgresMaxLifetime(ClientContext &context, SetScope scope, Value ¶meter) { + SetPostgresPoolTimeout(context, scope, parameter, "pg_connection_max_lifetime", + &PostgresConnectionPool::SetMaxLifetime); } static void SetPostgresIdleTimeout(ClientContext &context, SetScope scope, Value ¶meter) { - if (scope == SetScope::LOCAL) { - throw InvalidInputException("pg_connection_idle_timeout can only be set globally"); - } - auto databases = DatabaseManager::Get(context).GetDatabases(context); - for (auto &db_ref : databases) { - auto &db = *db_ref; - auto &catalog = db.GetCatalog(); - if (catalog.GetCatalogType() != "postgres") { - continue; - } - catalog.Cast().GetConnectionPool().SetIdleTimeout(UBigIntValue::Get(parameter)); - } - auto &config = DBConfig::GetConfig(context); - config.SetOption("pg_connection_idle_timeout", parameter); + SetPostgresPoolTimeout(context, scope, parameter, "pg_connection_idle_timeout", + &PostgresConnectionPool::SetIdleTimeout); } static void SetPostgresDebugQueryPrint(ClientContext &context, SetScope scope, Value ¶meter) { diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index 165e02a6c..140bd9f4f 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -5,13 +5,11 @@ namespace duckdb { static bool pg_use_connection_cache = true; -// --- PostgresPoolConnection --- - PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr), created_at() { } PostgresPoolConnection::PostgresPoolConnection(optional_ptr pool, - PostgresConnection connection_p, steady_time_point created_at_p) + PostgresConnection connection_p, time_point_t created_at_p) : pool(pool), connection(std::move(connection_p)), created_at(created_at_p) { } @@ -45,12 +43,6 @@ PostgresConnection &PostgresPoolConnection::GetConnection() { return connection; } -steady_time_point PostgresPoolConnection::GetCreatedAt() const { - return created_at; -} - -// --- PostgresConnectionPool --- - PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections_p) : postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) { } @@ -97,9 +89,14 @@ void PostgresConnectionPool::ReaperLoop() { } auto now = steady_clock::now(); - auto it = std::remove_if(connection_cache.begin(), connection_cache.end(), - [this, now](const CachedConnection &entry) { return IsExpired(entry, now); }); + auto it = std::partition(connection_cache.begin(), connection_cache.end(), + [this, now](const CachedConnection &e) { return !IsExpired(e, now); }); + vector expired(std::make_move_iterator(it), std::make_move_iterator(connection_cache.end())); connection_cache.erase(it, connection_cache.end()); + // release lock while destroying expired connections (PQfinish may block) + l.unlock(); + expired.clear(); + l.lock(); } } @@ -138,8 +135,14 @@ PostgresPoolConnection PostgresConnectionPool::GetConnectionInternal(unique_lock } lock.unlock(); auto created = steady_clock::now(); - return PostgresPoolConnection( - this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path), created); + try { + return PostgresPoolConnection( + this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path), created); + } catch (...) { + lock.lock(); + active_connections--; + throw; + } } PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() { @@ -179,6 +182,7 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, ste throw InternalException("PostgresConnectionPool::ReturnConnection called but active_connections is 0"); } if (!pg_use_connection_cache) { + // not caching - just return active_connections--; return; } @@ -192,12 +196,17 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, ste } } + // we want to cache the connection + // check if the underlying connection is still usable + // avoid holding the lock while doing this l.unlock(); bool connection_is_bad = false; auto pg_con = connection.GetConn(); if (PQstatus(connection.GetConn()) != CONNECTION_OK) { + // CONNECTION_BAD! try to reset it PQreset(pg_con); if (PQstatus(connection.GetConn()) != CONNECTION_OK) { + // still bad - just abandon this one connection_is_bad = true; } } @@ -205,12 +214,16 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, ste connection_is_bad = true; } + // lock and return the connection l.lock(); active_connections--; if (connection_is_bad) { + // if the connection is bad we cannot cache it return; } if (active_connections >= maximum_connections) { + // if the maximum number of connections has been decreased by the user we might need to reclaim the connection + // immediately return; } CachedConnection cached; @@ -223,6 +236,9 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, ste void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { lock_guard l(connection_lock); if (new_max < maximum_connections) { + // potentially close connections + // note that we can only close connections in the connection cache + // we will have to wait for connections to be returned auto total_open_connections = active_connections + connection_cache.size(); while (!connection_cache.empty() && total_open_connections > new_max) { total_open_connections--; @@ -232,10 +248,10 @@ void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { maximum_connections = new_max; } -void PostgresConnectionPool::SetMaxLifetime(idx_t seconds) { +void PostgresConnectionPool::UpdateTimeoutSetting(idx_t &field, idx_t seconds) { unique_lock l(connection_lock); - max_lifetime_seconds = seconds; - if (seconds == 0 && idle_timeout_seconds == 0) { + field = seconds; + if (max_lifetime_seconds == 0 && idle_timeout_seconds == 0) { StopReaper(l); } else { StartReaperIfNeeded(l); @@ -243,15 +259,12 @@ void PostgresConnectionPool::SetMaxLifetime(idx_t seconds) { } } +void PostgresConnectionPool::SetMaxLifetime(idx_t seconds) { + UpdateTimeoutSetting(max_lifetime_seconds, seconds); +} + void PostgresConnectionPool::SetIdleTimeout(idx_t seconds) { - unique_lock l(connection_lock); - idle_timeout_seconds = seconds; - if (seconds == 0 && max_lifetime_seconds == 0) { - StopReaper(l); - } else { - StartReaperIfNeeded(l); - reaper_cv.notify_all(); - } + UpdateTimeoutSetting(idle_timeout_seconds, seconds); } } // namespace duckdb diff --git a/test/sql/storage/attach_connection_lifetime.test b/test/sql/storage/attach_connection_lifetime.test index d86bfe874..81e5a3a1e 100644 --- a/test/sql/storage/attach_connection_lifetime.test +++ b/test/sql/storage/attach_connection_lifetime.test @@ -109,3 +109,7 @@ query I SELECT COUNT(*) FROM connection_lifetime_test ---- 100 + +# clean up +statement ok +DROP TABLE IF EXISTS connection_lifetime_test