Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions benches/latency/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,21 @@ impl EndpointWithContext<TestContext> for TestEndpoint {
.into_websocket("/");
Ok(Some(ws))
}
}

impl TestEndpoint {
pub fn new(port: u16, payload: &'static str) -> Self {
Self {
connection_info: ConnectionInfo::new("127.0.0.1", port),
payload,
}
}

fn poll(&mut self, ws: &mut Self::Target, ctx: &mut TestContext) -> std::io::Result<()> {
pub fn poll(
&mut self,
ws: &mut <Self as EndpointWithContext<TestContext>>::Target,
ctx: &mut TestContext,
) -> std::io::Result<()> {
if ctx.wants_write {
ws.send_text(true, Some(self.payload.as_bytes()))?;
ctx.wants_write = false;
Expand All @@ -59,12 +72,3 @@ impl EndpointWithContext<TestContext> for TestEndpoint {
Ok(())
}
}

impl TestEndpoint {
pub fn new(port: u16, payload: &'static str) -> Self {
Self {
connection_info: ConnectionInfo::new("127.0.0.1", port),
payload,
}
}
}
6 changes: 4 additions & 2 deletions benches/latency/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ fn boomnet_rtt_benchmark_io_service(c: &mut Criterion) {

// setup io service
let mut ctx = TestContext::new();
let mut io_service = DirectSelector::new().unwrap().into_io_service_with_context(&mut ctx);
let mut io_service = DirectSelector::new().unwrap().into_io_service_with_context();
io_service.register(TestEndpoint::new(9003, MSG)).unwrap();

group.bench_function("boomnet_rtt_io_service", |b| {
b.iter(|| {
loop {
io_service.poll(&mut ctx).unwrap();
io_service
.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll(ws, ctx))
.unwrap();
if ctx.processed == 100 {
ctx.wants_write = true;
ctx.processed = 0;
Expand Down
70 changes: 38 additions & 32 deletions examples/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,44 @@ impl TradeEndpoint {
)?;
Ok(())
}

#[inline]
#[allow(dead_code)]
pub fn poll(&mut self, ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
match self.id % 4 {
0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
_ => {}
}
}
}
Ok(())
}

#[inline]
#[allow(dead_code)]
pub fn poll_ctx(
&mut self,
ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>,
_ctx: &mut FeedContext,
) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
match self.id % 4 {
0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
_ => {}
}
}
}
Ok(())
}
}

impl ConnectionInfoProvider for TradeEndpoint {
Expand All @@ -85,22 +123,6 @@ impl TlsWebsocketEndpoint for TradeEndpoint {
Ok(Some(ws))
}

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
match self.id % 4 {
0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
_ => {}
}
}
}
Ok(())
}

fn can_recreate(&mut self, reason: DisconnectReason) -> bool {
warn!("connection disconnected: {reason}");
true
Expand All @@ -125,20 +147,4 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {

Ok(Some(ws))
}

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, _ctx: &mut FeedContext) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
match self.id % 4 {
0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
_ => {}
}
}
}
Ok(())
}
}
35 changes: 19 additions & 16 deletions examples/endpoint_with_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ impl TradeEndpoint {
next_disconnect_time_ns: ctx.current_time_ns() + Duration::from_secs(10).as_nanos() as u64,
}
}

#[inline]
fn poll(
&mut self,
ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpointWithContext<FeedContext>>::Stream>,
ctx: &mut FeedContext,
) -> io::Result<()> {
while let Some(Ok(WebsocketFrame::Text(fin, data))) = ws.receive_next() {
info!("({fin}) {}", String::from_utf8_lossy(data));
}
let now_ns = ctx.current_time_ns();
if now_ns > self.next_disconnect_time_ns {
self.next_disconnect_time_ns = now_ns + Duration::from_secs(10).as_nanos() as u64;
return Err(io::Error::other("disconnected due to timer"));
}
Ok(())
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -75,33 +92,19 @@ impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {

Ok(Some(ws))
}

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, ctx: &mut FeedContext) -> io::Result<()> {
while let Some(Ok(WebsocketFrame::Text(fin, data))) = ws.receive_next() {
info!("({fin}) {}", String::from_utf8_lossy(data));
}
let now_ns = ctx.current_time_ns();
if now_ns > self.next_disconnect_time_ns {
self.next_disconnect_time_ns = now_ns + Duration::from_secs(10).as_nanos() as u64;
return Err(io::Error::other("disconnected due to timer"));
}
Ok(())
}
}

