From 7f5611375ac3d8b4145f897c72ede794eede5d5a Mon Sep 17 00:00:00 2001 From: Naman Garg <0708ng@gmail.com> Date: Tue, 14 Jan 2025 01:42:15 +0530 Subject: [PATCH] feat: orderbook streaming --- src/lib.rs | 5 +- src/orderbooks.rs | 182 +++++++++++++++++++++++++++++++++++++++++ src/types/mod.rs | 7 +- src/types/orderbook.rs | 61 ++++++++++++++ 4 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 src/orderbooks.rs create mode 100644 src/types/orderbook.rs diff --git a/src/lib.rs b/src/lib.rs index 0611921..da136d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ -pub mod telemetry; +pub mod orderbooks; +pub mod price_data; pub mod prices; +pub mod telemetry; pub mod types; -pub mod price_data; diff --git a/src/orderbooks.rs b/src/orderbooks.rs new file mode 100644 index 0000000..b223be6 --- /dev/null +++ b/src/orderbooks.rs @@ -0,0 +1,182 @@ +use std::{collections::HashMap, thread::sleep}; + +use anyhow::Error; +use hyperliquid_rust_sdk::{BaseUrl, InfoClient, Message, Subscription}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + watch, +}; +use tracing::{error, info}; + +use crate::types::{NameToOrderbookMap, Orderbook}; + +/// Manages a websocket connection for streaming L2 orderbook data for a single coin +pub struct OrderbookStream { + info_client: InfoClient, + orderbook_receiver: UnboundedReceiver, + sub_id: u32, + coin: String, +} + +impl OrderbookStream { + /// Creates a new orderbook stream for the specified coin + /// Initializes the websocket connection and subscribes to L2 book updates + pub async fn new(coin: String) -> Result { + let mut info_client = InfoClient::new(None, Some(BaseUrl::Mainnet)).await?; + + let (sender, receiver) = unbounded_channel(); + let sub_id = info_client + .subscribe(Subscription::L2Book { coin: coin.clone() }, sender) + .await?; + + Ok(OrderbookStream { + info_client, + orderbook_receiver: receiver, + sub_id, + coin, + }) + } + + /// Starts processing the orderbook stream and sending updates through the watch channel + /// Handles parsing of raw level data and maintains the current orderbook state + pub async fn start_sending( + &mut self, + sender: watch::Sender, + ) -> Result<(), Error> { + let mut orderbook = Orderbook::new(self.coin.clone()); + let mut orderbook_map = HashMap::new(); + orderbook_map.insert(self.coin.clone(), orderbook.clone()); + + let mut i = 0; + + // Every 20 hours, we will reconnect to avoid limits + while i < 100_000 { + match self.orderbook_receiver.recv().await { + Some(msg) => match msg { + Message::L2Book(order_book) => { + // Parse bid levels from raw string data + let bids: Vec<(f64, f64)> = order_book.data.levels[0] + .iter() + .filter_map(|level| { + let px = level.px.parse().ok()?; + let sz = level.sz.parse().ok()?; + Some((px, sz)) + }) + .collect(); + + // Parse ask levels from raw string data + let asks: Vec<(f64, f64)> = order_book.data.levels[1] + .iter() + .filter_map(|level| { + let px = level.px.parse().ok()?; + let sz = level.sz.parse().ok()?; + Some((px, sz)) + }) + .collect(); + + orderbook.update_from_stream(bids, asks); + orderbook_map.insert(self.coin.clone(), orderbook.clone()); + sender.send(orderbook_map.clone())?; + } + Message::NoData => { + error!("No orderbook data received"); + } + Message::HyperliquidError(err) => { + error!("Hyperliquid error while getting orderbook data: {err:?}"); + } + _ => { + tracing::debug!("Received message: {:?}", msg); + } + }, + None => { + error!("Failed to receive orderbook data"); + break; + } + } + + sleep(std::time::Duration::from_millis(800)); + i += 1; + } + + Ok(()) + } + + /// Unsubscribes from the L2 book websocket stream + pub async fn unsub(&mut self) -> anyhow::Result<()> { + Ok(self.info_client.unsubscribe(self.sub_id).await?) + } +} + +/// Starts a background task that maintains an orderbook stream for the specified coin +/// Returns a receiver that provides real-time orderbook updates +/// The task will automatically reconnect if the connection is lost +pub async fn start_orderbook_stream_task( + coin: String, +) -> anyhow::Result> { + let (orderbook_sender, orderbook_recv) = watch::channel(HashMap::new()); + + let coin_clone = coin.clone(); + tokio::spawn(async move { + let o_s = orderbook_sender; + loop { + info!("orderbook_stream_task: Starting for {}", coin_clone); + let mut new_stream = OrderbookStream::new(coin_clone.clone()).await.unwrap(); + match new_stream.start_sending(o_s.clone()).await { + Ok(_) => {} + Err(err) => { + error!("orderbook_stream_task: Error: {err:?}"); + } + }; + info!("orderbook_stream_task: Resetting..."); + + let _ = new_stream.unsub().await; + sleep(std::time::Duration::from_secs(5)); + } + }); + + Ok(orderbook_recv) +} + +#[cfg(test)] +mod tests { + use std::sync::Once; + use tracing_subscriber::{fmt, EnvFilter}; + + use super::*; + + static INIT: Once = Once::new(); + + /// Initialize tracing subscriber for tests with debug level enabled + fn init_logger() { + INIT.call_once(|| { + fmt() + .with_env_filter( + EnvFilter::from_default_env().add_directive(tracing::Level::DEBUG.into()), + ) + .with_test_writer() + .init(); + }); + } + + #[tokio::test] + async fn orderbook_data_is_being_sent() -> anyhow::Result<()> { + init_logger(); + + let receiver = start_orderbook_stream_task("ETH".to_string()).await?; + info!("Started orderbook stream for ETH"); + + for i in 0..10 { + let orderbooks = receiver.borrow().clone(); + info!("Received orderbooks: {:?}", orderbooks); + // let orderbook = orderbooks.get(&"ETH".to_string()).unwrap(); + // info!( + // "ETH Orderbook - Best bid: {:?}, Best ask: {:?}", + // orderbook.best_bid(), + // orderbook.best_ask() + // ); + sleep(std::time::Duration::from_secs(1)); + } + + Ok(()) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 3bbd638..9d29acc 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,7 +1,10 @@ -mod price; mod meta; -pub use price::*; +mod orderbook; +mod price; + pub use meta::*; +pub use orderbook::*; +pub use price::*; use std::{collections::HashMap, fmt}; diff --git a/src/types/orderbook.rs b/src/types/orderbook.rs new file mode 100644 index 0000000..b021e18 --- /dev/null +++ b/src/types/orderbook.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Serialize}; + +/// Represents a single level in the orderbook with price and size +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OrderbookLevel { + pub price: f64, + pub size: f64, +} + +/// Represents the full orderbook state for a single coin +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Orderbook { + /// Vector of bid levels sorted by price (highest to lowest) + pub bids: Vec, + /// Vector of ask levels sorted by price (lowest to highest) + pub asks: Vec, + /// The coin symbol this orderbook represents + pub coin: String, +} + +impl Orderbook { + /// Creates a new empty orderbook for the given coin + pub fn new(coin: String) -> Self { + Orderbook { + bids: Vec::new(), + asks: Vec::new(), + coin, + } + } + + /// Updates the orderbook with new bid and ask levels + pub fn update(&mut self, bids: Vec, asks: Vec) { + self.bids = bids; + self.asks = asks; + } + + /// Updates the orderbook from raw stream data where levels are (price, size) tuples + pub fn update_from_stream(&mut self, bids: Vec<(f64, f64)>, asks: Vec<(f64, f64)>) { + self.bids = bids + .into_iter() + .map(|(price, size)| OrderbookLevel { price, size }) + .collect(); + self.asks = asks + .into_iter() + .map(|(price, size)| OrderbookLevel { price, size }) + .collect(); + } + + /// Returns the best (highest) bid level if available + pub fn best_bid(&self) -> Option<&OrderbookLevel> { + self.bids.first() + } + + /// Returns the best (lowest) ask level if available + pub fn best_ask(&self) -> Option<&OrderbookLevel> { + self.asks.first() + } +} + +/// Type alias for mapping coin symbols to their orderbooks +pub type NameToOrderbookMap = std::collections::HashMap;