Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions example-telnet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ use std::{
use protosocket::{
ConnectionBindings, DeserializeError, Deserializer, MessageReactor, ReactorStatus, Serializer,
};
use protosocket_server::{ProtosocketServer, ServerConnector};
use protosocket_server::{ProtosocketServerConfig, ServerConnector};

#[allow(clippy::expect_used)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();

let server_context = ServerContext::default();
let server = ProtosocketServer::new(
"127.0.0.1:9000".parse()?,
tokio::runtime::Handle::current(),
server_context,
)
.await?;
let config = ProtosocketServerConfig::default();
let server = config
.bind_tcp(
"127.0.0.1:9000".parse()?,
server_context,
tokio::runtime::Handle::current(),
)
.await?;

tokio::spawn(server).await??;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions protosocket-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ protosocket = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
socket2 = { workspace = true, features = ["all"] }
thiserror = { workspace = true }
tokio = { workspace = true }
159 changes: 131 additions & 28 deletions protosocket-server/src/connection_server.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use protosocket::Connection;
use protosocket::ConnectionBindings;
use protosocket::Serializer;
use socket2::TcpKeepalive;
use std::ffi::c_int;
use std::future::Future;
use std::io::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use protosocket::Connection;
use protosocket::ConnectionBindings;
use protosocket::Serializer;
use tokio::sync::mpsc;

pub trait ServerConnector: Unpin {
Expand Down Expand Up @@ -47,6 +47,8 @@ pub trait ServerConnector: Unpin {
/// connection - you decide what those mean for you!
///
/// A ProtosocketServer is a future: You spawn it and it runs forever.
///
/// Construct a new ProtosocketServer by creating a ProtosocketServerConfig and calling the {{bind_tcp}} method.
pub struct ProtosocketServer<Connector: ServerConnector> {
connector: Connector,
listener: tokio::net::TcpListener,
Expand All @@ -56,42 +58,143 @@ pub struct ProtosocketServer<Connector: ServerConnector> {
runtime: tokio::runtime::Handle,
}

/// Socket configuration options for a ProtosocketServer.
pub struct ProtosocketSocketConfig {
nodelay: bool,
reuse: bool,
keepalive_duration: Option<std::time::Duration>,
listen_backlog: u32,
}

impl ProtosocketSocketConfig {
/// Whether nodelay should be set on the socket.
pub fn nodelay(mut self, nodelay: bool) -> Self {
self.nodelay = nodelay;
self
}
/// Whether reuseaddr and reuseport should be set on the socket.
pub fn reuse(mut self, reuse: bool) -> Self {
self.reuse = reuse;
self
}
/// The keepalive window to be set on the socket.
pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
self.keepalive_duration = Some(keepalive_duration);
self
}
/// The backlog to be set on the socket when invoking `listen`.
pub fn listen_backlog(mut self, backlog: u32) -> Self {
self.listen_backlog = backlog;
self
}
}

impl Default for ProtosocketSocketConfig {
fn default() -> Self {
Self {
nodelay: true,
reuse: true,
keepalive_duration: None,
listen_backlog: 65536,
}
}
}

pub struct ProtosocketServerConfig {
max_buffer_length: usize,
max_queued_outbound_messages: usize,
buffer_allocation_increment: usize,
socket_config: ProtosocketSocketConfig,
}

impl ProtosocketServerConfig {
/// The maximum buffer length per connection on this server.
pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
self.max_buffer_length = max_buffer_length;
self
}
/// The maximum number of queued outbound messages per connection on this server.
pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
self.max_queued_outbound_messages = max_queued_outbound_messages;
self
}
/// The step size for allocating additional memory for connection buffers on this server.
pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
self.buffer_allocation_increment = buffer_allocation_increment;
self
}
/// The tcp socket configuration options for this server.
pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
self.socket_config = config;
self
}

/// Binds a tcp listener to the given address and returns a ProtosocketServer with this configuration.
/// After binding, you must await the returned server future to process requests.
pub async fn bind_tcp<Connector: ServerConnector>(
self,
address: SocketAddr,
connector: Connector,
runtime: tokio::runtime::Handle,
) -> crate::Result<ProtosocketServer<Connector>> {
ProtosocketServer::new(address, runtime, connector, self).await
}
}

impl Default for ProtosocketServerConfig {
fn default() -> Self {
Self {
max_buffer_length: 16 * (2 << 20),
max_queued_outbound_messages: 128,
buffer_allocation_increment: 1 << 20,
socket_config: Default::default(),
}
}
}

impl<Connector: ServerConnector> ProtosocketServer<Connector> {
/// Construct a new `ProtosocketServer` listening on the provided address.
/// The address will be bound and listened upon with `SO_REUSEADDR` set.
/// The server will use the provided runtime to spawn new tcp connections as `protosocket::Connection`s.
pub async fn new(
address: std::net::SocketAddr,
async fn new(
address: SocketAddr,
runtime: tokio::runtime::Handle,
connector: Connector,
config: ProtosocketServerConfig,
) -> crate::Result<Self> {
let listener = tokio::net::TcpListener::bind(address)
.await
.map_err(Arc::new)?;
let socket = socket2::Socket::new(
match address {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
},
socket2::Type::STREAM,
None,
)?;

let mut tcp_keepalive = TcpKeepalive::new();
if let Some(duration) = config.socket_config.keepalive_duration {
tcp_keepalive = tcp_keepalive.with_time(duration);
}

socket.set_nonblocking(true)?;
socket.set_tcp_nodelay(config.socket_config.nodelay)?;
socket.set_tcp_keepalive(&tcp_keepalive)?;
socket.set_reuse_port(config.socket_config.reuse)?;
socket.set_reuse_address(config.socket_config.reuse)?;

socket.bind(&address.into())?;
socket.listen(config.socket_config.listen_backlog as c_int)?;

let listener = tokio::net::TcpListener::from_std(socket.into())?;
Ok(Self {
connector,
listener,
max_buffer_length: 16 * (2 << 20),
max_queued_outbound_messages: 128,
buffer_allocation_increment: 1 << 20,
max_buffer_length: config.max_buffer_length,
max_queued_outbound_messages: config.max_queued_outbound_messages,
buffer_allocation_increment: config.buffer_allocation_increment,
runtime,
})
}

/// Set the maximum buffer length for connections created by this server after the setting is applied.
pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
self.max_buffer_length = max_buffer_length;
}

/// Set the maximum queued outbound messages for connections created by this server after the setting is applied.
pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
self.max_queued_outbound_messages = max_queued_outbound_messages;
}

/// Set the step size for allocating additional memory for connection buffers created by this server after the setting is applied.
pub fn set_buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
self.buffer_allocation_increment = buffer_allocation_increment;
}
}

impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
Expand Down
6 changes: 6 additions & 0 deletions protosocket-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ pub enum Error {
#[error("Requested resource was dead: ({0})")]
Dead(&'static str),
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::IoFailure(Arc::new(e))
}
}
2 changes: 2 additions & 0 deletions protosocket-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub(crate) mod connection_server;
pub(crate) mod error;

pub use connection_server::ProtosocketServer;
pub use connection_server::ProtosocketServerConfig;
pub use connection_server::ProtosocketSocketConfig;
pub use connection_server::ServerConnector;
pub use error::Error;
pub use error::Result;