From cc8e793f97dcc7cf126d9e9fcff9d093960fffb1 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 15 Jan 2026 12:18:24 -0800 Subject: [PATCH 1/2] fix: prepared statements eviction churns CPU --- Cargo.lock | 1 + integration/rust/Cargo.toml | 1 + integration/rust/tests/bench/mod.rs | 1 + .../rust/tests/bench/prepared_stress.rs | 47 ++++++++++++++++ integration/rust/tests/mod.rs | 1 + pgdog/src/backend/prepared_statements.rs | 37 +++++------- pgdog/src/frontend/prepared_statements/mod.rs | 56 +++++++++++-------- pgdog/tests/pgbouncer/pgdog.toml | 1 + 8 files changed, 100 insertions(+), 45 deletions(-) create mode 100644 integration/rust/tests/bench/mod.rs create mode 100644 integration/rust/tests/bench/prepared_stress.rs diff --git a/Cargo.lock b/Cargo.lock index ac792a76..690a489a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3182,6 +3182,7 @@ dependencies = [ "libc", "ordered-float", "parking_lot", + "rand 0.9.2", "reqwest", "serde_json", "serial_test", diff --git a/integration/rust/Cargo.toml b/integration/rust/Cargo.toml index a485aa50..ef1ab54d 100644 --- a/integration/rust/Cargo.toml +++ b/integration/rust/Cargo.toml @@ -20,3 +20,4 @@ serde_json = "*" ordered-float = "4.2" tokio-rustls = "0.26" libc = "0.2" +rand = "0.9" diff --git a/integration/rust/tests/bench/mod.rs b/integration/rust/tests/bench/mod.rs new file mode 100644 index 00000000..0ee8d5bc --- /dev/null +++ b/integration/rust/tests/bench/mod.rs @@ -0,0 +1 @@ +pub mod prepared_stress; diff --git a/integration/rust/tests/bench/prepared_stress.rs b/integration/rust/tests/bench/prepared_stress.rs new file mode 100644 index 00000000..6f0e8a17 --- /dev/null +++ b/integration/rust/tests/bench/prepared_stress.rs @@ -0,0 +1,47 @@ +use sqlx::Connection; +use tokio::sync::mpsc::{Sender, channel}; +use tokio::{select, spawn}; + +#[tokio::test] +#[ignore] +async fn slam_with_prepared() -> Result<(), Box> { + let conns = 1000; + let statements = 50_000; + let mut signals: Vec> = vec![]; + let mut tasks = vec![]; + + for _ in 0..conns { + let (tx, mut rx) = channel(1); + signals.push(tx); + + let handle = spawn(async move { + let mut conn = + sqlx::PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog").await?; + + loop { + let r = rand::random_range(0..statements); + + let query = format!("SELECT 1, 2, 3, 4, $1, 'apples and oranges', 'blah', {}", r); + let query = sqlx::query(query.as_str()).bind(r).execute(&mut conn); + + select! { + res = query => { + res?; + } + + _ = rx.recv() => { break; } + } + } + + Ok::<(), sqlx::Error>(()) + }); + + tasks.push(handle); + } + + for task in tasks { + task.await??; + } + + Ok(()) +} diff --git a/integration/rust/tests/mod.rs b/integration/rust/tests/mod.rs index 4889f7f0..ac611076 100644 --- a/integration/rust/tests/mod.rs +++ b/integration/rust/tests/mod.rs @@ -1,3 +1,4 @@ +pub mod bench; pub mod integration; pub mod sqlx; pub mod stats; diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 0ee5a481..8c218526 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -10,7 +10,6 @@ use crate::{ Close, CloseComplete, FromBytes, Message, ParseComplete, Protocol, ProtocolMessage, ToBytes, }, - stats::memory::MemoryUsage, }; use super::Error; @@ -19,6 +18,12 @@ use super::{ state::ExecutionCode, }; +/// Approximate memory used by a String. +#[inline] +fn str_mem(s: &str) -> usize { + s.len() + std::mem::size_of::() +} + #[derive(Debug, Clone)] pub enum HandleResult { Forward, @@ -44,18 +49,6 @@ pub struct PreparedStatements { memory_used: usize, } -impl MemoryUsage for PreparedStatements { - #[inline] - fn memory_usage(&self) -> usize { - self.local_cache.memory_usage() - + self.parses.memory_usage() - + self.describes.memory_usage() - + self.capacity.memory_usage() - + std::mem::size_of::>>() - + self.state.memory_usage() - } -} - impl Default for PreparedStatements { fn default() -> Self { Self::new() @@ -287,8 +280,8 @@ impl PreparedStatements { /// Indicate this statement is prepared on the connection. pub fn prepared(&mut self, name: &str) { + self.memory_used += str_mem(name); self.local_cache.push(name.to_owned(), ()); - self.memory_used = self.memory_usage(); } /// How much memory is used by this structure, approx. @@ -321,16 +314,19 @@ impl PreparedStatements { /// This should only be done when a statement has been closed, /// or failed to parse. pub(crate) fn remove(&mut self, name: &str) -> bool { - let exists = self.local_cache.pop(name).is_some(); - self.memory_used = self.memory_usage(); - exists + if self.local_cache.pop(name).is_some() { + self.memory_used = self.memory_used.saturating_sub(str_mem(name)); + true + } else { + false + } } /// Indicate all prepared statements have been removed /// from the server connection. pub fn clear(&mut self) { self.local_cache.clear(); - self.memory_used = self.memory_usage(); + self.memory_used = 0; } /// Get current extended protocol state. @@ -366,13 +362,10 @@ impl PreparedStatements { if let Some((name, _)) = candidate { close.push(Close::named(&name)); + self.memory_used = self.memory_used.saturating_sub(str_mem(&name)); } } - if !close.is_empty() { - self.memory_used = self.memory_usage(); - } - close } } diff --git a/pgdog/src/frontend/prepared_statements/mod.rs b/pgdog/src/frontend/prepared_statements/mod.rs index f48513c4..30895abb 100644 --- a/pgdog/src/frontend/prepared_statements/mod.rs +++ b/pgdog/src/frontend/prepared_statements/mod.rs @@ -10,7 +10,6 @@ use tracing::debug; use crate::{ config::{config, PreparedStatements as PreparedStatementsLevel}, net::{Parse, ProtocolMessage}, - stats::memory::MemoryUsage, }; pub mod error; @@ -23,6 +22,12 @@ pub use rewrite::Rewrite; static CACHE: Lazy = Lazy::new(PreparedStatements::default); +/// Approximate memory used by a String. +#[inline] +fn str_mem(s: &str) -> usize { + s.len() + std::mem::size_of::() +} + #[derive(Clone, Debug)] pub struct PreparedStatements { pub(super) global: Arc>, @@ -31,15 +36,6 @@ pub struct PreparedStatements { pub(super) memory_used: usize, } -impl MemoryUsage for PreparedStatements { - #[inline] - fn memory_usage(&self) -> usize { - self.local.memory_usage() - + std::mem::size_of::() - + std::mem::size_of::>>() - } -} - impl Default for PreparedStatements { fn default() -> Self { Self { @@ -72,16 +68,20 @@ impl PreparedStatements { /// Register prepared statement with the global cache. pub fn insert(&mut self, parse: &mut Parse) { let (_new, name) = { self.global.write().insert(parse) }; - let existed = self.local.insert(parse.name().to_owned(), name.clone()); - self.memory_used = self.memory_usage(); + let key = parse.name(); + let existed = self.local.insert(key.to_owned(), name.clone()); // Client prepared it again because it got an error the first time. // We can check if this is a new statement first, but this is an error // condition which happens very infrequently, so we optimize for the happy path. - if existed.is_some() { - { - self.global.write().decrement(&name); - } + if let Some(old_value) = existed { + // Key already existed, only value changed. + self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value)); + self.memory_used += str_mem(&name); + self.global.write().decrement(&name); + } else { + // New entry. + self.memory_used += str_mem(key) + str_mem(&name); } parse.rename_fast(&name) @@ -90,8 +90,18 @@ impl PreparedStatements { /// Insert statement into the cache bypassing duplicate checks. pub fn insert_anyway(&mut self, parse: &mut Parse) { let name = { self.global.write().insert_anyway(parse) }; - self.local.insert(parse.name().to_owned(), name.clone()); - self.memory_used = self.memory_usage(); + let key = parse.name(); + let existed = self.local.insert(key.to_owned(), name.clone()); + + if let Some(old_value) = existed { + // Key already existed, only value changed. + self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value)); + self.memory_used += str_mem(&name); + } else { + // New entry. + self.memory_used += str_mem(key) + str_mem(&name); + } + parse.rename_fast(&name) } @@ -120,10 +130,10 @@ impl PreparedStatements { /// Remove prepared statement from local cache. pub fn close(&mut self, name: &str) { if let Some(global_name) = self.local.remove(name) { - { - self.global.write().close(&global_name); - } - self.memory_used = self.memory_usage(); + self.global.write().close(&global_name); + self.memory_used = self + .memory_used + .saturating_sub(str_mem(name) + str_mem(&global_name)); } } @@ -138,7 +148,7 @@ impl PreparedStatements { } self.local.clear(); - self.memory_used = self.memory_usage(); + self.memory_used = 0; } /// How much memory is used, approx. diff --git a/pgdog/tests/pgbouncer/pgdog.toml b/pgdog/tests/pgbouncer/pgdog.toml index 04ef5241..911dc0cf 100644 --- a/pgdog/tests/pgbouncer/pgdog.toml +++ b/pgdog/tests/pgbouncer/pgdog.toml @@ -1,6 +1,7 @@ [general] workers = 2 min_pool_size = 0 +prepared_statements_limit = 500 [[databases]] name = "pgdog" From 4ce8451c2b904ae85c0d5dde92b005033c9418ce Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 15 Jan 2026 12:18:51 -0800 Subject: [PATCH 2/2] bump version --- Cargo.lock | 2 +- pgdog/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 690a489a..86b892cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2398,7 +2398,7 @@ dependencies = [ [[package]] name = "pgdog" -version = "0.1.24" +version = "0.1.25" dependencies = [ "arc-swap", "async-trait", diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index c385c612..91cf58a9 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgdog" -version = "0.1.24" +version = "0.1.25" edition = "2021" description = "Modern PostgreSQL proxy, pooler and load balancer." authors = ["PgDog "]