From 2524848c7c41f0c85c1501f58ac70a8358dc6acc Mon Sep 17 00:00:00 2001 From: Dylan Abraham Date: Mon, 3 Nov 2025 10:47:23 -0800 Subject: [PATCH 1/2] allow configuring socket options for raw protosocket servers --- Cargo.lock | 1 + example-telnet/src/main.rs | 3 +- protosocket-server/Cargo.toml | 1 + protosocket-server/src/connection_server.rs | 120 ++++++++++++++++++-- protosocket-server/src/error.rs | 6 + protosocket-server/src/lib.rs | 2 + 6 files changed, 120 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fb41b3..c0e5f72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -853,6 +853,7 @@ dependencies = [ "futures", "log", "protosocket", + "socket2 0.6.0", "thiserror", "tokio", ] diff --git a/example-telnet/src/main.rs b/example-telnet/src/main.rs index ff68d61..3c0d8c6 100644 --- a/example-telnet/src/main.rs +++ b/example-telnet/src/main.rs @@ -7,7 +7,7 @@ use std::{ use protosocket::{ ConnectionBindings, DeserializeError, Deserializer, MessageReactor, ReactorStatus, Serializer, }; -use protosocket_server::{ProtosocketServer, ServerConnector}; +use protosocket_server::{ProtosocketServer, ProtosocketServerConfig, ServerConnector}; #[allow(clippy::expect_used)] #[tokio::main] @@ -19,6 +19,7 @@ async fn main() -> Result<(), Box> { "127.0.0.1:9000".parse()?, tokio::runtime::Handle::current(), server_context, + ProtosocketServerConfig::default(), ) .await?; diff --git a/protosocket-server/Cargo.toml b/protosocket-server/Cargo.toml index a82a618..d3ee9f7 100644 --- a/protosocket-server/Cargo.toml +++ b/protosocket-server/Cargo.toml @@ -16,5 +16,6 @@ protosocket = { workspace = true } bytes = { workspace = true } futures = { workspace = true } log = { workspace = true } +socket2 = { workspace = true, features = ["all"] } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/protosocket-server/src/connection_server.rs b/protosocket-server/src/connection_server.rs index bdcdc53..d9788fe 100644 --- a/protosocket-server/src/connection_server.rs +++ b/protosocket-server/src/connection_server.rs @@ -1,14 +1,14 @@ +use protosocket::Connection; +use protosocket::ConnectionBindings; +use protosocket::Serializer; +use socket2::TcpKeepalive; +use std::ffi::c_int; use std::future::Future; use std::io::Error; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::Arc; use std::task::Context; use std::task::Poll; - -use protosocket::Connection; -use protosocket::ConnectionBindings; -use protosocket::Serializer; use tokio::sync::mpsc; pub trait ServerConnector: Unpin { @@ -56,24 +56,120 @@ pub struct ProtosocketServer { runtime: tokio::runtime::Handle, } +pub struct ProtosocketSocketConfig { + nodelay: bool, + reuse: bool, + keepalive_duration: Option, + listen_backlog: u32, +} + +impl ProtosocketSocketConfig { + pub fn nodelay(mut self, nodelay: bool) -> Self { + self.nodelay = nodelay; + self + } + pub fn reuse(mut self, reuse: bool) -> Self { + self.reuse = reuse; + self + } + pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self { + self.keepalive_duration = Some(keepalive_duration); + self + } + pub fn listen_backlog(mut self, backlog: u32) -> Self { + self.listen_backlog = backlog; + self + } +} + +impl Default for ProtosocketSocketConfig { + fn default() -> Self { + Self { + nodelay: true, + reuse: true, + keepalive_duration: None, + listen_backlog: 65536, + } + } +} + +pub struct ProtosocketServerConfig { + max_buffer_length: usize, + max_queued_outbound_messages: usize, + buffer_allocation_increment: usize, + socket_config: ProtosocketSocketConfig, +} + +impl ProtosocketServerConfig { + pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self { + self.max_buffer_length = max_buffer_length; + self + } + pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self { + self.max_queued_outbound_messages = max_queued_outbound_messages; + self + } + pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self { + self.buffer_allocation_increment = buffer_allocation_increment; + self + } + pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self { + self.socket_config = config; + self + } +} + +impl Default for ProtosocketServerConfig { + fn default() -> Self { + Self { + max_buffer_length: 16 * (2 << 20), + max_queued_outbound_messages: 128, + buffer_allocation_increment: 1 << 20, + socket_config: Default::default(), + } + } +} + impl ProtosocketServer { /// Construct a new `ProtosocketServer` listening on the provided address. /// The address will be bound and listened upon with `SO_REUSEADDR` set. /// The server will use the provided runtime to spawn new tcp connections as `protosocket::Connection`s. pub async fn new( - address: std::net::SocketAddr, + address: SocketAddr, runtime: tokio::runtime::Handle, connector: Connector, + config: ProtosocketServerConfig, ) -> crate::Result { - let listener = tokio::net::TcpListener::bind(address) - .await - .map_err(Arc::new)?; + let socket = socket2::Socket::new( + match address { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }, + socket2::Type::STREAM, + None, + )?; + + let mut tcp_keepalive = TcpKeepalive::new(); + if let Some(duration) = config.socket_config.keepalive_duration { + tcp_keepalive = tcp_keepalive.with_time(duration); + } + + socket.set_nonblocking(true)?; + socket.set_tcp_nodelay(config.socket_config.nodelay)?; + socket.set_tcp_keepalive(&tcp_keepalive)?; + socket.set_reuse_port(config.socket_config.reuse)?; + socket.set_reuse_address(config.socket_config.reuse)?; + + socket.bind(&address.into())?; + socket.listen(config.socket_config.listen_backlog as c_int)?; + + let listener = tokio::net::TcpListener::from_std(socket.into())?; Ok(Self { connector, listener, - max_buffer_length: 16 * (2 << 20), - max_queued_outbound_messages: 128, - buffer_allocation_increment: 1 << 20, + max_buffer_length: config.max_buffer_length, + max_queued_outbound_messages: config.max_queued_outbound_messages, + buffer_allocation_increment: config.buffer_allocation_increment, runtime, }) } diff --git a/protosocket-server/src/error.rs b/protosocket-server/src/error.rs index 0604230..a5b7f06 100644 --- a/protosocket-server/src/error.rs +++ b/protosocket-server/src/error.rs @@ -13,3 +13,9 @@ pub enum Error { #[error("Requested resource was dead: ({0})")] Dead(&'static str), } + +impl From for Error { + fn from(e: std::io::Error) -> Self { + Self::IoFailure(Arc::new(e)) + } +} diff --git a/protosocket-server/src/lib.rs b/protosocket-server/src/lib.rs index 22f00dd..ff98a32 100644 --- a/protosocket-server/src/lib.rs +++ b/protosocket-server/src/lib.rs @@ -7,6 +7,8 @@ pub(crate) mod connection_server; pub(crate) mod error; pub use connection_server::ProtosocketServer; +pub use connection_server::ProtosocketServerConfig; +pub use connection_server::ProtosocketSocketConfig; pub use connection_server::ServerConnector; pub use error::Error; pub use error::Result; From 99b1c854d01142f9793eb539a538b5701d7e3243 Mon Sep 17 00:00:00 2001 From: Dylan Abraham Date: Mon, 3 Nov 2025 12:47:27 -0800 Subject: [PATCH 2/2] address feedback --- example-telnet/src/main.rs | 17 ++++----- protosocket-server/src/connection_server.rs | 39 ++++++++++++--------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/example-telnet/src/main.rs b/example-telnet/src/main.rs index 3c0d8c6..368a311 100644 --- a/example-telnet/src/main.rs +++ b/example-telnet/src/main.rs @@ -7,7 +7,7 @@ use std::{ use protosocket::{ ConnectionBindings, DeserializeError, Deserializer, MessageReactor, ReactorStatus, Serializer, }; -use protosocket_server::{ProtosocketServer, ProtosocketServerConfig, ServerConnector}; +use protosocket_server::{ProtosocketServerConfig, ServerConnector}; #[allow(clippy::expect_used)] #[tokio::main] @@ -15,13 +15,14 @@ async fn main() -> Result<(), Box> { env_logger::init(); let server_context = ServerContext::default(); - let server = ProtosocketServer::new( - "127.0.0.1:9000".parse()?, - tokio::runtime::Handle::current(), - server_context, - ProtosocketServerConfig::default(), - ) - .await?; + let config = ProtosocketServerConfig::default(); + let server = config + .bind_tcp( + "127.0.0.1:9000".parse()?, + server_context, + tokio::runtime::Handle::current(), + ) + .await?; tokio::spawn(server).await??; Ok(()) diff --git a/protosocket-server/src/connection_server.rs b/protosocket-server/src/connection_server.rs index d9788fe..aa9d2e1 100644 --- a/protosocket-server/src/connection_server.rs +++ b/protosocket-server/src/connection_server.rs @@ -47,6 +47,8 @@ pub trait ServerConnector: Unpin { /// connection - you decide what those mean for you! /// /// A ProtosocketServer is a future: You spawn it and it runs forever. +/// +/// Construct a new ProtosocketServer by creating a ProtosocketServerConfig and calling the {{bind_tcp}} method. pub struct ProtosocketServer { connector: Connector, listener: tokio::net::TcpListener, @@ -56,6 +58,7 @@ pub struct ProtosocketServer { runtime: tokio::runtime::Handle, } +/// Socket configuration options for a ProtosocketServer. pub struct ProtosocketSocketConfig { nodelay: bool, reuse: bool, @@ -64,18 +67,22 @@ pub struct ProtosocketSocketConfig { } impl ProtosocketSocketConfig { + /// Whether nodelay should be set on the socket. pub fn nodelay(mut self, nodelay: bool) -> Self { self.nodelay = nodelay; self } + /// Whether reuseaddr and reuseport should be set on the socket. pub fn reuse(mut self, reuse: bool) -> Self { self.reuse = reuse; self } + /// The keepalive window to be set on the socket. pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self { self.keepalive_duration = Some(keepalive_duration); self } + /// The backlog to be set on the socket when invoking `listen`. pub fn listen_backlog(mut self, backlog: u32) -> Self { self.listen_backlog = backlog; self @@ -101,22 +108,37 @@ pub struct ProtosocketServerConfig { } impl ProtosocketServerConfig { + /// The maximum buffer length per connection on this server. pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self { self.max_buffer_length = max_buffer_length; self } + /// The maximum number of queued outbound messages per connection on this server. pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self { self.max_queued_outbound_messages = max_queued_outbound_messages; self } + /// The step size for allocating additional memory for connection buffers on this server. pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self { self.buffer_allocation_increment = buffer_allocation_increment; self } + /// The tcp socket configuration options for this server. pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self { self.socket_config = config; self } + + /// Binds a tcp listener to the given address and returns a ProtosocketServer with this configuration. + /// After binding, you must await the returned server future to process requests. + pub async fn bind_tcp( + self, + address: SocketAddr, + connector: Connector, + runtime: tokio::runtime::Handle, + ) -> crate::Result> { + ProtosocketServer::new(address, runtime, connector, self).await + } } impl Default for ProtosocketServerConfig { @@ -134,7 +156,7 @@ impl ProtosocketServer { /// Construct a new `ProtosocketServer` listening on the provided address. /// The address will be bound and listened upon with `SO_REUSEADDR` set. /// The server will use the provided runtime to spawn new tcp connections as `protosocket::Connection`s. - pub async fn new( + async fn new( address: SocketAddr, runtime: tokio::runtime::Handle, connector: Connector, @@ -173,21 +195,6 @@ impl ProtosocketServer { runtime, }) } - - /// Set the maximum buffer length for connections created by this server after the setting is applied. - pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) { - self.max_buffer_length = max_buffer_length; - } - - /// Set the maximum queued outbound messages for connections created by this server after the setting is applied. - pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) { - self.max_queued_outbound_messages = max_queued_outbound_messages; - } - - /// Set the step size for allocating additional memory for connection buffers created by this server after the setting is applied. - pub fn set_buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) { - self.buffer_allocation_increment = buffer_allocation_increment; - } } impl Future for ProtosocketServer {