diff --git a/src/include/storage/postgres_connection_pool.hpp b/src/include/storage/postgres_connection_pool.hpp index 0cb71413e..6d2193299 100644 --- a/src/include/storage/postgres_connection_pool.hpp +++ b/src/include/storage/postgres_connection_pool.hpp @@ -12,6 +12,10 @@ #include "duckdb/common/mutex.hpp" #include "duckdb/common/optional_ptr.hpp" #include "postgres_connection.hpp" +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <thread> namespace duckdb { class PostgresCatalog; @@ -19,8 +23,11 @@ class PostgresConnectionPool; class PostgresPoolConnection { public: + using time_point_t = std::chrono::steady_clock::time_point; + PostgresPoolConnection(); - PostgresPoolConnection(optional_ptr<PostgresConnectionPool> pool, PostgresConnection connection); + PostgresPoolConnection(optional_ptr<PostgresConnectionPool> pool, PostgresConnection connection, + time_point_t created_at); ~PostgresPoolConnection(); // disable copy constructors PostgresPoolConnection(const PostgresPoolConnection &other) = delete; @@ -35,32 +42,57 @@ class PostgresPoolConnection { private: optional_ptr<PostgresConnectionPool> pool; PostgresConnection connection; + time_point_t created_at; }; class PostgresConnectionPool { public: + using steady_clock = std::chrono::steady_clock; + using steady_time_point = steady_clock::time_point; + static constexpr const idx_t DEFAULT_MAX_CONNECTIONS = 64; PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DEFAULT_MAX_CONNECTIONS); + ~PostgresConnectionPool(); public: bool TryGetConnection(PostgresPoolConnection &connection); PostgresPoolConnection GetConnection(); //! 
Always returns a connection - even if the connection slots are exhausted PostgresPoolConnection ForceGetConnection(); - void ReturnConnection(PostgresConnection connection); + void ReturnConnection(PostgresConnection connection, steady_time_point created_at); void SetMaximumConnections(idx_t new_max); + void SetMaxLifetime(idx_t seconds); + void SetIdleTimeout(idx_t seconds); static void PostgresSetConnectionCache(ClientContext &context, SetScope scope, Value &parameter); private: + struct CachedConnection { + PostgresConnection connection; + steady_time_point created_at; + steady_time_point returned_at; + }; + PostgresCatalog &postgres_catalog; mutex connection_lock; idx_t active_connections; idx_t maximum_connections; - vector<PostgresConnection> connection_cache; + vector<CachedConnection> connection_cache; + + idx_t max_lifetime_seconds = 0; + idx_t idle_timeout_seconds = 0; + + std::thread reaper_thread; + std::condition_variable reaper_cv; + std::atomic<bool> shutdown {false}; + + bool IsExpired(const CachedConnection &entry, steady_time_point now) const; + void ReaperLoop(); + void StartReaperIfNeeded(unique_lock<mutex> &lock); + void StopReaper(unique_lock<mutex> &lock); + void UpdateTimeoutSetting(idx_t &field, idx_t seconds); -private: PostgresPoolConnection GetConnectionInternal(unique_lock<mutex> &lock); }; diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index dce65b121..d788d57bb 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -70,6 +70,39 @@ static void SetPostgresConnectionLimit(ClientContext &context, SetScope scope, V config.SetOption("pg_connection_limit", parameter); } +using PoolTimeoutSetter = void (PostgresConnectionPool::*)(idx_t); + +static void SetPostgresPoolTimeout(ClientContext &context, SetScope scope, Value &parameter, + const char *option_name, PoolTimeoutSetter setter) { + if (parameter.IsNull()) { + throw BinderException("Cannot be set to NULL"); + } + if (scope == SetScope::LOCAL) { + throw InvalidInputException("%s can only be set globally", 
option_name); + } + auto databases = DatabaseManager::Get(context).GetDatabases(context); + for (auto &db_ref : databases) { + auto &db = *db_ref; + auto &catalog = db.GetCatalog(); + if (catalog.GetCatalogType() != "postgres") { + continue; + } + (catalog.Cast<PostgresCatalog>().GetConnectionPool().*setter)(UBigIntValue::Get(parameter)); + } + auto &config = DBConfig::GetConfig(context); + config.SetOption(option_name, parameter); +} + +static void SetPostgresMaxLifetime(ClientContext &context, SetScope scope, Value &parameter) { + SetPostgresPoolTimeout(context, scope, parameter, "pg_connection_max_lifetime", + &PostgresConnectionPool::SetMaxLifetime); +} + +static void SetPostgresIdleTimeout(ClientContext &context, SetScope scope, Value &parameter) { + SetPostgresPoolTimeout(context, scope, parameter, "pg_connection_idle_timeout", + &PostgresConnectionPool::SetIdleTimeout); +} + static void SetPostgresDebugQueryPrint(ClientContext &context, SetScope scope, Value &parameter) { PostgresConnection::DebugSetPrintQueries(BooleanValue::Get(parameter)); } @@ -172,6 +205,12 @@ static void LoadInternal(ExtensionLoader &loader) { config.AddExtensionOption("pg_connection_limit", "The maximum amount of concurrent Postgres connections", LogicalType::UBIGINT, Value::UBIGINT(PostgresConnectionPool::DEFAULT_MAX_CONNECTIONS), SetPostgresConnectionLimit); + config.AddExtensionOption("pg_connection_max_lifetime", + "Maximum lifetime of a pooled connection in seconds (0 = disabled)", + LogicalType::UBIGINT, Value::UBIGINT(0), SetPostgresMaxLifetime); + config.AddExtensionOption("pg_connection_idle_timeout", + "Maximum idle time of a pooled connection in seconds before it is closed (0 = disabled)", + LogicalType::UBIGINT, Value::UBIGINT(0), SetPostgresIdleTimeout); config.AddExtensionOption( "pg_array_as_varchar", "Read Postgres arrays as varchar - enables reading mixed dimensional arrays", LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting); diff --git 
a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index 888463b27..47254d9e0 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -23,6 +23,14 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) { connection_pool.SetMaximumConnections(UBigIntValue::Get(connection_limit)); } + Value max_lifetime; + if (db_instance.TryGetCurrentSetting("pg_connection_max_lifetime", max_lifetime)) { + connection_pool.SetMaxLifetime(UBigIntValue::Get(max_lifetime)); + } + Value idle_timeout; + if (db_instance.TryGetCurrentSetting("pg_connection_idle_timeout", idle_timeout)) { + connection_pool.SetIdleTimeout(UBigIntValue::Get(idle_timeout)); + } auto connection = connection_pool.GetConnection(); this->version = connection.GetConnection().GetPostgresVersion(context); diff --git a/src/storage/postgres_connection_pool.cpp b/src/storage/postgres_connection_pool.cpp index a46c190b0..140bd9f4f 100644 --- a/src/storage/postgres_connection_pool.cpp +++ b/src/storage/postgres_connection_pool.cpp @@ -1,31 +1,34 @@ #include "storage/postgres_connection_pool.hpp" #include "storage/postgres_catalog.hpp" +#include <algorithm> namespace duckdb { static bool pg_use_connection_cache = true; -PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr) { +PostgresPoolConnection::PostgresPoolConnection() : pool(nullptr), created_at() { } PostgresPoolConnection::PostgresPoolConnection(optional_ptr<PostgresConnectionPool> pool, - PostgresConnection connection_p) - : pool(pool), connection(std::move(connection_p)) { + PostgresConnection connection_p, time_point_t created_at_p) + : pool(pool), connection(std::move(connection_p)), created_at(created_at_p) { } PostgresPoolConnection::~PostgresPoolConnection() { if (pool) { - pool->ReturnConnection(std::move(connection)); + pool->ReturnConnection(std::move(connection), created_at); } } 
PostgresPoolConnection::PostgresPoolConnection(PostgresPoolConnection &&other) noexcept { std::swap(pool, other.pool); std::swap(connection, other.connection); + std::swap(created_at, other.created_at); } PostgresPoolConnection &PostgresPoolConnection::operator=(PostgresPoolConnection &&other) noexcept { std::swap(pool, other.pool); std::swap(connection, other.connection); + std::swap(created_at, other.created_at); return *this; } @@ -44,18 +47,102 @@ PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog : postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) { } +PostgresConnectionPool::~PostgresConnectionPool() { + unique_lock<mutex> l(connection_lock); + StopReaper(l); +} + +bool PostgresConnectionPool::IsExpired(const CachedConnection &entry, steady_time_point now) const { + if (max_lifetime_seconds > 0) { + auto age = std::chrono::duration_cast<std::chrono::seconds>(now - entry.created_at).count(); + if (static_cast<idx_t>(age) >= max_lifetime_seconds) { + return true; + } + } + if (idle_timeout_seconds > 0) { + auto idle = std::chrono::duration_cast<std::chrono::seconds>(now - entry.returned_at).count(); + if (static_cast<idx_t>(idle) >= idle_timeout_seconds) { + return true; + } + } + return false; +} + +void PostgresConnectionPool::ReaperLoop() { + unique_lock<mutex> l(connection_lock); + while (!shutdown.load()) { + idx_t sleep_seconds = 30; + if (max_lifetime_seconds > 0 && idle_timeout_seconds > 0) { + sleep_seconds = std::min(max_lifetime_seconds, idle_timeout_seconds); + } else if (max_lifetime_seconds > 0) { + sleep_seconds = max_lifetime_seconds; + } else if (idle_timeout_seconds > 0) { + sleep_seconds = idle_timeout_seconds; + } + sleep_seconds = std::max<idx_t>(1, sleep_seconds / 2); + sleep_seconds = std::min<idx_t>(60, sleep_seconds); + + reaper_cv.wait_for(l, std::chrono::seconds(sleep_seconds), [this]() { return shutdown.load(); }); + + if (shutdown.load()) { + break; + } + + auto now = steady_clock::now(); + auto it = 
std::partition(connection_cache.begin(), connection_cache.end(), + [this, now](const CachedConnection &e) { return !IsExpired(e, now); }); + vector<CachedConnection> expired(std::make_move_iterator(it), std::make_move_iterator(connection_cache.end())); + connection_cache.erase(it, connection_cache.end()); + // release lock while destroying expired connections (PQfinish may block) + l.unlock(); + expired.clear(); + l.lock(); + } +} + +void PostgresConnectionPool::StartReaperIfNeeded(unique_lock<mutex> &lock) { + if (max_lifetime_seconds == 0 && idle_timeout_seconds == 0) { + return; + } + if (reaper_thread.joinable()) { + return; + } + shutdown.store(false); + reaper_thread = std::thread(&PostgresConnectionPool::ReaperLoop, this); +} + +void PostgresConnectionPool::StopReaper(unique_lock<mutex> &lock) { + if (!reaper_thread.joinable()) { + return; + } + shutdown.store(true); + reaper_cv.notify_all(); + lock.unlock(); + reaper_thread.join(); + lock.lock(); +} + PostgresPoolConnection PostgresConnectionPool::GetConnectionInternal(unique_lock<mutex> &lock) { active_connections++; - // check if we have any cached connections left - if (!connection_cache.empty()) { - auto connection = PostgresPoolConnection(this, std::move(connection_cache.back())); + auto now = steady_clock::now(); + while (!connection_cache.empty()) { + auto cached = std::move(connection_cache.back()); connection_cache.pop_back(); - return connection; + if (IsExpired(cached, now)) { + continue; + } + return PostgresPoolConnection(this, std::move(cached.connection), cached.created_at); } - // no cached connections left but there is space to open a new one - open it after releasing the cache lock lock.unlock(); - return PostgresPoolConnection( - this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path)); + auto created = steady_clock::now(); + try { + return PostgresPoolConnection( + this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path), created); + } catch 
(...) { + lock.lock(); + active_connections--; + throw; + } } PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() { @@ -89,7 +176,7 @@ PostgresPoolConnection PostgresConnectionPool::GetConnection() { return result; } -void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { +void PostgresConnectionPool::ReturnConnection(PostgresConnection connection, steady_time_point created_at) { unique_lock<mutex> l(connection_lock); if (active_connections <= 0) { throw InternalException("PostgresConnectionPool::ReturnConnection called but active_connections is 0"); @@ -99,6 +186,16 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { active_connections--; return; } + + // check if the connection has exceeded its max lifetime before doing anything else + if (max_lifetime_seconds > 0) { + auto age = std::chrono::duration_cast<std::chrono::seconds>(steady_clock::now() - created_at).count(); + if (static_cast<idx_t>(age) >= max_lifetime_seconds) { + active_connections--; + return; + } + } + // we want to cache the connection // check if the underlying connection is still usable // avoid holding the lock while doing this @@ -116,6 +213,7 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { if (!connection_is_bad && PQtransactionStatus(pg_con) != PQTRANS_IDLE) { connection_is_bad = true; } + // lock and return the connection l.lock(); active_connections--; @@ -128,7 +226,11 @@ void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) { // immediately return; } - connection_cache.push_back(std::move(connection)); + CachedConnection cached; + cached.connection = std::move(connection); + cached.created_at = created_at; + cached.returned_at = steady_clock::now(); + connection_cache.push_back(std::move(cached)); } void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { @@ -146,4 +248,23 @@ void PostgresConnectionPool::SetMaximumConnections(idx_t new_max) { maximum_connections = new_max; } +void 
PostgresConnectionPool::UpdateTimeoutSetting(idx_t &field, idx_t seconds) { + unique_lock<mutex> l(connection_lock); + field = seconds; + if (max_lifetime_seconds == 0 && idle_timeout_seconds == 0) { + StopReaper(l); + } else { + StartReaperIfNeeded(l); + reaper_cv.notify_all(); + } +} + +void PostgresConnectionPool::SetMaxLifetime(idx_t seconds) { + UpdateTimeoutSetting(max_lifetime_seconds, seconds); +} + +void PostgresConnectionPool::SetIdleTimeout(idx_t seconds) { + UpdateTimeoutSetting(idle_timeout_seconds, seconds); +} + } // namespace duckdb diff --git a/test/sql/storage/attach_connection_lifetime.test b/test/sql/storage/attach_connection_lifetime.test new file mode 100644 index 000000000..81e5a3a1e --- /dev/null +++ b/test/sql/storage/attach_connection_lifetime.test @@ -0,0 +1,115 @@ +# name: test/sql/storage/attach_connection_lifetime.test +# description: Test connection pool max lifetime and idle timeout settings +# group: [storage] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +PRAGMA enable_verification + +statement ok +SET pg_connection_cache=true + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES); + +statement ok +USE s + +statement ok +CREATE OR REPLACE TABLE connection_lifetime_test(i INTEGER); + +statement ok +INSERT INTO connection_lifetime_test FROM range(100) + +# verify queries work before enabling lifetime settings +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# enable max lifetime with a short duration (2 seconds) +statement ok +SET pg_connection_max_lifetime=2 + +# queries should still succeed +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# wait for connections to expire +statement ok +SELECT pg_sleep(3) + +# queries should still succeed — pool creates new connections as needed +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable max lifetime +statement ok +SET pg_connection_max_lifetime=0 + +# enable idle 
timeout with a short duration (2 seconds) +statement ok +SET pg_connection_idle_timeout=2 + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# wait for idle connections to be reaped +statement ok +SELECT pg_sleep(3) + +# queries should still succeed after idle connections are reaped +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable idle timeout +statement ok +SET pg_connection_idle_timeout=0 + +# enable both settings simultaneously +statement ok +SET pg_connection_max_lifetime=2 + +statement ok +SET pg_connection_idle_timeout=1 + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +statement ok +SELECT pg_sleep(3) + +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# disable both +statement ok +SET pg_connection_max_lifetime=0 + +statement ok +SET pg_connection_idle_timeout=0 + +# final sanity check +query I +SELECT COUNT(*) FROM connection_lifetime_test +---- +100 + +# clean up +statement ok +DROP TABLE IF EXISTS connection_lifetime_test