Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example-messagepack/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
1 << 20,
128,
64 << 10,
None,
)
.await?;
server.set_max_queued_outbound_messages(512);
Expand Down
1 change: 1 addition & 0 deletions example-proto-tls/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
1 << 20,
128,
64 << 10,
None,
)
.await?;
server.set_max_queued_outbound_messages(512);
Expand Down
1 change: 1 addition & 0 deletions example-proto/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
1 << 20,
128,
64 << 10,
None,
)
.await?;
server.set_max_queued_outbound_messages(512);
Expand Down
34 changes: 30 additions & 4 deletions protosocket-rpc/src/client/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{future::Future, net::SocketAddr, sync::Arc};

use protosocket::Connection;
use socket2::TcpKeepalive;
use std::{future::Future, net::SocketAddr, sync::Arc};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_rustls::rustls::pki_types::ServerName;

Expand Down Expand Up @@ -172,6 +172,7 @@ pub struct Configuration<TStreamConnector> {
max_buffer_length: usize,
buffer_allocation_increment: usize,
max_queued_outbound_messages: usize,
tcp_keepalive_duration: Option<std::time::Duration>,
stream_connector: TStreamConnector,
}

Expand All @@ -185,6 +186,7 @@ where
max_buffer_length: 4 * (1 << 20), // 4 MiB
buffer_allocation_increment: 1 << 20,
max_queued_outbound_messages: 256,
tcp_keepalive_duration: None,
stream_connector,
}
}
Expand All @@ -209,6 +211,13 @@ where
pub fn buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
self.buffer_allocation_increment = buffer_allocation_increment;
}

/// The duration to set for tcp_keepalive on the underlying socket.
///
/// Default: None
pub fn tcp_keepalive_duration(&mut self, tcp_keepalive_duration: Option<std::time::Duration>) {
self.tcp_keepalive_duration = tcp_keepalive_duration;
}
}

/// Connect a new protosocket rpc client to a server
Expand All @@ -233,8 +242,25 @@ where
{
log::trace!("new client {address}, {configuration:?}");

let stream = tokio::net::TcpStream::connect(address).await?;
stream.set_nodelay(true)?;
let socket = socket2::Socket::new(
match address {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
},
socket2::Type::STREAM,
None,
)?;

let mut tcp_keepalive = TcpKeepalive::new();
if let Some(duration) = configuration.tcp_keepalive_duration {
tcp_keepalive = tcp_keepalive.with_time(duration);
}

socket.set_nonblocking(true)?;
socket.set_tcp_nodelay(true)?;
socket.set_tcp_keepalive(&tcp_keepalive)?;

let stream = TcpStream::from_std(socket.into())?;

let message_reactor: RpcCompletionReactor<
Deserializer::Message,
Expand Down
13 changes: 10 additions & 3 deletions protosocket-rpc/src/server/socket_server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use protosocket::Connection;
use socket2::TcpKeepalive;
use std::ffi::c_int;
use std::future::Future;
use std::io::Error;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use protosocket::Connection;
use std::time::Duration;
use tokio::sync::mpsc;

use super::connection_server::RpcConnectionServer;
Expand Down Expand Up @@ -44,6 +45,7 @@ where
buffer_allocation_increment: usize,
max_queued_outbound_messages: usize,
listen_backlog: u32,
tcp_keepalive_duration: Option<Duration>,
) -> crate::Result<Self> {
let socket = socket2::Socket::new(
match address {
Expand All @@ -54,9 +56,14 @@ where
None,
)?;

let mut tcp_keepalive = TcpKeepalive::new();
if let Some(duration) = tcp_keepalive_duration {
tcp_keepalive = tcp_keepalive.with_time(duration);
}

socket.set_nonblocking(true)?;
socket.set_tcp_nodelay(true)?;
socket.set_keepalive(true)?;
socket.set_tcp_keepalive(&tcp_keepalive)?;
socket.set_reuse_port(true)?;
socket.set_reuse_address(true)?;

Expand Down
Loading