Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ serde_json = "*"
ordered-float = "4.2"
tokio-rustls = "0.26"
libc = "0.2"
rand = "0.9"
1 change: 1 addition & 0 deletions integration/rust/tests/bench/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod prepared_stress;
47 changes: 47 additions & 0 deletions integration/rust/tests/bench/prepared_stress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use sqlx::Connection;
use tokio::sync::mpsc::{Sender, channel};
use tokio::{select, spawn};

#[tokio::test]
#[ignore]
async fn slam_with_prepared() -> Result<(), Box<dyn std::error::Error>> {
let conns = 1000;
let statements = 50_000;
let mut signals: Vec<Sender<()>> = vec![];
let mut tasks = vec![];

for _ in 0..conns {
let (tx, mut rx) = channel(1);
signals.push(tx);

let handle = spawn(async move {
let mut conn =
sqlx::PgConnection::connect("postgres://pgdog:pgdog@127.0.0.1:6432/pgdog").await?;

loop {
let r = rand::random_range(0..statements);

let query = format!("SELECT 1, 2, 3, 4, $1, 'apples and oranges', 'blah', {}", r);
let query = sqlx::query(query.as_str()).bind(r).execute(&mut conn);

select! {
res = query => {
res?;
}

_ = rx.recv() => { break; }
}
}

Ok::<(), sqlx::Error>(())
});

tasks.push(handle);
}

for task in tasks {
task.await??;
}

Ok(())
}
1 change: 1 addition & 0 deletions integration/rust/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod bench;
pub mod integration;
pub mod sqlx;
pub mod stats;
Expand Down
2 changes: 1 addition & 1 deletion pgdog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgdog"
version = "0.1.24"
version = "0.1.25"
edition = "2021"
description = "Modern PostgreSQL proxy, pooler and load balancer."
authors = ["PgDog <hi@pgdog.dev>"]
Expand Down
37 changes: 15 additions & 22 deletions pgdog/src/backend/prepared_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
Close, CloseComplete, FromBytes, Message, ParseComplete, Protocol, ProtocolMessage,
ToBytes,
},
stats::memory::MemoryUsage,
};

use super::Error;
Expand All @@ -19,6 +18,12 @@ use super::{
state::ExecutionCode,
};

/// Approximate memory used by a String.
#[inline]
fn str_mem(s: &str) -> usize {
s.len() + std::mem::size_of::<String>()
}

