Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
matrix:
features:
- "ext,http,ws,mio,openssl"
- "ext,http,ws,mio,ktls"
- "ext,http,ws,mio,rustls-webpki"
- "ext,http,ws,mio,rustls-native"
steps:
Expand All @@ -34,6 +35,7 @@ jobs:
matrix:
features:
- "ext,http,ws,mio,openssl"
- "ext,http,ws,mio,ktls"
- "ext,http,ws,mio,rustls-webpki"
- "ext,http,ws,mio,rustls-native"
steps:
Expand Down Expand Up @@ -76,6 +78,7 @@ jobs:
matrix:
features:
- "ext,http,ws,mio,openssl"
- "ext,http,ws,mio,ktls"
- "ext,http,ws,mio,rustls-webpki"
- "ext,http,ws,mio,rustls-native"
steps:
Expand Down
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mio = ["dep:mio"]
rustls-native = ["rustls", "rustls-native-certs"]
rustls-webpki = ["rustls", "webpki-roots"]
openssl = ["dep:openssl", "dep:openssl-probe"]
ktls = ["openssl", "dep:openssl-sys", "dep:foreign-types", "dep:libc", "dep:openssl-src"]
http = ["dep:http", "httparse", "memchr", "itoa"]
ws = ["rand", "base64", "dep:http", "httparse"]
ext = []
Expand All @@ -43,6 +44,9 @@ smallvec = "1.15.0"
smallstr = "0.3.1"
core_affinity = "0.8.3"
log = "0.4.20"
openssl-sys = { version = "0.9", optional = true }
foreign-types = { version = "0.3.1", optional = true }
libc = { version = "0.2", optional = true }

[dependencies.webpki-roots]
version = "0.26.0"
Expand All @@ -60,6 +64,12 @@ tungstenite = "0.28.0"
criterion = "0.5.1"
idle = "0.2.0"

[build-dependencies.openssl-src]
version = "300"
features = ["ktls"]
optional = true
default-features = false

[lints.clippy]
uninit_assumed_init = "allow"
mem_replace_with_uninit = "allow"
Expand Down
125 changes: 125 additions & 0 deletions examples/io_service_with_context_ktls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#[cfg(feature = "ktls")]
mod deps {
pub use boomnet::service::IntoIOService;
pub use boomnet::service::endpoint::{DisconnectReason, Endpoint};
pub use boomnet::service::select::Selectable;
pub use boomnet::service::select::mio::MioSelector;
pub use boomnet::stream::ktls::{IntoKtlsStream, KtlStream};
pub use boomnet::stream::mio::{IntoMioStream, MioStream};
pub use boomnet::stream::tcp::TcpStream;
pub use boomnet::stream::tls::TlsConfigExt;
pub use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider};
pub use boomnet::ws::{IntoWebsocket, Websocket, WebsocketFrame};
pub use mio::event::Source;
pub use mio::{Interest, Registry, Token};
pub use std::net::SocketAddr;
pub use std::time::Duration;
}

#[cfg(feature = "ktls")]
use deps::*;

#[cfg(feature = "ktls")]
struct TradeConnectionFactory {
connection_info: ConnectionInfo,
}

#[cfg(feature = "ktls")]
impl TradeConnectionFactory {
fn new() -> Self {
Self {
connection_info: ("fstream.binance.com", 443).into(),
}
}
}

#[cfg(feature = "ktls")]
struct TradeConnection {
ws: Websocket<KtlStream<MioStream>>,
}

#[cfg(feature = "ktls")]
impl TradeConnection {
fn do_work(&mut self) -> std::io::Result<()> {
for frame in self.ws.read_batch()? {
if let WebsocketFrame::Text(fin, body) = frame? {
println!("({fin}) {}", String::from_utf8_lossy(body));
}
}
Ok(())
}
}

#[cfg(feature = "ktls")]
impl Selectable for TradeConnection {
fn connected(&mut self) -> std::io::Result<bool> {
self.ws.connected()
}

fn make_writable(&mut self) -> std::io::Result<()> {
self.ws.make_writable()
}

fn make_readable(&mut self) -> std::io::Result<()> {
self.ws.make_readable()
}
}

#[cfg(feature = "ktls")]
impl Source for TradeConnection {
fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
self.ws.register(registry, token, interests)
}

fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
self.ws.reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
self.ws.deregister(registry)
}
}

#[cfg(feature = "ktls")]
impl ConnectionInfoProvider for TradeConnectionFactory {
fn connection_info(&self) -> &ConnectionInfo {
&self.connection_info
}
}