fn main() -> anyhow::Result<()> {
env_logger::init();

let mut ctx = FeedContext::new();

let mut io_service = MioSelector::new()?.into_io_service_with_context(&mut ctx);
let mut io_service = MioSelector::new()?.into_io_service_with_context();

let endpoint_btc = TradeEndpoint::new("wss://stream1.binance.com:443/ws", "btcusdt", &ctx);

io_service.register(endpoint_btc)?;

loop {
io_service.poll(&mut ctx)?;
io_service.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll(ws, ctx))?;
}
}
4 changes: 2 additions & 2 deletions examples/io_service_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ fn main() -> anyhow::Result<()> {
if success {
break;
} else {
io_service.poll()?;
io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
}
}

loop {
io_service.poll()?;
io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
}
}
6 changes: 3 additions & 3 deletions examples/io_service_with_async_dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ mod common;
fn main() -> anyhow::Result<()> {
env_logger::init();

let mut context = FeedContext::new();
let mut ctx = FeedContext::new();

let mut io_service = MioSelector::new()?
.into_io_service_with_context(&mut context)
.into_io_service_with_context()
.with_dns_resolver(AsyncDnsResolver::new()?);

let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt");
Expand All @@ -24,6 +24,6 @@ fn main() -> anyhow::Result<()> {
io_service.register(endpoint_xrp)?;

loop {
io_service.poll(&mut context)?;
io_service.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll_ctx(ws, ctx))?;
}
}
2 changes: 1 addition & 1 deletion examples/io_service_with_auto_disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ fn main() -> anyhow::Result<()> {
io_service.register(endpoint_btc_2)?;

loop {
io_service.poll()?;
io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
}
}
6 changes: 3 additions & 3 deletions examples/io_service_with_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ mod common;
fn main() -> anyhow::Result<()> {
env_logger::init();

let mut context = FeedContext::new();
let mut ctx = FeedContext::new();

let mut io_service = MioSelector::new()?.into_io_service_with_context(&mut context);
let mut io_service = MioSelector::new()?.into_io_service_with_context();

let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt");
let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt");
Expand All @@ -21,6 +21,6 @@ fn main() -> anyhow::Result<()> {
io_service.register(endpoint_xrp)?;

loop {
io_service.poll(&mut context)?;
io_service.poll(&mut ctx, |ws, ctx, endpoint| endpoint.poll_ctx(ws, ctx))?;
}
}
22 changes: 11 additions & 11 deletions examples/io_service_with_direct_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ impl TradeEndpoint {
ws_endpoint,
}
}

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
println!("[{}] ({fin}) {}", self.id, String::from_utf8_lossy(data))
}
}
Ok(())
}
}

impl ConnectionInfoProvider for TradeEndpoint {
Expand All @@ -57,16 +67,6 @@ impl TlsWebsocketEndpoint for TradeEndpoint {

Ok(Some(ws))
}

#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
for frame in ws.read_batch()? {
if let WebsocketFrame::Text(fin, data) = frame? {
println!("[{}] ({fin}) {}", self.id, String::from_utf8_lossy(data))
}
}
Ok(())
}
}

fn main() -> anyhow::Result<()> {
Expand All @@ -83,6 +83,6 @@ fn main() -> anyhow::Result<()> {
io_service.register(endpoint_xrp)?;

loop {
io_service.poll()?;
io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
}
}
2 changes: 1 addition & 1 deletion examples/io_service_without_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ fn main() -> anyhow::Result<()> {
io_service.register(endpoint_xrp)?;

loop {
io_service.poll()?;
io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
}
}
Loading
Loading