From 59196e067b630c3fb4b1fb231a04263b12d9df0d Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Sat, 20 Dec 2025 16:52:33 +0000 Subject: [PATCH 1/7] placeholder --- Cargo.toml | 1 + src/stream/ktls.rs | 1 + src/stream/mod.rs | 2 ++ 3 files changed, 4 insertions(+) create mode 100644 src/stream/ktls.rs diff --git a/Cargo.toml b/Cargo.toml index ff33a01..3d0eade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ mio = ["dep:mio"] rustls-native = ["rustls", "rustls-native-certs"] rustls-webpki = ["rustls", "webpki-roots"] openssl = ["dep:openssl", "dep:openssl-probe"] +ktls = ["openssl"] http = ["dep:http", "httparse", "memchr", "itoa"] ws = ["rand", "base64", "dep:http", "httparse"] ext = [] diff --git a/src/stream/ktls.rs b/src/stream/ktls.rs new file mode 100644 index 0000000..f11e14e --- /dev/null +++ b/src/stream/ktls.rs @@ -0,0 +1 @@ +pub struct KtlSteam; \ No newline at end of file diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d1e93f1..5b8bf81 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -18,6 +18,8 @@ pub mod replay; pub mod tcp; #[cfg(any(feature = "rustls", feature = "openssl"))] pub mod tls; +#[cfg(feature = "ktls")] +pub mod ktls; #[cfg(target_os = "linux")] const EINPROGRESS: i32 = 115; From e596fc7f8b2c8714e3edb2b4ca2504458db571a5 Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Wed, 24 Dec 2025 12:55:19 +0000 Subject: [PATCH 2/7] ktls stream --- Cargo.toml | 4 + examples/ws_client_ktls.rs | 86 +++++++++++ examples/ws_client_ktls_2.rs | 40 ++++++ src/stream/ktls.rs | 270 ++++++++++++++++++++++++++++++++++- src/stream/tcp.rs | 7 + 5 files changed, 406 insertions(+), 1 deletion(-) create mode 100644 examples/ws_client_ktls.rs create mode 100644 examples/ws_client_ktls_2.rs diff --git a/Cargo.toml b/Cargo.toml index 3d0eade..ff1d72a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,12 +38,16 @@ httparse = { version = "1.8.0", optional = true } http = { version = "1.0.0", optional = true } openssl = { version = "0.10.70", features = ["vendored"], optional = true } openssl-probe = { version = "0.1.6", optional = true } +openssl-ktls = { version = "0.2.3", features = ["vendored"] } memchr = { version = "2.7.4", optional = true } itoa = { version = "1.0.15", optional = true } smallvec = "1.15.0" smallstr = "0.3.1" core_affinity = "0.8.3" log = "0.4.20" +# KTLS +openssl-sys = "0.9.105" +foreign-types-shared = "0.1.1" [dependencies.webpki-roots] version = "0.26.0" diff --git a/examples/ws_client_ktls.rs b/examples/ws_client_ktls.rs new file mode 100644 index 0000000..7a8d121 --- /dev/null +++ b/examples/ws_client_ktls.rs @@ -0,0 +1,86 @@ +use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider}; +use boomnet::ws::{Websocket, WebsocketFrame}; +use openssl::ssl::ErrorCode; +use std::io; +use std::io::{ErrorKind, Read, Write}; + +struct KtlsStream { + inner: openssl_ktls::SslStream, + conn: ConnectionInfo, +} + +impl Read for KtlsStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.inner.read(buf) + } +} + +impl Write for KtlsStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +impl ConnectionInfoProvider for KtlsStream { + fn connection_info(&self) -> &ConnectionInfo { + &self.conn + } +} + +impl KtlsStream { + fn connect(&self) -> io::Result<()> { + match self.inner.connect() { + Ok(_) => Ok(()), + Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), + Err(err) => Err(io::Error::other(err)), + } + } +} + +fn main() -> anyhow::Result<()> { + let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; + builder.set_options(openssl_ktls::option::SSL_OP_ENABLE_KTLS); + // connector.set_cipher_list(openssl_ktls::option::ECDHE_RSA_AES128_GCM_SHA256)?; + // connector.set_ciphersuites("TLS_AES_256_GCM_SHA384")?; + let connector = builder.build(); + + let tcp_stream = std::net::TcpStream::connect("stream.crypto.com:443")?; + let ssl = connector.configure()?.into_ssl("stream.crypto.com")?; + + let ktls_stream = KtlsStream { + inner: openssl_ktls::SslStream::new(tcp_stream, ssl), + conn: ("stream.crypto.com", 443).into(), + }; + + ktls_stream.connect()?; + + assert!(ktls_stream.inner.ktls_recv_enabled()); + assert!(ktls_stream.inner.ktls_send_enabled()); + + // let mut ws = Websocket::new(ktls_stream, "/ws"); + let mut ws = Websocket::new(ktls_stream, "/exchange/v1/market"); + + // ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?; + ws.send_text(true, Some(br#"{"id":1,"method":"subscribe","params":{"channels":["trade.BTCUSD-PERP"]}}"#))?; + + // let idle = IdleStrategy::Sleep(Duration::from_millis(1)); + + let mut log = false; + + loop { + for frame in ws.read_batch()? { + if let WebsocketFrame::Text(fin, body) = frame? { + if !log { + println!("({fin}) {}", String::from_utf8_lossy(body)); + // log = true; + } + } + } + // idle.idle(0); + } +} diff --git a/examples/ws_client_ktls_2.rs b/examples/ws_client_ktls_2.rs new file mode 100644 index 0000000..91f170b --- /dev/null +++ b/examples/ws_client_ktls_2.rs @@ -0,0 +1,40 @@ +use boomnet::ws::{Websocket, WebsocketFrame}; +use boomnet::stream::ktls::KtlSteam; +use boomnet::stream::tcp::TcpStream; + +fn main() -> anyhow::Result<()> { + let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; + builder.set_options(openssl_ktls::option::SSL_OP_ENABLE_KTLS); + let connector = builder.build(); + + let tcp_stream = TcpStream::try_from(("stream.crypto.com", 443))?; + let ssl = connector.configure()?.into_ssl("stream.crypto.com")?; + let ktls_stream = KtlSteam::new(tcp_stream, ssl); + + ktls_stream.blocking_connect()?; + + println!("Connected!"); + + assert!(ktls_stream.ktls_recv_enabled()); + assert!(ktls_stream.ktls_send_enabled()); + + let mut ws = Websocket::new(ktls_stream, "/exchange/v1/market"); + + ws.send_text(true, Some(br#"{"id":1,"method":"subscribe","params":{"channels":["trade.BTCUSD-PERP"]}}"#))?; + + // let idle = IdleStrategy::Sleep(Duration::from_millis(1)); + + let mut log = false; + + loop { + for frame in ws.read_batch()? { + if let WebsocketFrame::Text(fin, body) = frame? { + if !log { + println!("({fin}) {}", String::from_utf8_lossy(body)); + // log = true; + } + } + } + // idle.idle(0); + } +} diff --git a/src/stream/ktls.rs b/src/stream/ktls.rs index f11e14e..b069c32 100644 --- a/src/stream/ktls.rs +++ b/src/stream/ktls.rs @@ -1 +1,269 @@ -pub struct KtlSteam; \ No newline at end of file +use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; +use foreign_types_shared::ForeignType; +use openssl::ssl::ErrorCode; +use std::ffi::c_int; +use std::io; +use std::io::{ErrorKind, Read, Write}; +use std::os::fd::AsRawFd; + +const BIO_NOCLOSE: c_int = 0x00; + +pub struct KtlSteam { + _stream: S, + ssl: openssl::ssl::Ssl, +} + +impl KtlSteam { + /// Create a new SslStream from a tcp stream and SSL object. + pub fn new(stream: S, ssl: openssl::ssl::Ssl) -> Self + where + S: AsRawFd, + { + let sock_bio = unsafe { openssl_sys::BIO_new_socket(stream.as_raw_fd(), BIO_NOCLOSE) }; + assert!(!sock_bio.is_null(), "Failed to create socket BIO"); + unsafe { + openssl_sys::SSL_set_bio(ssl.as_ptr(), sock_bio, sock_bio); + } + KtlSteam { _stream: stream, ssl } + } + + pub fn connect(&self) -> Result<(), error::Error> { + let result = unsafe { openssl_sys::SSL_connect(self.ssl.as_ptr()) }; + if result <= 0 { + Err(error::Error::make(result, &self.ssl)) + } else { + Ok(()) + } + } + + pub fn blocking_connect(&self) -> io::Result<()> { + loop { + match self.connect() { + Ok(_) => break, + Err(err) if err.code() == ErrorCode::WANT_READ => {} + Err(err) if err.code() == ErrorCode::WANT_WRITE => {} + Err(err) => return Err(io::Error::other(err)), + } + } + Ok(()) + } + + pub fn ktls_send_enabled(&self) -> bool { + unsafe { + let wbio = openssl_sys::SSL_get_wbio(self.ssl.as_ptr()); + ffi::BIO_get_ktls_send(wbio) != 0 + } + } + + pub fn ktls_recv_enabled(&self) -> bool { + unsafe { + let rbio = openssl_sys::SSL_get_rbio(self.ssl.as_ptr()); + ffi::BIO_get_ktls_recv(rbio) != 0 + } + } + + #[inline] + pub fn ssl_read(&mut self, buf: &mut [u8]) -> Result { + unsafe { + let len = + openssl_sys::SSL_read(self.ssl.as_ptr(), buf.as_mut_ptr() as *mut _, buf.len().try_into().unwrap()); + if len < 0 { + Err(error::Error::make(len, &self.ssl)) + } else { + Ok(len as usize) + } + } + } + + #[inline] + pub fn ssl_write(&mut self, buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + unsafe { + let len = + openssl_sys::SSL_write(self.ssl.as_ptr(), buf.as_ptr() as *const _, buf.len().try_into().unwrap()); + if len < 0 { + Err(error::Error::make(len, &self.ssl)) + } else { + Ok(len as usize) + } + } + } +} + +impl ConnectionInfoProvider for KtlSteam { + fn connection_info(&self) -> &ConnectionInfo { + self._stream.connection_info() + } +} + +impl Read for KtlSteam { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self.ssl_read(buf) { + Ok(read) => Ok(read), + Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), + Err(err) => Err(io::Error::other(err)), + } + } +} + +impl Write for KtlSteam { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + match self.ssl_write(buf) { + Ok(read) => Ok(read), + Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), + Err(err) => Err(io::Error::other(err)), + } + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub mod error { + use std::{error, ffi::c_int, fmt, io}; + + use openssl::{error::ErrorStack, ssl::ErrorCode}; + + #[derive(Debug)] + pub(crate) enum InnerError { + Io(io::Error), + Ssl(ErrorStack), + } + + /// An SSL error. + #[derive(Debug)] + pub struct Error { + pub(crate) code: ErrorCode, + pub(crate) cause: Option, + } + + impl Error { + pub fn code(&self) -> ErrorCode { + self.code + } + + pub fn io_error(&self) -> Option<&io::Error> { + match self.cause { + Some(InnerError::Io(ref e)) => Some(e), + _ => None, + } + } + + pub fn into_io_error(self) -> Result { + match self.cause { + Some(InnerError::Io(e)) => Ok(e), + _ => Err(self), + } + } + + pub fn ssl_error(&self) -> Option<&ErrorStack> { + match self.cause { + Some(InnerError::Ssl(ref e)) => Some(e), + _ => None, + } + } + + pub(crate) fn make(ret: c_int, ssl: &openssl::ssl::SslRef) -> Self { + use foreign_types_shared::ForeignTypeRef; + let code = unsafe { ErrorCode::from_raw(openssl_sys::SSL_get_error(ssl.as_ptr(), ret)) }; + + let cause = match code { + ErrorCode::SSL => Some(InnerError::Ssl(ErrorStack::get())), + ErrorCode::SYSCALL => { + let errs = ErrorStack::get(); + if errs.errors().is_empty() { + // get last error from io + let e = std::io::Error::last_os_error(); + Some(InnerError::Io(e)) + } else { + Some(InnerError::Ssl(errs)) + } + } + ErrorCode::ZERO_RETURN => None, + ErrorCode::WANT_READ | ErrorCode::WANT_WRITE => { + // get last error from io + let e = std::io::Error::last_os_error(); + Some(InnerError::Io(e)) + } + _ => None, + }; + + Error { code, cause } + } + + pub(crate) fn make_from_io(e: io::Error) -> Self { + Error { + code: ErrorCode::SYSCALL, + cause: Some(InnerError::Io(e)), + } + } + } + + impl From for Error { + fn from(e: ErrorStack) -> Error { + Error { + code: ErrorCode::SSL, + cause: Some(InnerError::Ssl(e)), + } + } + } + + impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.code { + ErrorCode::ZERO_RETURN => fmt.write_str("the SSL session has been shut down"), + ErrorCode::WANT_READ => match self.io_error() { + Some(_) => fmt.write_str("a nonblocking read call would have blocked"), + None => fmt.write_str("the operation should be retried"), + }, + ErrorCode::WANT_WRITE => match self.io_error() { + Some(_) => fmt.write_str("a nonblocking write call would have blocked"), + None => fmt.write_str("the operation should be retried"), + }, + ErrorCode::SYSCALL => match self.io_error() { + Some(err) => write!(fmt, "{err}"), + None => fmt.write_str("unexpected EOF"), + }, + ErrorCode::SSL => match self.ssl_error() { + Some(e) => write!(fmt, "{e}"), + None => fmt.write_str("OpenSSL error"), + }, + _ => write!(fmt, "unknown error code {}", self.code.as_raw()), + } + } + } + + impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self.cause { + Some(InnerError::Io(ref e)) => Some(e), + Some(InnerError::Ssl(ref e)) => Some(e), + None => None, + } + } + } +} + +mod ffi { + use openssl_sys::BIO_ctrl; + use std::ffi::{c_int, c_long}; + + const BIO_CTRL_GET_KTLS_SEND: c_int = 73; + const BIO_CTRL_GET_KTLS_RECV: c_int = 76; + + #[allow(non_snake_case)] + pub unsafe fn BIO_get_ktls_send(b: *mut openssl_sys::BIO) -> c_long { + unsafe { BIO_ctrl(b, BIO_CTRL_GET_KTLS_SEND, 0, std::ptr::null_mut()) } + } + #[allow(non_snake_case)] + pub unsafe fn BIO_get_ktls_recv(b: *mut openssl_sys::BIO) -> c_long { + unsafe { BIO_ctrl(b, BIO_CTRL_GET_KTLS_RECV, 0, std::ptr::null_mut()) } + } +} diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 2e0da21..72ab90f 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -5,6 +5,7 @@ use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; use std::io; use std::io::{Read, Write}; use std::net::SocketAddr; +use std::os::fd::{AsRawFd, RawFd}; /// Wraps `std::net::TcpStream` and provides `ConnectionInfo`. #[derive(Debug)] @@ -13,6 +14,12 @@ pub struct TcpStream { connection_info: ConnectionInfo, } +impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + impl From for std::net::TcpStream { fn from(stream: TcpStream) -> Self { stream.inner From 857bb1a2a815b8dd679b9ef97713321ac25c7143 Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Fri, 26 Dec 2025 20:51:17 +0000 Subject: [PATCH 3/7] ktls stream --- Cargo.toml | 15 +- examples/io_service_with_context_ktls.rs | 26 ++ examples/ws_client_ktls.rs | 85 +------ examples/ws_client_ktls_2.rs | 40 ---- src/stream/ktls.rs | 291 ++++++++++++++++++----- src/stream/mio.rs | 8 +- src/stream/mod.rs | 2 +- src/stream/tcp.rs | 7 +- src/stream/tls.rs | 42 +++- src/ws/mod.rs | 2 +- 10 files changed, 322 insertions(+), 196 deletions(-) create mode 100644 examples/io_service_with_context_ktls.rs delete mode 100644 examples/ws_client_ktls_2.rs diff --git a/Cargo.toml b/Cargo.toml index ff1d72a..7460eae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ mio = ["dep:mio"] rustls-native = ["rustls", "rustls-native-certs"] rustls-webpki = ["rustls", "webpki-roots"] openssl = ["dep:openssl", "dep:openssl-probe"] -ktls = ["openssl"] +ktls = ["openssl", "dep:openssl-sys", "dep:foreign-types", "dep:libc", "dep:openssl-src"] http = ["dep:http", "httparse", "memchr", "itoa"] ws = ["rand", "base64", "dep:http", "httparse"] ext = [] @@ -38,16 +38,15 @@ httparse = { version = "1.8.0", optional = true } http = { version = "1.0.0", optional = true } openssl = { version = "0.10.70", features = ["vendored"], optional = true } openssl-probe = { version = "0.1.6", optional = true } -openssl-ktls = { version = "0.2.3", features = ["vendored"] } memchr = { version = "2.7.4", optional = true } itoa = { version = "1.0.15", optional = true } smallvec = "1.15.0" smallstr = "0.3.1" core_affinity = "0.8.3" log = "0.4.20" -# KTLS -openssl-sys = "0.9.105" -foreign-types-shared = "0.1.1" +openssl-sys = { version = "0.9", optional = true } +foreign-types = { version = "0.3.1", optional = true } +libc = { version = "0.2", optional = true } [dependencies.webpki-roots] version = "0.26.0" @@ -65,6 +64,12 @@ tungstenite = "0.28.0" criterion = "0.5.1" idle = "0.2.0" +[build-dependencies.openssl-src] +version = "300" +features = ["ktls"] +optional = true +default-features = false + [lints.clippy] uninit_assumed_init = "allow" mem_replace_with_uninit = "allow" diff --git a/examples/io_service_with_context_ktls.rs b/examples/io_service_with_context_ktls.rs new file mode 100644 index 0000000..f68fbc9 --- /dev/null +++ b/examples/io_service_with_context_ktls.rs @@ -0,0 +1,26 @@ +use crate::common::{FeedContext, TradeEndpoint}; +use boomnet::service::IntoIOServiceWithContext; +use boomnet::service::select::mio::MioSelector; + +#[path = "common/mod.rs"] +mod common; + +fn main() -> anyhow::Result<()> { + env_logger::init(); + + let mut ctx = FeedContext::new(); + + let mut io_service = MioSelector::new()?.into_io_service_with_context(); + + let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt"); + let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt"); + let endpoint_xrp = TradeEndpoint::new(2, "wss://stream3.binance.com:443/ws", None, "xrpusdt"); + + io_service.register(endpoint_btc)?; + io_service.register(endpoint_eth)?; + io_service.register(endpoint_xrp)?; + + loop { + io_service.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll_ctx(ws, ctx))?; + } +} diff --git a/examples/ws_client_ktls.rs b/examples/ws_client_ktls.rs index 7a8d121..8b55f84 100644 --- a/examples/ws_client_ktls.rs +++ b/examples/ws_client_ktls.rs @@ -1,86 +1,21 @@ -use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider}; -use boomnet::ws::{Websocket, WebsocketFrame}; -use openssl::ssl::ErrorCode; -use std::io; -use std::io::{ErrorKind, Read, Write}; - -struct KtlsStream { - inner: openssl_ktls::SslStream, - conn: ConnectionInfo, -} - -impl Read for KtlsStream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.inner.read(buf) - } -} - -impl Write for KtlsStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.inner.write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.inner.flush() - } -} - -impl ConnectionInfoProvider for KtlsStream { - fn connection_info(&self) -> &ConnectionInfo { - &self.conn - } -} - -impl KtlsStream { - fn connect(&self) -> io::Result<()> { - match self.inner.connect() { - Ok(_) => Ok(()), - Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), - Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), - Err(err) => Err(io::Error::other(err)), - } - } -} +use boomnet::stream::ktls::IntoKtlsStream; +use boomnet::stream::tcp::TcpStream; +use boomnet::stream::tls::TlsConfigExt; +use boomnet::ws::{IntoWebsocket, WebsocketFrame}; fn main() -> anyhow::Result<()> { - let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; - builder.set_options(openssl_ktls::option::SSL_OP_ENABLE_KTLS); - // connector.set_cipher_list(openssl_ktls::option::ECDHE_RSA_AES128_GCM_SHA256)?; - // connector.set_ciphersuites("TLS_AES_256_GCM_SHA384")?; - let connector = builder.build(); - - let tcp_stream = std::net::TcpStream::connect("stream.crypto.com:443")?; - let ssl = connector.configure()?.into_ssl("stream.crypto.com")?; - - let ktls_stream = KtlsStream { - inner: openssl_ktls::SslStream::new(tcp_stream, ssl), - conn: ("stream.crypto.com", 443).into(), - }; - - ktls_stream.connect()?; - - assert!(ktls_stream.inner.ktls_recv_enabled()); - assert!(ktls_stream.inner.ktls_send_enabled()); - - // let mut ws = Websocket::new(ktls_stream, "/ws"); - let mut ws = Websocket::new(ktls_stream, "/exchange/v1/market"); - - // ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?; - ws.send_text(true, Some(br#"{"id":1,"method":"subscribe","params":{"channels":["trade.BTCUSD-PERP"]}}"#))?; - - // let idle = IdleStrategy::Sleep(Duration::from_millis(1)); + let mut ws = TcpStream::try_from(("fstream.binance.com", 443))? + .into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification())? + .into_websocket("/ws"); - let mut log = false; + ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?; loop { for frame in ws.read_batch()? { if let WebsocketFrame::Text(fin, body) = frame? { - if !log { - println!("({fin}) {}", String::from_utf8_lossy(body)); - // log = true; - } + println!("({fin}) {}", String::from_utf8_lossy(body)); } } - // idle.idle(0); + std::thread::sleep(std::time::Duration::from_millis(1)); } } diff --git a/examples/ws_client_ktls_2.rs b/examples/ws_client_ktls_2.rs deleted file mode 100644 index 91f170b..0000000 --- a/examples/ws_client_ktls_2.rs +++ /dev/null @@ -1,40 +0,0 @@ -use boomnet::ws::{Websocket, WebsocketFrame}; -use boomnet::stream::ktls::KtlSteam; -use boomnet::stream::tcp::TcpStream; - -fn main() -> anyhow::Result<()> { - let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; - builder.set_options(openssl_ktls::option::SSL_OP_ENABLE_KTLS); - let connector = builder.build(); - - let tcp_stream = TcpStream::try_from(("stream.crypto.com", 443))?; - let ssl = connector.configure()?.into_ssl("stream.crypto.com")?; - let ktls_stream = KtlSteam::new(tcp_stream, ssl); - - ktls_stream.blocking_connect()?; - - println!("Connected!"); - - assert!(ktls_stream.ktls_recv_enabled()); - assert!(ktls_stream.ktls_send_enabled()); - - let mut ws = Websocket::new(ktls_stream, "/exchange/v1/market"); - - ws.send_text(true, Some(br#"{"id":1,"method":"subscribe","params":{"channels":["trade.BTCUSD-PERP"]}}"#))?; - - // let idle = IdleStrategy::Sleep(Duration::from_millis(1)); - - let mut log = false; - - loop { - for frame in ws.read_batch()? { - if let WebsocketFrame::Text(fin, body) = frame? { - if !log { - println!("({fin}) {}", String::from_utf8_lossy(body)); - // log = true; - } - } - } - // idle.idle(0); - } -} diff --git a/src/stream/ktls.rs b/src/stream/ktls.rs index b069c32..6f06b2d 100644 --- a/src/stream/ktls.rs +++ b/src/stream/ktls.rs @@ -1,33 +1,64 @@ +use crate::stream::ktls::net::peer_addr; +use crate::stream::tls::TlsConfig; use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; -use foreign_types_shared::ForeignType; -use openssl::ssl::ErrorCode; -use std::ffi::c_int; +pub use error::Error; +use foreign_types::ForeignType; +use openssl::ssl::{ErrorCode, SslOptions}; +use smallstr::SmallString; use std::io; use std::io::{ErrorKind, Read, Write}; -use std::os::fd::AsRawFd; +use std::os::fd::{AsRawFd, BorrowedFd}; +use std::ptr::slice_from_raw_parts; -const BIO_NOCLOSE: c_int = 0x00; - -pub struct KtlSteam { +pub struct KtlStream { _stream: S, ssl: openssl::ssl::Ssl, + state: State, + buffer: Vec, } -impl KtlSteam { - /// Create a new SslStream from a tcp stream and SSL object. - pub fn new(stream: S, ssl: openssl::ssl::Ssl) -> Self +impl KtlStream { + pub fn new(stream: S, server_name: impl AsRef) -> io::Result> where S: AsRawFd, { - let sock_bio = unsafe { openssl_sys::BIO_new_socket(stream.as_raw_fd(), BIO_NOCLOSE) }; - assert!(!sock_bio.is_null(), "Failed to create socket BIO"); - unsafe { - openssl_sys::SSL_set_bio(ssl.as_ptr(), sock_bio, sock_bio); - } - KtlSteam { _stream: stream, ssl } + Self::new_with_config(stream, server_name, |_| ()) } - pub fn connect(&self) -> Result<(), error::Error> { + pub fn new_with_config(stream: S, server_name: impl AsRef, configure: F) -> io::Result> + where + S: AsRawFd, + F: FnOnce(&mut TlsConfig), + { + const SSL_OP_ENABLE_KTLS: SslOptions = SslOptions::from_bits_retain(ffi::SSL_OP_ENABLE_KTLS); + + // configure SSL context + let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; + builder.set_options(SSL_OP_ENABLE_KTLS); + + let mut tls_config = builder.into(); + configure(&mut tls_config); + + let config = tls_config.into_openssl().build().configure()?; + let ssl = config.into_ssl(server_name.as_ref())?; + + Ok(KtlStream { + _stream: stream, + ssl, + state: State::Connecting, + buffer: Vec::with_capacity(4096), + }) + } + + fn connected(&self) -> io::Result + where + S: AsRawFd, + { + let fd = unsafe { BorrowedFd::borrow_raw(self._stream.as_raw_fd()) }; + Ok(peer_addr(fd)?.is_some()) + } + + fn ssl_connect(&self) -> Result<(), error::Error> { let result = unsafe { openssl_sys::SSL_connect(self.ssl.as_ptr()) }; if result <= 0 { Err(error::Error::make(result, &self.ssl)) @@ -36,26 +67,14 @@ impl KtlSteam { } } - pub fn blocking_connect(&self) -> io::Result<()> { - loop { - match self.connect() { - Ok(_) => break, - Err(err) if err.code() == ErrorCode::WANT_READ => {} - Err(err) if err.code() == ErrorCode::WANT_WRITE => {} - Err(err) => return Err(io::Error::other(err)), - } - } - Ok(()) - } - - pub fn ktls_send_enabled(&self) -> bool { + fn ktls_send_enabled(&self) -> bool { unsafe { let wbio = openssl_sys::SSL_get_wbio(self.ssl.as_ptr()); ffi::BIO_get_ktls_send(wbio) != 0 } } - pub fn ktls_recv_enabled(&self) -> bool { + fn ktls_recv_enabled(&self) -> bool { unsafe { let rbio = openssl_sys::SSL_get_rbio(self.ssl.as_ptr()); ffi::BIO_get_ktls_recv(rbio) != 0 @@ -63,7 +82,7 @@ impl KtlSteam { } #[inline] - pub fn ssl_read(&mut self, buf: &mut [u8]) -> Result { + fn ssl_read(&mut self, buf: &mut [u8]) -> Result { unsafe { let len = openssl_sys::SSL_read(self.ssl.as_ptr(), buf.as_mut_ptr() as *mut _, buf.len().try_into().unwrap()); @@ -76,7 +95,7 @@ impl KtlSteam { } #[inline] - pub fn ssl_write(&mut self, buf: &[u8]) -> Result { + fn ssl_write(&mut self, buf: &[u8]) -> Result { if buf.is_empty() { return Ok(0); } @@ -92,47 +111,136 @@ impl KtlSteam { } } -impl ConnectionInfoProvider for KtlSteam { +impl ConnectionInfoProvider for KtlStream { fn connection_info(&self) -> &ConnectionInfo { self._stream.connection_info() } } -impl Read for KtlSteam { +impl Read for KtlStream { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.ssl_read(buf) { - Ok(read) => Ok(read), - Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), - Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), - Err(err) => Err(io::Error::other(err)), + match self.state { + State::Connecting => { + if self.connected()? { + // we intentionally pass BIO_NO_CLOSE to prevent double free on the file descriptor + let sock_bio = unsafe { openssl_sys::BIO_new_socket(self._stream.as_raw_fd(), ffi::BIO_NO_CLOSE) }; + assert!(!sock_bio.is_null(), "failed to create socket BIO"); + unsafe { + openssl_sys::SSL_set_bio(self.ssl.as_ptr(), sock_bio, sock_bio); + } + self.state = State::Handshake; + } + } + State::Handshake => match self.ssl_connect() { + Ok(_) => { + assert!(self.ktls_recv_enabled(), "ktls recv not enabled"); + assert!(self.ktls_send_enabled(), "ktls send not enabled"); + self.state = State::Drain(0) + } + Err(err) if err.code() == ErrorCode::WANT_READ => {} + Err(err) if err.code() == ErrorCode::WANT_WRITE => {} + Err(err) => return Err(io::Error::other(err)), + }, + State::Drain(index) => { + let mut from = index; + // let remaining = &self.buffer[from..]; + let remaining = + unsafe { &*slice_from_raw_parts(self.buffer.as_ptr().add(from), self.buffer.len() - from) }; + if remaining.is_empty() { + self.state = State::Ready; + } else { + from += match self.ssl_write(remaining) { + Ok(len) => len, + Err(err) if err.code() == ErrorCode::WANT_READ => 0, + Err(err) if err.code() == ErrorCode::WANT_WRITE => 0, + Err(err) => return Err(io::Error::other(err)), + }; + self.state = State::Drain(from); + } + } + State::Ready => match self.ssl_read(buf) { + Ok(0) => return Err(ErrorKind::UnexpectedEof.into()), + Ok(len) => return Ok(len), + Err(err) if err.code() == ErrorCode::WANT_READ => {} + Err(err) if err.code() == ErrorCode::WANT_WRITE => {} + Err(err) => return Err(io::Error::other(err)), + }, } + Err(ErrorKind::WouldBlock.into()) } } -impl Write for KtlSteam { +impl Write for KtlStream { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { - match self.ssl_write(buf) { - Ok(read) => Ok(read), - Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), - Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), - Err(err) => Err(io::Error::other(err)), + match self.state { + State::Ready => match self.ssl_write(buf) { + Ok(len) => Ok(len), + Err(err) if err.code() == ErrorCode::WANT_READ => Err(ErrorKind::WouldBlock.into()), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Err(ErrorKind::WouldBlock.into()), + Err(err) => Err(io::Error::other(err)), + }, + _ => { + // we buffer any pending write + self.buffer.extend_from_slice(buf); + Ok(buf.len()) + } } } #[inline] fn flush(&mut self) -> io::Result<()> { - Ok(()) + match self.state { + State::Connecting | State::Handshake | State::Drain(_) => Ok(()), + State::Ready => self._stream.flush(), + } } } -pub mod error { - use std::{error, ffi::c_int, fmt, io}; +pub trait IntoKtlsStream { + fn into_ktls_stream(self) -> io::Result> + where + Self: Sized, + { + self.into_ktls_stream_with_config(|_| ()) + } + + fn into_ktls_stream_with_config(self, builder: F) -> io::Result> + where + Self: Sized, + F: FnOnce(&mut TlsConfig); +} + +impl IntoKtlsStream for T +where + T: Read + Write + AsRawFd + ConnectionInfoProvider, +{ + fn into_ktls_stream_with_config(self, builder: F) -> io::Result> + where + Self: Sized, + F: FnOnce(&mut TlsConfig), + { + let server_name = SmallString::<[u8; 1024]>::from(self.connection_info().host()); + KtlStream::new_with_config(self, server_name, builder) + } +} + +#[derive(Copy, Clone)] +enum State { + Connecting, + Handshake, + Drain(usize), + Ready, +} +mod error { + use crate::util::NoBlock; + use foreign_types::ForeignTypeRef; use openssl::{error::ErrorStack, ssl::ErrorCode}; + use std::{error, ffi::c_int, fmt, io}; #[derive(Debug)] - pub(crate) enum InnerError { + enum InnerError { Io(io::Error), Ssl(ErrorStack), } @@ -140,8 +248,8 @@ pub mod error { /// An SSL error. #[derive(Debug)] pub struct Error { - pub(crate) code: ErrorCode, - pub(crate) cause: Option, + code: ErrorCode, + cause: Option, } impl Error { @@ -170,8 +278,7 @@ pub mod error { } } - pub(crate) fn make(ret: c_int, ssl: &openssl::ssl::SslRef) -> Self { - use foreign_types_shared::ForeignTypeRef; + pub fn make(ret: c_int, ssl: &openssl::ssl::SslRef) -> Self { let code = unsafe { ErrorCode::from_raw(openssl_sys::SSL_get_error(ssl.as_ptr(), ret)) }; let cause = match code { @@ -197,13 +304,6 @@ pub mod error { Error { code, cause } } - - pub(crate) fn make_from_io(e: io::Error) -> Self { - Error { - code: ErrorCode::SYSCALL, - cause: Some(InnerError::Io(e)), - } - } } impl From for Error { @@ -249,12 +349,40 @@ pub mod error { } } } + + impl NoBlock for Result { + type Value = usize; + + fn no_block(self) -> io::Result { + match self { + Ok(value) => Ok(value), + Err(err) if err.code() == ErrorCode::WANT_READ => Ok(0), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Ok(0), + Err(err) => Err(io::Error::other(err)), + } + } + } + + impl NoBlock for Result<(), Error> { + type Value = (); + + fn no_block(self) -> io::Result { + match self { + Ok(()) => Ok(()), + Err(err) if err.code() == ErrorCode::WANT_READ => Ok(()), + Err(err) if err.code() == ErrorCode::WANT_WRITE => Ok(()), + Err(err) => Err(io::Error::other(err)), + } + } + } } mod ffi { use openssl_sys::BIO_ctrl; use std::ffi::{c_int, c_long}; + pub const SSL_OP_ENABLE_KTLS: u64 = 0x00000008; + pub const BIO_NO_CLOSE: c_int = 0x00; const BIO_CTRL_GET_KTLS_SEND: c_int = 73; const BIO_CTRL_GET_KTLS_RECV: c_int = 76; @@ -267,3 +395,44 @@ mod ffi { unsafe { BIO_ctrl(b, BIO_CTRL_GET_KTLS_RECV, 0, std::ptr::null_mut()) } } } + +mod net { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + use std::os::fd::{AsRawFd, BorrowedFd}; + use std::{io, mem}; + + pub fn peer_addr(fd: BorrowedFd<'_>) -> io::Result> { + let raw = fd.as_raw_fd(); + + let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; + let mut len = mem::size_of::() as libc::socklen_t; + + let rc = unsafe { libc::getpeername(raw, &mut storage as *mut _ as *mut libc::sockaddr, &mut len as *mut _) }; + + if rc == -1 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ENOTCONN) { + return Ok(None); + } + return Err(err); + } + + unsafe { + match storage.ss_family as libc::c_int { + libc::AF_INET => { + let sa = &*(&storage as *const _ as *const libc::sockaddr_in); + let ip = Ipv4Addr::from(u32::from_be(sa.sin_addr.s_addr)); + let port = u16::from_be(sa.sin_port); + Ok(Some(SocketAddr::new(IpAddr::V4(ip), port))) + } + libc::AF_INET6 => { + let sa = &*(&storage as *const _ as *const libc::sockaddr_in6); + let ip = Ipv6Addr::from(sa.sin6_addr.s6_addr); + let port = u16::from_be(sa.sin6_port); + Ok(Some(SocketAddr::new(IpAddr::V6(ip), port))) + } + _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported address family")), + } + } + } +} diff --git a/src/stream/mio.rs b/src/stream/mio.rs index 6fb1856..67d5a1d 100644 --- a/src/stream/mio.rs +++ b/src/stream/mio.rs @@ -3,7 +3,7 @@ use std::io::ErrorKind::{Interrupted, NotConnected, WouldBlock}; use std::io::{Read, Write}; use std::{io, net}; - +use std::os::fd::{AsRawFd, RawFd}; use crate::service::select::Selectable; use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; use mio::event::Source; @@ -33,6 +33,12 @@ impl MioStream { } } +impl AsRawFd for MioStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + impl Selectable for MioStream { fn connected(&mut self) -> io::Result { if self.connected { diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 5b8bf81..a3f3218 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -18,7 +18,7 @@ pub mod replay; pub mod tcp; #[cfg(any(feature = "rustls", feature = "openssl"))] pub mod tls; -#[cfg(feature = "ktls")] +#[cfg(all(target_os = "linux", feature = "ktls"))] pub mod ktls; #[cfg(target_os = "linux")] diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 72ab90f..2c6abf4 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -69,12 +69,17 @@ impl TryFrom<(ConnectionInfo, SocketAddr)> for TcpStream { } impl TcpStream { - pub fn new(stream: std::net::TcpStream, connection_info: ConnectionInfo) -> Self { + pub const fn new(stream: std::net::TcpStream, connection_info: ConnectionInfo) -> Self { Self { inner: stream, connection_info, } } + + #[inline] + pub fn connected(&mut self) -> bool { + self.inner.peer_addr().is_ok() + } } impl Read for TcpStream { diff --git a/src/stream/tls.rs b/src/stream/tls.rs index 354041f..9d571be 100644 --- a/src/stream/tls.rs +++ b/src/stream/tls.rs @@ -24,6 +24,20 @@ pub struct TlsConfig { openssl_config: SslConnectorBuilder, } +#[cfg(feature = "openssl")] +impl From for TlsConfig { + fn from(config: SslConnectorBuilder) -> Self { + Self { openssl_config: config } + } +} + +#[cfg(all(feature = "rustls", not(feature = "openssl")))] +impl From for TlsConfig { + fn from(config: ClientConfig) -> Self { + Self { rustls_config: config } + } +} + /// Extension methods for `TlsConfig`. pub trait TlsConfigExt { /// Disable certificate verification. @@ -69,6 +83,12 @@ impl TlsConfig { pub const fn as_openssl_mut(&mut self) -> &mut SslConnectorBuilder { &mut self.openssl_config } + + /// Get mutable reference to the `openssl` configuration object. + #[cfg(feature = "openssl")] + pub fn into_openssl(self) -> SslConnectorBuilder { + self.openssl_config + } } impl TlsConfigExt for TlsConfig { @@ -355,7 +375,7 @@ mod __openssl { } impl State { - fn get_stream_mut(&mut self) -> io::Result<&mut S> { + fn get_mut(&mut self) -> io::Result<&mut S> { match self { State::Handshake(stream_and_buf) => match stream_and_buf.as_mut() { Some((stream, _)) => Ok(stream.get_mut()), @@ -383,29 +403,29 @@ mod __openssl { #[cfg(feature = "mio")] impl Source for TlsStream { fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> { - registry.register(self.state.get_stream_mut()?, token, interests) + registry.register(self.state.get_mut()?, token, interests) } fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> { - registry.reregister(self.state.get_stream_mut()?, token, interests) + registry.reregister(self.state.get_mut()?, token, interests) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { - registry.deregister(self.state.get_stream_mut()?) + registry.deregister(self.state.get_mut()?) } } impl Selectable for TlsStream { fn connected(&mut self) -> io::Result { - self.state.get_stream_mut()?.connected() + self.state.get_mut()?.connected() } fn make_writable(&mut self) -> io::Result<()> { - self.state.get_stream_mut()?.make_writable() + self.state.get_mut()?.make_writable() } fn make_readable(&mut self) -> io::Result<()> { - self.state.get_stream_mut()?.make_readable() + self.state.get_mut()?.make_readable() } } @@ -485,7 +505,7 @@ mod __openssl { } impl TlsStream { - pub fn wrap_with_config(stream: S, server_name: &str, configure: F) -> io::Result> + pub fn new_with_config(stream: S, server_name: &str, configure: F) -> io::Result> where F: FnOnce(&mut TlsConfig), { @@ -510,8 +530,8 @@ mod __openssl { } } - pub fn wrap(stream: S, server_name: &str) -> io::Result> { - Self::wrap_with_config(stream, server_name, |_| {}) + pub fn new(stream: S, server_name: &str) -> io::Result> { + Self::new_with_config(stream, server_name, |_| {}) } } @@ -576,7 +596,7 @@ where F: FnOnce(&mut TlsConfig), { let server_name = self.connection_info().clone().host; - TlsStream::wrap_with_config(self, &server_name, builder) + TlsStream::new_with_config(self, &server_name, builder) } } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 0c6ba8e..b6835fc 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -468,7 +468,7 @@ where let tls_ready_stream = match url.scheme() { "ws" => Ok(TlsReadyStream::Plain(stream)), - "wss" => Ok(TlsReadyStream::Tls(TlsStream::wrap(stream, url.host_str().unwrap()).unwrap())), + "wss" => Ok(TlsReadyStream::Tls(TlsStream::new(stream, url.host_str().unwrap()).unwrap())), scheme => Err(io::Error::other(format!("unrecognised url scheme: {scheme}"))), }?; From dd2c9ea54b151b61a9d490f34e346ce647675c89 Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Fri, 26 Dec 2025 21:26:11 +0000 Subject: [PATCH 4/7] ktls stream examples --- examples/io_service_with_context_ktls.rs | 111 ++++++++++++++++++++--- src/service/endpoint.rs | 18 ++-- src/stream/ktls.rs | 51 +++++++++-- src/stream/mio.rs | 8 +- src/stream/mod.rs | 4 +- 5 files changed, 154 insertions(+), 38 deletions(-) diff --git a/examples/io_service_with_context_ktls.rs b/examples/io_service_with_context_ktls.rs index f68fbc9..eb0c3a7 100644 --- a/examples/io_service_with_context_ktls.rs +++ b/examples/io_service_with_context_ktls.rs @@ -1,26 +1,107 @@ -use crate::common::{FeedContext, TradeEndpoint}; -use boomnet::service::IntoIOServiceWithContext; +use boomnet::service::IntoIOService; +use boomnet::service::endpoint::{DisconnectReason, Endpoint}; +use boomnet::service::select::Selectable; use boomnet::service::select::mio::MioSelector; +use boomnet::stream::ktls::{IntoKtlsStream, KtlStream}; +use boomnet::stream::mio::{IntoMioStream, MioStream}; +use boomnet::stream::tcp::TcpStream; +use boomnet::stream::tls::TlsConfigExt; +use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider}; +use boomnet::ws::{IntoWebsocket, Websocket, WebsocketFrame}; +use mio::event::Source; +use mio::{Interest, Registry, Token}; +use std::net::SocketAddr; +use std::time::Duration; -#[path = "common/mod.rs"] -mod common; +struct TradeConnectionFactory { + connection_info: ConnectionInfo, +} -fn main() -> anyhow::Result<()> { - env_logger::init(); +impl TradeConnectionFactory { + fn new() -> Self { + Self { + connection_info: ("fstream.binance.com", 443).into(), + } + } +} - let mut ctx = FeedContext::new(); +struct TradeConnection { + ws: Websocket>, +} - let mut io_service = MioSelector::new()?.into_io_service_with_context(); +impl TradeConnection { + fn do_work(&mut self) -> std::io::Result<()> { + for frame in self.ws.read_batch()? { + if let WebsocketFrame::Text(fin, body) = frame? { + println!("({fin}) {}", String::from_utf8_lossy(body)); + } + } + Ok(()) + } +} + +impl Selectable for TradeConnection { + fn connected(&mut self) -> std::io::Result { + self.ws.connected() + } - let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt"); - let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt"); - let endpoint_xrp = TradeEndpoint::new(2, "wss://stream3.binance.com:443/ws", None, "xrpusdt"); + fn make_writable(&mut self) -> std::io::Result<()> { + self.ws.make_writable() + } + + fn make_readable(&mut self) -> std::io::Result<()> { + self.ws.make_readable() + } +} + +impl Source for TradeConnection { + fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { + self.ws.register(registry, token, interests) + } + + fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { + self.ws.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> { + self.ws.deregister(registry) + } +} + +impl ConnectionInfoProvider for TradeConnectionFactory { + fn connection_info(&self) -> &ConnectionInfo { + &self.connection_info + } +} + +impl Endpoint for TradeConnectionFactory { + type Target = TradeConnection; + + fn create_target(&mut self, addr: SocketAddr) -> std::io::Result> { + let mut ws = TcpStream::try_from((&self.connection_info, addr))? + .into_mio_stream() + .into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification())? + .into_websocket("/ws"); + + ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?; + + Ok(Some(TradeConnection { ws })) + } + + fn can_recreate(&mut self, reason: DisconnectReason) -> bool { + println!("on disconnect: reason={}", reason); + true + } +} + +fn main() -> anyhow::Result<()> { + let mut io_service = MioSelector::new()? + .into_io_service() + .with_auto_disconnect(Duration::from_secs(10)); - io_service.register(endpoint_btc)?; - io_service.register(endpoint_eth)?; - io_service.register(endpoint_xrp)?; + io_service.register(TradeConnectionFactory::new())?; loop { - io_service.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll_ctx(ws, ctx))?; + io_service.poll(|conn, _| conn.do_work())?; } } diff --git a/src/service/endpoint.rs b/src/service/endpoint.rs index 2b0c27c..7b548f3 100644 --- a/src/service/endpoint.rs +++ b/src/service/endpoint.rs @@ -16,11 +16,9 @@ pub trait Endpoint: ConnectionInfoProvider { /// await the next connection attempt with (possibly) different `addr`. fn create_target(&mut self, addr: SocketAddr) -> io::Result>; - // /// Called by the `IOService` on each duty cycle. - // fn poll(&mut self, target: &mut Self::Target) -> io::Result<()>; - /// Upon disconnection `IOService` will query the endpoint if the connection can be - /// recreated, passing the disconnect `reason`. If not, it will cause program to panic. + /// recreated, passing the disconnect `reason`. If `false` is returned it will cause + /// program to panic. fn can_recreate(&mut self, _reason: DisconnectReason) -> bool { true } @@ -49,7 +47,8 @@ pub trait EndpointWithContext: ConnectionInfoProvider { fn create_target(&mut self, addr: SocketAddr, context: &mut C) -> io::Result>; /// Upon disconnection `IOService` will query the endpoint if the connection can be - /// recreated, passing the disconnect `reason`. If not, it will cause program to panic. + /// recreated, passing the disconnect `reason`. If `false` is returned it will cause + /// program to panic. fn can_recreate(&mut self, _reason: DisconnectReason, _context: &mut C) -> bool { true } @@ -66,9 +65,8 @@ pub trait EndpointWithContext: ConnectionInfoProvider { pub enum DisconnectReason { /// This is expected disconnection due to `ttl` on the connection expiring. AutoDisconnect(Duration), - /// Some other IO error has occurred such as reaching EOF or peer disconnect. It's normally - /// ok to try and connect again. - Other(io::Error), + /// IO error has occurred such as reaching EOF or peer disconnect. + IO(io::Error), } impl Display for DisconnectReason { @@ -78,7 +76,7 @@ impl Display for DisconnectReason { write!(f, "auto-disconnect after ")?; ttl.fmt(f) } - DisconnectReason::Other(err) => { + DisconnectReason::IO(err) => { write!(f, "{err}") } } @@ -91,7 +89,7 @@ impl DisconnectReason { } pub(crate) fn other(err: io::Error) -> DisconnectReason { - DisconnectReason::Other(err) + DisconnectReason::IO(err) } } diff --git a/src/stream/ktls.rs b/src/stream/ktls.rs index 6f06b2d..5ed0590 100644 --- a/src/stream/ktls.rs +++ b/src/stream/ktls.rs @@ -1,8 +1,11 @@ +use crate::service::select::Selectable; use crate::stream::ktls::net::peer_addr; use crate::stream::tls::TlsConfig; use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; pub use error::Error; use foreign_types::ForeignType; +use mio::event::Source; +use mio::{Interest, Registry, Token}; use openssl::ssl::{ErrorCode, SslOptions}; use smallstr::SmallString; use std::io; @@ -11,7 +14,7 @@ use std::os::fd::{AsRawFd, BorrowedFd}; use std::ptr::slice_from_raw_parts; pub struct KtlStream { - _stream: S, + stream: S, ssl: openssl::ssl::Ssl, state: State, buffer: Vec, @@ -32,7 +35,6 @@ impl KtlStream { { const SSL_OP_ENABLE_KTLS: SslOptions = SslOptions::from_bits_retain(ffi::SSL_OP_ENABLE_KTLS); - // configure SSL context let mut builder = openssl::ssl::SslConnector::builder(openssl::ssl::SslMethod::tls_client())?; builder.set_options(SSL_OP_ENABLE_KTLS); @@ -43,7 +45,7 @@ impl KtlStream { let ssl = config.into_ssl(server_name.as_ref())?; Ok(KtlStream { - _stream: stream, + stream, ssl, state: State::Connecting, buffer: Vec::with_capacity(4096), @@ -54,7 +56,7 @@ impl KtlStream { where S: AsRawFd, { - let fd = unsafe { BorrowedFd::borrow_raw(self._stream.as_raw_fd()) }; + let fd = unsafe { BorrowedFd::borrow_raw(self.stream.as_raw_fd()) }; Ok(peer_addr(fd)?.is_some()) } @@ -113,7 +115,7 @@ impl KtlStream { impl ConnectionInfoProvider for KtlStream { fn connection_info(&self) -> &ConnectionInfo { - self._stream.connection_info() + self.stream.connection_info() } } @@ -123,7 +125,7 @@ impl Read for KtlStream { State::Connecting => { if self.connected()? { // we intentionally pass BIO_NO_CLOSE to prevent double free on the file descriptor - let sock_bio = unsafe { openssl_sys::BIO_new_socket(self._stream.as_raw_fd(), ffi::BIO_NO_CLOSE) }; + let sock_bio = unsafe { openssl_sys::BIO_new_socket(self.stream.as_raw_fd(), ffi::BIO_NO_CLOSE) }; assert!(!sock_bio.is_null(), "failed to create socket BIO"); unsafe { openssl_sys::SSL_set_bio(self.ssl.as_ptr(), sock_bio, sock_bio); @@ -192,11 +194,46 @@ impl Write for KtlStream { fn flush(&mut self) -> io::Result<()> { match self.state { State::Connecting | State::Handshake | State::Drain(_) => Ok(()), - State::Ready => self._stream.flush(), + State::Ready => self.stream.flush(), } } } +impl Selectable for KtlStream { + #[inline] + fn connected(&mut self) -> io::Result { + self.stream.connected() + } + + #[inline] + fn make_writable(&mut self) -> io::Result<()> { + self.stream.make_writable() + } + + #[inline] + fn make_readable(&mut self) -> io::Result<()> { + self.stream.make_readable() + } +} + +#[cfg(feature = "mio")] +impl Source for KtlStream { + #[inline] + fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> { + registry.register(&mut self.stream, token, interests) + } + + #[inline] + fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> { + registry.reregister(&mut self.stream, token, interests) + } + + #[inline] + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + registry.deregister(&mut self.stream) + } +} + pub trait IntoKtlsStream { fn into_ktls_stream(self) -> io::Result> where diff --git a/src/stream/mio.rs b/src/stream/mio.rs index 67d5a1d..cc5e878 100644 --- a/src/stream/mio.rs +++ b/src/stream/mio.rs @@ -1,14 +1,14 @@ //! Stream that can be used together with `MioSelector`. -use std::io::ErrorKind::{Interrupted, NotConnected, WouldBlock}; -use std::io::{Read, Write}; -use std::{io, net}; -use std::os::fd::{AsRawFd, RawFd}; use crate::service::select::Selectable; use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; use mio::event::Source; use mio::net::TcpStream; use mio::{Interest, Registry, Token}; +use std::io::ErrorKind::{Interrupted, NotConnected, WouldBlock}; +use std::io::{Read, Write}; +use std::os::fd::{AsRawFd, RawFd}; +use std::{io, net}; #[derive(Debug)] pub struct MioStream { diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a3f3218..596e676 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -11,6 +11,8 @@ use url::{ParseError, Url}; pub mod buffer; pub mod file; +#[cfg(all(target_os = "linux", feature = "ktls"))] +pub mod ktls; #[cfg(feature = "mio")] pub mod mio; pub mod record; @@ -18,8 +20,6 @@ pub mod replay; pub mod tcp; #[cfg(any(feature = "rustls", feature = "openssl"))] pub mod tls; -#[cfg(all(target_os = "linux", feature = "ktls"))] -pub mod ktls; #[cfg(target_os = "linux")] const EINPROGRESS: i32 = 115; From 8bfc980de7e0138ba1d1637ebe78a546ebcf77f7 Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Fri, 26 Dec 2025 21:57:00 +0000 Subject: [PATCH 5/7] add docs --- src/stream/ktls.rs | 78 +++++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 22 deletions(-) diff --git a/src/stream/ktls.rs b/src/stream/ktls.rs index 5ed0590..2d19323 100644 --- a/src/stream/ktls.rs +++ b/src/stream/ktls.rs @@ -1,8 +1,10 @@ +//! Provides TLS offload to the kernel (KTLS). +//! use crate::service::select::Selectable; +use crate::stream::ktls::error::Error; use crate::stream::ktls::net::peer_addr; use crate::stream::tls::TlsConfig; use crate::stream::{ConnectionInfo, ConnectionInfoProvider}; -pub use error::Error; use foreign_types::ForeignType; use mio::event::Source; use mio::{Interest, Registry, Token}; @@ -13,6 +15,21 @@ use std::io::{ErrorKind, Read, Write}; use std::os::fd::{AsRawFd, BorrowedFd}; use std::ptr::slice_from_raw_parts; +/// Offloads TLS to the kernel (KTLS). Uses OpenSSL backend to configure KTLS post handshake (can change in the future). +/// The stream is designed to work with a non-blocking underlying stream. +/// +/// ## Prerequisites +/// Ensure that `tls` kernel module is installed. Otherwise, the code will panic if either KTLS +/// `send` or `recv` are not enabled. This is the minimum required to enable KTLS in the +/// software mode. +/// +/// ## Example +/// ```no_run +/// use boomnet::stream::tcp::TcpStream; +/// use crate::boomnet::stream::ktls::IntoKtlsStream; +/// +/// let ktls_stream = TcpStream::try_from(("fstream.binance.com", 443)).unwrap().into_ktls_stream().unwrap(); +/// ``` pub struct KtlStream { stream: S, ssl: openssl::ssl::Ssl, @@ -21,6 +38,7 @@ pub struct KtlStream { } impl KtlStream { + /// Create KTLS from underlying stream using default [`TlsConfig`]. pub fn new(stream: S, server_name: impl AsRef) -> io::Result> where S: AsRawFd, @@ -28,6 +46,8 @@ impl KtlStream { Self::new_with_config(stream, server_name, |_| ()) } + /// Create KTLS from underlying stream. This method also requires an action used + /// further configure [`TlsConfig`]. pub fn new_with_config(stream: S, server_name: impl AsRef, configure: F) -> io::Result> where S: AsRawFd, @@ -52,6 +72,7 @@ impl KtlStream { }) } + #[inline] fn connected(&self) -> io::Result where S: AsRawFd, @@ -60,10 +81,11 @@ impl KtlStream { Ok(peer_addr(fd)?.is_some()) } - fn ssl_connect(&self) -> Result<(), error::Error> { + #[inline] + fn ssl_connect(&self) -> Result<(), Error> { let result = unsafe { openssl_sys::SSL_connect(self.ssl.as_ptr()) }; if result <= 0 { - Err(error::Error::make(result, &self.ssl)) + Err(Error::make(result, &self.ssl)) } else { Ok(()) } @@ -84,7 +106,7 @@ impl KtlStream { } #[inline] - fn ssl_read(&mut self, buf: &mut [u8]) -> Result { + fn ssl_read(&mut self, buf: &mut [u8]) -> Result { unsafe { let len = openssl_sys::SSL_read(self.ssl.as_ptr(), buf.as_mut_ptr() as *mut _, buf.len().try_into().unwrap()); @@ -113,6 +135,14 @@ impl KtlStream { } } +#[derive(Copy, Clone)] +enum State { + Connecting, + Handshake, + Drain(usize), + Ready, +} + impl ConnectionInfoProvider for KtlStream { fn connection_info(&self) -> &ConnectionInfo { self.stream.connection_info() @@ -135,8 +165,8 @@ impl Read for KtlStream { } State::Handshake => match self.ssl_connect() { Ok(_) => { - assert!(self.ktls_recv_enabled(), "ktls recv not enabled"); - assert!(self.ktls_send_enabled(), "ktls send not enabled"); + assert!(self.ktls_recv_enabled(), "ktls recv not enabled, did you install 'tls' kernel module?"); + assert!(self.ktls_send_enabled(), "ktls send not enabled, did you install 'tls' kernel module?"); self.state = State::Drain(0) } Err(err) if err.code() == ErrorCode::WANT_READ => {} @@ -145,7 +175,6 @@ impl Read for KtlStream { }, State::Drain(index) => { let mut from = index; - // let remaining = &self.buffer[from..]; let remaining = unsafe { &*slice_from_raw_parts(self.buffer.as_ptr().add(from), self.buffer.len() - from) }; if remaining.is_empty() { @@ -234,7 +263,17 @@ impl Source for KtlStream { } } +/// Trait to convert underlying stream into [`KtlStream`]. pub trait IntoKtlsStream { + /// Convert underlying stream into [`KtlStream`] with default tls config. + /// + /// ## Examples + /// ```no_run + /// use boomnet::stream::tcp::TcpStream; + /// use boomnet::stream::ktls::IntoKtlsStream; + /// + /// let ktls = TcpStream::try_from(("127.0.0.1", 4222)).unwrap().into_ktls_stream().unwrap(); + /// ``` fn into_ktls_stream(self) -> io::Result> where Self: Sized, @@ -242,6 +281,16 @@ pub trait IntoKtlsStream { self.into_ktls_stream_with_config(|_| ()) } + /// Convert underlying stream into [`KtlStream`] and use provided action to modify tls config. + /// + /// ## Examples + /// ```no_run + /// use boomnet::stream::tcp::TcpStream; + /// use boomnet::stream::ktls::IntoKtlsStream; + /// use boomnet::stream::tls::TlsConfigExt; + /// + /// let ktls = TcpStream::try_from(("127.0.0.1", 4222)).unwrap().into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification()).unwrap(); + /// ``` fn into_ktls_stream_with_config(self, builder: F) -> io::Result> where Self: Sized, @@ -262,14 +311,6 @@ where } } -#[derive(Copy, Clone)] -enum State { - Connecting, - Handshake, - Drain(usize), - Ready, -} - mod error { use crate::util::NoBlock; use foreign_types::ForeignTypeRef; @@ -301,13 +342,6 @@ mod error { } } - pub fn into_io_error(self) -> Result { - match self.cause { - Some(InnerError::Io(e)) => Ok(e), - _ => Err(self), - } - } - pub fn ssl_error(&self) -> Option<&ErrorStack> { match self.cause { Some(InnerError::Ssl(ref e)) => Some(e), From 54f4e80f2938ccdee8f0d7465f67ada9f2f0c04e Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Fri, 26 Dec 2025 21:58:58 +0000 Subject: [PATCH 6/7] uddate CI --- .github/workflows/rust.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bae0f6a..9f802c9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -21,6 +21,7 @@ jobs: matrix: features: - "ext,http,ws,mio,openssl" + - "ext,http,ws,mio,ktls" - "ext,http,ws,mio,rustls-webpki" - "ext,http,ws,mio,rustls-native" steps: @@ -34,6 +35,7 @@ jobs: matrix: features: - "ext,http,ws,mio,openssl" + - "ext,http,ws,mio,ktls" - "ext,http,ws,mio,rustls-webpki" - "ext,http,ws,mio,rustls-native" steps: @@ -76,6 +78,7 @@ jobs: matrix: features: - "ext,http,ws,mio,openssl" + - "ext,http,ws,mio,ktls" - "ext,http,ws,mio,rustls-webpki" - "ext,http,ws,mio,rustls-native" steps: From 51f442b1e85876d0030910e62b591a8b775988ea Mon Sep 17 00:00:00 2001 From: Tom Brzozowski Date: Fri, 26 Dec 2025 22:16:21 +0000 Subject: [PATCH 7/7] fix examples --- examples/io_service_with_context_ktls.rs | 46 ++++++++++++++++-------- examples/ws_client_ktls.rs | 18 +++++++--- src/stream/tls.rs | 6 ++-- 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/examples/io_service_with_context_ktls.rs b/examples/io_service_with_context_ktls.rs index eb0c3a7..07aa324 100644 --- a/examples/io_service_with_context_ktls.rs +++ b/examples/io_service_with_context_ktls.rs @@ -1,22 +1,30 @@ -use boomnet::service::IntoIOService; -use boomnet::service::endpoint::{DisconnectReason, Endpoint}; -use boomnet::service::select::Selectable; -use boomnet::service::select::mio::MioSelector; -use boomnet::stream::ktls::{IntoKtlsStream, KtlStream}; -use boomnet::stream::mio::{IntoMioStream, MioStream}; -use boomnet::stream::tcp::TcpStream; -use boomnet::stream::tls::TlsConfigExt; -use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider}; -use boomnet::ws::{IntoWebsocket, Websocket, WebsocketFrame}; -use mio::event::Source; -use mio::{Interest, Registry, Token}; -use std::net::SocketAddr; -use std::time::Duration; +#[cfg(feature = "ktls")] +mod deps { + pub use boomnet::service::IntoIOService; + pub use boomnet::service::endpoint::{DisconnectReason, Endpoint}; + pub use boomnet::service::select::Selectable; + pub use boomnet::service::select::mio::MioSelector; + pub use boomnet::stream::ktls::{IntoKtlsStream, KtlStream}; + pub use boomnet::stream::mio::{IntoMioStream, MioStream}; + pub use boomnet::stream::tcp::TcpStream; + pub use boomnet::stream::tls::TlsConfigExt; + pub use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider}; + pub use boomnet::ws::{IntoWebsocket, Websocket, WebsocketFrame}; + pub use mio::event::Source; + pub use mio::{Interest, Registry, Token}; + pub use std::net::SocketAddr; + pub use std::time::Duration; +} + +#[cfg(feature = "ktls")] +use deps::*; +#[cfg(feature = "ktls")] struct TradeConnectionFactory { connection_info: ConnectionInfo, } +#[cfg(feature = "ktls")] impl TradeConnectionFactory { fn new() -> Self { Self { @@ -25,10 +33,12 @@ impl TradeConnectionFactory { } } +#[cfg(feature = "ktls")] struct TradeConnection { ws: Websocket>, } +#[cfg(feature = "ktls")] impl TradeConnection { fn do_work(&mut self) -> std::io::Result<()> { for frame in self.ws.read_batch()? { @@ -40,6 +50,7 @@ impl TradeConnection { } } +#[cfg(feature = "ktls")] impl Selectable for TradeConnection { fn connected(&mut self) -> std::io::Result { self.ws.connected() @@ -54,6 +65,7 @@ impl Selectable for TradeConnection { } } +#[cfg(feature = "ktls")] impl Source for TradeConnection { fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { self.ws.register(registry, token, interests) @@ -68,12 +80,14 @@ impl Source for TradeConnection { } } +#[cfg(feature = "ktls")] impl ConnectionInfoProvider for TradeConnectionFactory { fn connection_info(&self) -> &ConnectionInfo { &self.connection_info } } +#[cfg(feature = "ktls")] impl Endpoint for TradeConnectionFactory { type Target = TradeConnection; @@ -94,6 +108,7 @@ impl Endpoint for TradeConnectionFactory { } } +#[cfg(feature = "ktls")] fn main() -> anyhow::Result<()> { let mut io_service = MioSelector::new()? .into_io_service() @@ -105,3 +120,6 @@ fn main() -> anyhow::Result<()> { io_service.poll(|conn, _| conn.do_work())?; } } + +#[cfg(not(feature = "ktls"))] +fn main() {} diff --git a/examples/ws_client_ktls.rs b/examples/ws_client_ktls.rs index 8b55f84..7f49247 100644 --- a/examples/ws_client_ktls.rs +++ b/examples/ws_client_ktls.rs @@ -1,8 +1,15 @@ -use boomnet::stream::ktls::IntoKtlsStream; -use boomnet::stream::tcp::TcpStream; -use boomnet::stream::tls::TlsConfigExt; -use boomnet::ws::{IntoWebsocket, WebsocketFrame}; +#[cfg(feature = "ktls")] +mod deps { + pub use boomnet::stream::ktls::IntoKtlsStream; + pub use boomnet::stream::tcp::TcpStream; + pub use boomnet::stream::tls::TlsConfigExt; + pub use boomnet::ws::{IntoWebsocket, WebsocketFrame}; +} + +#[cfg(feature = "ktls")] +use deps::*; +#[cfg(feature = "ktls")] fn main() -> anyhow::Result<()> { let mut ws = TcpStream::try_from(("fstream.binance.com", 443))? .into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification())? @@ -19,3 +26,6 @@ fn main() -> anyhow::Result<()> { std::thread::sleep(std::time::Duration::from_millis(1)); } } + +#[cfg(not(feature = "ktls"))] +fn main() {} diff --git a/src/stream/tls.rs b/src/stream/tls.rs index 9d571be..9980e03 100644 --- a/src/stream/tls.rs +++ b/src/stream/tls.rs @@ -204,7 +204,7 @@ mod __rustls { } impl TlsStream { - pub fn wrap_with_config(stream: S, server_name: &str, builder: F) -> io::Result> + pub fn new_with_config(stream: S, server_name: &str, builder: F) -> io::Result> where F: FnOnce(&mut TlsConfig), { @@ -238,8 +238,8 @@ mod __rustls { Ok(Self { inner: stream, tls }) } - pub fn wrap(stream: S, server_name: &str) -> io::Result> { - Self::wrap_with_config(stream, server_name, |_| {}) + pub fn new(stream: S, server_name: &str) -> io::Result> { + Self::new_with_config(stream, server_name, |_| {}) } fn complete_io(&mut self) -> io::Result<(usize, usize)> {