#[derive(Debug, Clone)]
pub enum HandleResult {
Forward,
Expand All @@ -44,18 +49,6 @@ pub struct PreparedStatements {
memory_used: usize,
}

impl MemoryUsage for PreparedStatements {
#[inline]
fn memory_usage(&self) -> usize {
self.local_cache.memory_usage()
+ self.parses.memory_usage()
+ self.describes.memory_usage()
+ self.capacity.memory_usage()
+ std::mem::size_of::<Arc<RwLock<GlobalCache>>>()
+ self.state.memory_usage()
}
}

impl Default for PreparedStatements {
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -287,8 +280,8 @@ impl PreparedStatements {

/// Indicate this statement is prepared on the connection.
pub fn prepared(&mut self, name: &str) {
self.memory_used += str_mem(name);
self.local_cache.push(name.to_owned(), ());
self.memory_used = self.memory_usage();
}

/// How much memory is used by this structure, approx.
Expand Down Expand Up @@ -321,16 +314,19 @@ impl PreparedStatements {
/// This should only be done when a statement has been closed,
/// or failed to parse.
pub(crate) fn remove(&mut self, name: &str) -> bool {
let exists = self.local_cache.pop(name).is_some();
self.memory_used = self.memory_usage();
exists
if self.local_cache.pop(name).is_some() {
self.memory_used = self.memory_used.saturating_sub(str_mem(name));
true
} else {
false
}
}

/// Indicate all prepared statements have been removed
/// from the server connection.
pub fn clear(&mut self) {
self.local_cache.clear();
self.memory_used = self.memory_usage();
self.memory_used = 0;
}

/// Get current extended protocol state.
Expand Down Expand Up @@ -366,13 +362,10 @@ impl PreparedStatements {

if let Some((name, _)) = candidate {
close.push(Close::named(&name));
self.memory_used = self.memory_used.saturating_sub(str_mem(&name));
}
}

if !close.is_empty() {
self.memory_used = self.memory_usage();
}

close
}
}
56 changes: 33 additions & 23 deletions pgdog/src/frontend/prepared_statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use tracing::debug;
use crate::{
config::{config, PreparedStatements as PreparedStatementsLevel},
net::{Parse, ProtocolMessage},
stats::memory::MemoryUsage,
};

pub mod error;
Expand All @@ -23,6 +22,12 @@ pub use rewrite::Rewrite;

static CACHE: Lazy<PreparedStatements> = Lazy::new(PreparedStatements::default);

/// Approximate memory used by a String.
#[inline]
fn str_mem(s: &str) -> usize {
s.len() + std::mem::size_of::<String>()
}

#[derive(Clone, Debug)]
pub struct PreparedStatements {
pub(super) global: Arc<RwLock<GlobalCache>>,
Expand All @@ -31,15 +36,6 @@ pub struct PreparedStatements {
pub(super) memory_used: usize,
}

impl MemoryUsage for PreparedStatements {
#[inline]
fn memory_usage(&self) -> usize {
self.local.memory_usage()
+ std::mem::size_of::<PreparedStatementsLevel>()
+ std::mem::size_of::<Arc<RwLock<GlobalCache>>>()
}
}

impl Default for PreparedStatements {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -72,16 +68,20 @@ impl PreparedStatements {
/// Register prepared statement with the global cache.
pub fn insert(&mut self, parse: &mut Parse) {
let (_new, name) = { self.global.write().insert(parse) };
let existed = self.local.insert(parse.name().to_owned(), name.clone());
self.memory_used = self.memory_usage();
let key = parse.name();
let existed = self.local.insert(key.to_owned(), name.clone());

// Client prepared it again because it got an error the first time.
// We can check if this is a new statement first, but this is an error
// condition which happens very infrequently, so we optimize for the happy path.
if existed.is_some() {
{
self.global.write().decrement(&name);
}
if let Some(old_value) = existed {
// Key already existed, only value changed.
self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value));
self.memory_used += str_mem(&name);
self.global.write().decrement(&name);
} else {
// New entry.
self.memory_used += str_mem(key) + str_mem(&name);
}

parse.rename_fast(&name)
Expand All @@ -90,8 +90,18 @@ impl PreparedStatements {
/// Insert statement into the cache bypassing duplicate checks.
pub fn insert_anyway(&mut self, parse: &mut Parse) {
let name = { self.global.write().insert_anyway(parse) };
self.local.insert(parse.name().to_owned(), name.clone());
self.memory_used = self.memory_usage();
let key = parse.name();
let existed = self.local.insert(key.to_owned(), name.clone());

if let Some(old_value) = existed {
// Key already existed, only value changed.
self.memory_used = self.memory_used.saturating_sub(str_mem(&old_value));
self.memory_used += str_mem(&name);
} else {
// New entry.
self.memory_used += str_mem(key) + str_mem(&name);
}

parse.rename_fast(&name)
}

Expand Down Expand Up @@ -120,10 +130,10 @@ impl PreparedStatements {
/// Remove prepared statement from local cache.
pub fn close(&mut self, name: &str) {
if let Some(global_name) = self.local.remove(name) {
{
self.global.write().close(&global_name);
}
self.memory_used = self.memory_usage();
self.global.write().close(&global_name);
self.memory_used = self
.memory_used
.saturating_sub(str_mem(name) + str_mem(&global_name));
}
}

Expand All @@ -138,7 +148,7 @@ impl PreparedStatements {
}

self.local.clear();
self.memory_used = self.memory_usage();
self.memory_used = 0;
}

/// How much memory is used, approx.
Expand Down
1 change: 1 addition & 0 deletions pgdog/tests/pgbouncer/pgdog.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[general]
workers = 2
min_pool_size = 0
prepared_statements_limit = 500

[[databases]]
name = "pgdog"
Expand Down
Loading