#[cfg(feature = "ktls")]
impl Endpoint for TradeConnectionFactory {
type Target = TradeConnection;

fn create_target(&mut self, addr: SocketAddr) -> std::io::Result<Option<Self::Target>> {
let mut ws = TcpStream::try_from((&self.connection_info, addr))?
.into_mio_stream()
.into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification())?
.into_websocket("/ws");

ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?;

Ok(Some(TradeConnection { ws }))
}

fn can_recreate(&mut self, reason: DisconnectReason) -> bool {
println!("on disconnect: reason={}", reason);
true
}
}

#[cfg(feature = "ktls")]
fn main() -> anyhow::Result<()> {
let mut io_service = MioSelector::new()?
.into_io_service()
.with_auto_disconnect(Duration::from_secs(10));

io_service.register(TradeConnectionFactory::new())?;

loop {
io_service.poll(|conn, _| conn.do_work())?;
}
}

#[cfg(not(feature = "ktls"))]
fn main() {}
31 changes: 31 additions & 0 deletions examples/ws_client_ktls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#[cfg(feature = "ktls")]
mod deps {
pub use boomnet::stream::ktls::IntoKtlsStream;
pub use boomnet::stream::tcp::TcpStream;
pub use boomnet::stream::tls::TlsConfigExt;
pub use boomnet::ws::{IntoWebsocket, WebsocketFrame};
}

#[cfg(feature = "ktls")]
use deps::*;

#[cfg(feature = "ktls")]
fn main() -> anyhow::Result<()> {
let mut ws = TcpStream::try_from(("fstream.binance.com", 443))?
.into_ktls_stream_with_config(|cfg| cfg.with_no_cert_verification())?
.into_websocket("/ws");

ws.send_text(true, Some(b"{\"method\":\"SUBSCRIBE\",\"params\":[\"btcusdt@trade\"],\"id\":1}"))?;

loop {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, body) = frame? {
println!("({fin}) {}", String::from_utf8_lossy(body));
}
}
std::thread::sleep(std::time::Duration::from_millis(1));
}
}

#[cfg(not(feature = "ktls"))]
fn main() {}
18 changes: 8 additions & 10 deletions src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ pub trait Endpoint: ConnectionInfoProvider {
/// await the next connection attempt with (possibly) different `addr`.
fn create_target(&mut self, addr: SocketAddr) -> io::Result<Option<Self::Target>>;

// /// Called by the `IOService` on each duty cycle.
// fn poll(&mut self, target: &mut Self::Target) -> io::Result<()>;

/// Upon disconnection `IOService` will query the endpoint if the connection can be
/// recreated, passing the disconnect `reason`. If not, it will cause program to panic.
/// recreated, passing the disconnect `reason`. If `false` is returned it will cause
/// program to panic.
fn can_recreate(&mut self, _reason: DisconnectReason) -> bool {
true
}
Expand Down Expand Up @@ -49,7 +47,8 @@ pub trait EndpointWithContext<C>: ConnectionInfoProvider {
fn create_target(&mut self, addr: SocketAddr, context: &mut C) -> io::Result<Option<Self::Target>>;

/// Upon disconnection `IOService` will query the endpoint if the connection can be
/// recreated, passing the disconnect `reason`. If not, it will cause program to panic.
/// recreated, passing the disconnect `reason`. If `false` is returned it will cause
/// program to panic.
fn can_recreate(&mut self, _reason: DisconnectReason, _context: &mut C) -> bool {
true
}
Expand All @@ -66,9 +65,8 @@ pub trait EndpointWithContext<C>: ConnectionInfoProvider {
pub enum DisconnectReason {
/// This is expected disconnection due to `ttl` on the connection expiring.
AutoDisconnect(Duration),
/// Some other IO error has occurred such as reaching EOF or peer disconnect. It's normally
/// ok to try and connect again.
Other(io::Error),
/// IO error has occurred such as reaching EOF or peer disconnect.
IO(io::Error),
}

impl Display for DisconnectReason {
Expand All @@ -78,7 +76,7 @@ impl Display for DisconnectReason {
write!(f, "auto-disconnect after ")?;
ttl.fmt(f)
}
DisconnectReason::Other(err) => {
DisconnectReason::IO(err) => {
write!(f, "{err}")
}
}
Expand All @@ -91,7 +89,7 @@ impl DisconnectReason {
}

pub(crate) fn other(err: io::Error) -> DisconnectReason {
DisconnectReason::Other(err)
DisconnectReason::IO(err)
}
}

Expand Down
Loading
Loading