From f9b3c3a5a76114d703f14f0d9c75df2c9282db1a Mon Sep 17 00:00:00 2001 From: picklerick2349 Date: Tue, 4 Apr 2023 18:47:48 +0530 Subject: [PATCH 1/3] Adding the slot voting RPC call --- tinydancer/src/sampler.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 4d7448a..18051af 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -47,6 +47,11 @@ use url::Url; pub const SHRED_CF: &str = "archived_shreds"; +pub const MAX_LOCKOUT_HISTORY: usize = 31; +pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; + +pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; + pub struct SampleService { sample_indices: Vec, // peers: Vec<(Pubkey, SocketAddr)>, @@ -155,6 +160,25 @@ pub async fn request_shreds( serde_json::from_str::(&res) } +pub async fn request_slot_voting( + slot: u64, + endpoint: String, +) -> Result, serde_json::Error> { + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getBlockCommitment", + "params": [ + slot + ] + }) + .to_string(); + + let res = send_rpc_call!(endpoint, request); + + serde_json::from_str::>(&res) +} + async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, @@ -535,6 +559,14 @@ pub struct GetShredResponse { pub result: GetShredResult, pub id: i64, } + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcBlockCommitment { + pub commitment: Option, + pub total_stake: u64, +} + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetShredResult { From 134f128400cc1ec7583a62ed70be4c0218c39b64 Mon Sep 17 00:00:00 2001 From: picklerick2349 Date: Wed, 5 Apr 2023 19:43:15 +0530 Subject: [PATCH 2/3] Adding support for verifying Tower BFT consensus --- tinydancer/src/consensus.rs | 246 +++++++++++++++++++++++++++++++++++ tinydancer/src/main.rs | 7 + tinydancer/src/sampler.rs | 33 +---- tinydancer/src/tinydancer.rs | 68 +++++++--- 4 files changed, 304 insertions(+), 50 deletions(-) create mode 100644 tinydancer/src/consensus.rs diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs new file mode 100644 index 0000000..48ff428 --- /dev/null +++ b/tinydancer/src/consensus.rs @@ -0,0 +1,246 @@ +use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; +use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; +use anyhow::anyhow; +use async_trait::async_trait; +use crossbeam::channel::{Receiver, Sender}; +use futures::Sink; +use itertools::Itertools; +use rand::distributions::Uniform; +use rand::prelude::*; +use rayon::prelude::*; +use reqwest::Request; +use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; +use serde::de::DeserializeOwned; +use solana_ledger::shred::{ShredId, ShredType}; +use solana_ledger::{ +ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, +blockstore::Blockstore, +// blockstore_db::columns::ShredCode, +shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, +}; +use solana_sdk::hash::hashv; +use solana_sdk::{ +clock::Slot, +genesis_config::ClusterType, +hash::{Hash, HASH_BYTES}, +packet::PACKET_DATA_SIZE, +pubkey::{Pubkey, PUBKEY_BYTES}, +signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, +signer::keypair::Keypair, +timing::{duration_as_ms, timestamp}, +}; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::{error::Error, ops::Add}; +use std::{ +net::{SocketAddr, UdpSocket}, +thread::Builder, +}; +use tiny_logger::logs::{debug, error, info}; +use tokio::{ +sync::mpsc::UnboundedSender, +task::{JoinError, JoinHandle}, +sync::Mutex as TokioMutex, +}; +use tungstenite::{connect, Message}; +use url::Url; +use serde_derive::Deserialize; +use serde_derive::Serialize; + +pub struct ConsensusService { + consensus_indices: Vec, + consensus_handler: JoinHandle<()>, +} + +pub struct ConsensusServiceConfig { + pub cluster: Cluster, + pub archive_config: ArchiveConfig, + pub instance: Arc, + pub status_consensus: Arc>, + pub sample_qty: usize, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] + pub struct RpcBlockCommitment { + pub commitment: Option, + pub total_stake: u64, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] + pub struct GetCommittmentResponse { + pub jsonrpc: String, + pub result: RpcBlockCommitment, + pub id: i64, +} + +pub const MAX_LOCKOUT_HISTORY: usize = 31; +pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; + +pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; + +#[async_trait] +impl ClientService for ConsensusService { +type ServiceError = tokio::task::JoinError; + +fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); + + let mut threads = Vec::default(); + + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); + + let status_arc = config.status_consensus.clone(); + + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.status_consensus, + ))); + + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + + + for thread in threads { + thread.await; + } + }); + + Self { + consensus_handler, + consensus_indices: Vec::default(), + } +} + +async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await +} +} + +pub async fn slot_update_loop( + slot_update_tx: Sender, + pub_sub: String, + status_sampler: Arc>, +) -> anyhow::Result<()> { +let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = status_sampler.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } +}; + +if result.is_none() { + return Err(anyhow!("")); +} + +let (mut socket, _response) = result.unwrap(); + +socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), +))?; + +loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here incase send fails for some reason + } + } + } + } + Err(e) => info!("err: {:?}", e), + } +} +} + +// verifies the total vote on the slot > 2/3 +fn verify_slot(slot_commitment: RpcBlockCommitment) -> bool { + let commitment_array = &slot_commitment.commitment; + let total_stake = &slot_commitment.total_stake; + let sum: u64 = commitment_array.iter().flatten().sum(); + + if (sum as f64 / *total_stake as f64) > VOTE_THRESHOLD_SIZE { + true + } else { + false + } +} + +pub async fn slot_verify_loop( + slot_update_rx: Receiver, + endpoint: String, + status_sampler: Arc>, +) -> anyhow::Result<()> { +loop { + let mut status = status_sampler.lock().await; + if let ClientStatus::Crashed(_) = &*status { + return Err(anyhow!("Client crashed")); + } else { + *status = ClientStatus::Active(String::from( + "Monitoring Tinydancer: Verifying consensus", + )); + } + if let Ok(slot) = slot_update_rx.recv() { + let slot_commitment_result = request_slot_voting(slot, &endpoint).await; + + if let Err(e) = slot_commitment_result { + println!("Error {}", e); + info!("{}", e); + continue; + } + + let slot_commitment = slot_commitment_result.unwrap(); + + let verified = verify_slot(slot_commitment.result); + + if verified { + info!("slot {:?} verified ", slot); + } else { + info!("slot {:?} failed to verified ", slot); + info!("sample INVALID for slot : {:?}", slot); + } + } + } +} + +pub async fn request_slot_voting( + slot: u64, + endpoint: &String, +) -> Result { + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getBlockCommitment", + "params": [ + slot + ] + }) + .to_string(); + + let res = send_rpc_call!(endpoint, request); + + serde_json::from_str::(&res) +} diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 9d1c522..77a36ab 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -48,6 +48,7 @@ mod macros; use colored::Colorize; mod rpc_wrapper; mod sampler; +mod consensus; mod ui; use anyhow::{anyhow, Result}; @@ -85,6 +86,10 @@ pub enum Commands { /// Duration after which shreds will be purged #[clap(required = false, default_value_t = 10000000)] shred_archive_duration: u64, + + /// Run the node in consensus mode + #[clap(long, short)] + consensus_mode: bool, }, /// Verify the samples for a single slot Verify { @@ -144,6 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, + consensus_mode } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -152,6 +158,7 @@ async fn main() -> Result<()> { rpc_endpoint: get_cluster(config_file.cluster), sample_qty, tui_monitor, + consensus_mode, log_path: config_file.log_path, archive_config: { archive_path diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 18051af..885ce17 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -47,11 +47,6 @@ use url::Url; pub const SHRED_CF: &str = "archived_shreds"; -pub const MAX_LOCKOUT_HISTORY: usize = 31; -pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; - -pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; - pub struct SampleService { sample_indices: Vec, // peers: Vec<(Pubkey, SocketAddr)>, @@ -160,26 +155,7 @@ pub async fn request_shreds( serde_json::from_str::(&res) } -pub async fn request_slot_voting( - slot: u64, - endpoint: String, -) -> Result, serde_json::Error> { - let request = serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "getBlockCommitment", - "params": [ - slot - ] - }) - .to_string(); - - let res = send_rpc_call!(endpoint, request); - - serde_json::from_str::>(&res) -} - -async fn slot_update_loop( +pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, status_sampler: Arc>, @@ -560,13 +536,6 @@ pub struct GetShredResponse { pub id: i64, } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RpcBlockCommitment { - pub commitment: Option, - pub total_stake: u64, -} - #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetShredResult { diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 8687a10..55f5c44 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -12,6 +12,7 @@ use crate::{ block_on, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, + consensus::{ConsensusService, ConsensusServiceConfig}, ui::{UiConfig, UiService}, }; use anyhow::anyhow; @@ -22,7 +23,7 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join}; +use tokio::{runtime::Runtime, task::JoinError, try_join, sync::Mutex as TokioMutex,}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -48,6 +49,7 @@ pub struct TinyDancerConfig { pub enable_ui_service: bool, pub archive_config: ArchiveConfig, pub tui_monitor: bool, + pub consensus_mode: bool, pub log_path: String, } @@ -61,10 +63,13 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - - let client_status = Arc::new(Mutex::new(status)); + let status_clone = status.clone(); + let client_status = Arc::new(Mutex::new(status_clone)); let status_sampler = client_status.clone(); + let consensus_client_status = Arc::new(TokioMutex::new(status.clone())); + let status_consensus = consensus_client_status.clone(); + let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -72,9 +77,10 @@ impl TinyDancer { tui_monitor, log_path, archive_config, + consensus_mode, } = config.clone(); std::env::set_var("RUST_LOG", "info"); - tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); + // tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); @@ -86,14 +92,45 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - let sample_service = SampleService::new(sample_service_config); + + if consensus_mode { + println!("Running in consensus_mode"); + + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + status_consensus: status_consensus.clone(), + sample_qty, + }; + + let consensus_service = ConsensusService::new(consensus_service_config); + + // run the sampling service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } + + else{ + + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + status_sampler, + sample_qty, + }; + + let sample_service = SampleService::new(sample_service_config); + + // run the sampling service + sample_service + .join() + .await + .expect("error in sample service thread"); + } let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), @@ -110,12 +147,6 @@ impl TinyDancer { None }; - // run - sample_service - .join() - .await - .expect("error in sample service thread"); - transaction_service .join() .await @@ -147,6 +178,7 @@ pub fn endpoint(cluster: Cluster) -> String { Cluster::Custom(cluster) => cluster, } } +#[derive(Clone, PartialEq, Debug)] pub enum ClientStatus { Initializing(String), SearchingForRPCService(String), From 8c711540691eb005cace78f3446871a7decbeb1a Mon Sep 17 00:00:00 2001 From: anoushk1234 Date: Sun, 23 Apr 2023 18:58:13 +0530 Subject: [PATCH 3/3] fix: status indicator works now + fixed async/sync context --- .gitignore | 3 +- tinydancer/src/consensus.rs | 192 +++++++++++++++++------------------ tinydancer/src/macros.rs | 11 ++ tinydancer/src/main.rs | 6 +- tinydancer/src/sampler.rs | 19 ++-- tinydancer/src/tinydancer.rs | 98 ++++++++---------- tinydancer/src/ui/ui.rs | 48 ++++----- 7 files changed, 188 insertions(+), 189 deletions(-) diff --git a/.gitignore b/.gitignore index 4ad91fc..951054a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ **.log tmp/ .vscode -db/ \ No newline at end of file +db/ +archives \ No newline at end of file diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs index 48ff428..5767c4e 100644 --- a/tinydancer/src/consensus.rs +++ b/tinydancer/src/consensus.rs @@ -1,5 +1,5 @@ -use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; use anyhow::anyhow; use async_trait::async_trait; @@ -12,42 +12,42 @@ use rayon::prelude::*; use reqwest::Request; use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; use serde::de::DeserializeOwned; +use serde_derive::Deserialize; +use serde_derive::Serialize; use solana_ledger::shred::{ShredId, ShredType}; use solana_ledger::{ -ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, -blockstore::Blockstore, -// blockstore_db::columns::ShredCode, -shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, + ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, + blockstore::Blockstore, + // blockstore_db::columns::ShredCode, + shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, }; use solana_sdk::hash::hashv; use solana_sdk::{ -clock::Slot, -genesis_config::ClusterType, -hash::{Hash, HASH_BYTES}, -packet::PACKET_DATA_SIZE, -pubkey::{Pubkey, PUBKEY_BYTES}, -signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, -signer::keypair::Keypair, -timing::{duration_as_ms, timestamp}, + clock::Slot, + genesis_config::ClusterType, + hash::{Hash, HASH_BYTES}, + packet::PACKET_DATA_SIZE, + pubkey::{Pubkey, PUBKEY_BYTES}, + signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, + signer::keypair::Keypair, + timing::{duration_as_ms, timestamp}, }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ -net::{SocketAddr, UdpSocket}, -thread::Builder, + net::{SocketAddr, UdpSocket}, + thread::Builder, }; use tiny_logger::logs::{debug, error, info}; use tokio::{ -sync::mpsc::UnboundedSender, -task::{JoinError, JoinHandle}, -sync::Mutex as TokioMutex, + sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, + task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; use url::Url; -use serde_derive::Deserialize; -use serde_derive::Serialize; pub struct ConsensusService { consensus_indices: Vec, @@ -58,20 +58,20 @@ pub struct ConsensusServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_consensus: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct RpcBlockCommitment { +pub struct RpcBlockCommitment { pub commitment: Option, pub total_stake: u64, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct GetCommittmentResponse { +pub struct GetCommittmentResponse { pub jsonrpc: String, pub result: RpcBlockCommitment, pub id: i64, @@ -84,96 +84,95 @@ pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; #[async_trait] impl ClientService for ConsensusService { -type ServiceError = tokio::task::JoinError; + type ServiceError = tokio::task::JoinError; -fn new(config: ConsensusServiceConfig) -> Self { - let consensus_handler = tokio::spawn(async move { - let rpc_url = endpoint(config.cluster); - let pub_sub = convert_to_websocket!(rpc_url); + fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); - let mut threads = Vec::default(); + let mut threads = Vec::default(); - let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); - let status_arc = config.status_consensus.clone(); + let status_arc = config.client_status.clone(); - // waits on new slots => triggers slot_verify_loop - threads.push(tokio::spawn(slot_update_loop( - slot_update_tx, - pub_sub, - config.status_consensus, - ))); + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.client_status, + ))); - // verify slot votes - threads.push(tokio::spawn(slot_verify_loop( - slot_update_rx, - rpc_url, - status_arc, - ))); + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + for thread in threads { + thread.await; + } + }); - for thread in threads { - thread.await; + Self { + consensus_handler, + consensus_indices: Vec::default(), } - }); - - Self { - consensus_handler, - consensus_indices: Vec::default(), } -} -async fn join(self) -> std::result::Result<(), Self::ServiceError> { - self.consensus_handler.await -} + async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await + } } pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { -let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { - Ok((socket, _response)) => Some((socket, _response)), - Err(_) => { - let mut status = status_sampler.lock().await; - *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); - None - } -}; - -if result.is_none() { - return Err(anyhow!("")); -} - -let (mut socket, _response) = result.unwrap(); - -socket.write_message(Message::Text( - r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), -))?; + let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = client_status.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } + }; -loop { - match socket.read_message() { - Ok(msg) => { - let res = serde_json::from_str::(msg.to_string().as_str()); + if result.is_none() { + return Err(anyhow!("")); + } - // info!("res: {:?}", msg.to_string().as_str()); - if let Ok(res) = res { - match slot_update_tx.send(res.params.result.root as u64) { - Ok(_) => { - info!("slot updated: {:?}", res.params.result.root); - } - Err(e) => { - info!("error here: {:?} {:?}", e, res.params.result.root as u64); - continue; // @TODO: we should add retries here incase send fails for some reason + let (mut socket, _response) = result.unwrap(); + + socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), + ))?; + + loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here incase send fails for some reason + } } } } + Err(e) => info!("err: {:?}", e), } - Err(e) => info!("err: {:?}", e), } } -} // verifies the total vote on the slot > 2/3 fn verify_slot(slot_commitment: RpcBlockCommitment) -> bool { @@ -191,20 +190,20 @@ fn verify_slot(slot_commitment: RpcBlockCommitment) -> boo pub async fn slot_verify_loop( slot_update_rx: Receiver, endpoint: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { -loop { - let mut status = status_sampler.lock().await; + loop { + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { - *status = ClientStatus::Active(String::from( - "Monitoring Tinydancer: Verifying consensus", - )); + *status = + ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus")); } + drop(status); if let Ok(slot) = slot_update_rx.recv() { let slot_commitment_result = request_slot_voting(slot, &endpoint).await; - + if let Err(e) = slot_commitment_result { println!("Error {}", e); info!("{}", e); @@ -229,7 +228,6 @@ pub async fn request_slot_voting( slot: u64, endpoint: &String, ) -> Result { - let request = serde_json::json!({ "jsonrpc": "2.0", "id": 1, diff --git a/tinydancer/src/macros.rs b/tinydancer/src/macros.rs index c747f67..7c0f646 100644 --- a/tinydancer/src/macros.rs +++ b/tinydancer/src/macros.rs @@ -17,6 +17,17 @@ macro_rules! block_on { rt.handle().block_on($func).expect($error); }; } +#[macro_export] +macro_rules! block_on_async { + ($func:expr) => {{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on($func) + }}; +} + #[macro_export] macro_rules! try_coerce_shred { ($response:expr) => {{ diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 77a36ab..325c710 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -46,9 +46,9 @@ use std::{ use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig}; mod macros; use colored::Colorize; +mod consensus; mod rpc_wrapper; mod sampler; -mod consensus; mod ui; use anyhow::{anyhow, Result}; @@ -149,7 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, - consensus_mode + consensus_mode, } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -234,8 +234,6 @@ async fn main() -> Result<()> { } } ConfigSubcommands::Set { log_path, cluster } => { - // println!("{:?}", fs::create_dir_all("~/.config/tinydancer")); - let home_path = std::env::var("HOME").unwrap(); let tinydancer_dir = home_path + "/.config/tinydancer"; diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 885ce17..daa1b77 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -31,7 +31,7 @@ use solana_sdk::{ }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ net::{SocketAddr, UdpSocket}, @@ -40,6 +40,7 @@ use std::{ use tiny_logger::logs::{debug, error, info}; use tokio::{ sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; @@ -56,7 +57,7 @@ pub struct SampleServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_sampler: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } @@ -81,13 +82,13 @@ impl ClientService for SampleService { let (shred_tx, shred_rx) = crossbeam::channel::unbounded(); let (verified_shred_tx, verified_shred_rx) = crossbeam::channel::unbounded(); - let status_arc = config.status_sampler.clone(); + let status_arc = config.client_status.clone(); // waits on new slots => triggers shred_update_loop threads.push(tokio::spawn(slot_update_loop( slot_update_tx, pub_sub, - config.status_sampler, + config.client_status, ))); // sample shreds from new slot @@ -158,13 +159,14 @@ pub async fn request_shreds( pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { Ok((socket, _response)) => Some((socket, _response)), Err(_) => { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + drop(status); None } }; @@ -318,18 +320,19 @@ async fn shred_update_loop( slot_update_rx: Receiver, endpoint: String, shred_tx: Sender<(Vec>, solana_ledger::shred::Pubkey)>, - status_sampler: Arc>, + client_status: Arc>, sample_qty: usize, ) -> anyhow::Result<()> { loop { { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { *status = ClientStatus::Active(String::from( "Monitoring Tinydancer: Actively Sampling Shreds", )); + drop(status) } } diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 8c228e0..e08f89b 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -1,18 +1,14 @@ //! Sampler struct - incharge of sampling shreds // use rayon::prelude::*; -use std::{ - env, - sync::{Arc, Mutex, MutexGuard}, - thread::Result, -}; +use std::{env, sync::Arc, thread::Result}; // use tokio::time::Duration; use crate::{ block_on, + consensus::{ConsensusService, ConsensusServiceConfig}, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, - consensus::{ConsensusService, ConsensusServiceConfig}, ui::{UiConfig, UiService}, }; use anyhow::anyhow; @@ -24,7 +20,12 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join, sync::Mutex as TokioMutex,}; +use tokio::{ + runtime::Runtime, + sync::{Mutex, MutexGuard}, + task::JoinError, + try_join, +}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -64,13 +65,8 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - let status_clone = status.clone(); - let client_status = Arc::new(Mutex::new(status_clone)); - let status_sampler = client_status.clone(); - - let consensus_client_status = Arc::new(TokioMutex::new(status.clone())); - let status_consensus = consensus_client_status.clone(); - + let client_status = Arc::new(Mutex::new(status)); + let client_status_ui = client_status.clone(); let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -93,46 +89,6 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - - if consensus_mode { - println!("Running in consensus_mode"); - - let consensus_service_config = ConsensusServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_consensus: status_consensus.clone(), - sample_qty, - }; - - let consensus_service = ConsensusService::new(consensus_service_config); - - // run the sampling service - consensus_service - .join() - .await - .expect("error in consensus service thread"); - } - - else{ - - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - - let sample_service = SampleService::new(sample_service_config); - - // run the sampling service - sample_service - .join() - .await - .expect("error in sample service thread"); - } - let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), db_instance: db.clone(), @@ -140,14 +96,46 @@ impl TinyDancer { let ui_service = if enable_ui_service || tui_monitor { Some(UiService::new(UiConfig { - client_status, + client_status: client_status_ui, enable_ui_service, tui_monitor, })) } else { None }; + // run the sampling service + if !consensus_mode { + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config: archive_config.clone(), + instance: db.clone(), + client_status: client_status.clone(), + sample_qty, + }; + let sample_service = SampleService::new(sample_service_config); + sample_service + .join() + .await + .expect("error in sample service thread"); + } + if consensus_mode { + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + client_status, + sample_qty, + }; + + let consensus_service = ConsensusService::new(consensus_service_config); + + // run the consensus service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } transaction_service .join() .await diff --git a/tinydancer/src/ui/ui.rs b/tinydancer/src/ui/ui.rs index c4703f6..5bcf9fd 100644 --- a/tinydancer/src/ui/ui.rs +++ b/tinydancer/src/ui/ui.rs @@ -1,3 +1,4 @@ +use crate::block_on_async; use crate::sampler::GetShredResponse; use crate::tinydancer::{ClientService, ClientStatus, TinyDancer}; use async_trait::async_trait; @@ -8,13 +9,14 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use spinoff::{spinners, Color as SpinColor, Spinner}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{any::Any, thread::Thread}; use std::{fmt, thread::JoinHandle}; use thiserror::Error; use tiny_logger::logs::info; +use tokio::sync::{Mutex, MutexGuard}; use tui::layout::Rect; use tui::style::{Color, Modifier, Style}; use tui::text::{Span, Spans}; @@ -225,12 +227,30 @@ impl ClientService for UiService { threads.push(std::thread::spawn(move || loop { sleep(Duration::from_millis(100)); + enable_raw_mode(); + if crossterm::event::poll(Duration::from_millis(100)).unwrap() { + let ev = crossterm::event::read().unwrap(); + if ev + == Event::Key(KeyEvent { + code: KeyCode::Char('c'), + modifiers: KeyModifiers::CONTROL, + kind: KeyEventKind::Press, + state: KeyEventState::NONE, + }) + { + let mut status = block_on_async!(client_status.lock()); + *status = ClientStatus::ShuttingDown(String::from( + "Shutting Down Gracefully...", + )); + drop(status); + disable_raw_mode(); + } + } + let status = block_on_async!(client_status.lock()); - let status = client_status.lock().unwrap(); match &*status { ClientStatus::Active(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Green); - // sleep(Duration::from_secs(100)); } ClientStatus::Initializing(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Yellow); @@ -245,27 +265,7 @@ impl ClientService for UiService { } _ => {} } - Mutex::unlock(status); - enable_raw_mode(); - if crossterm::event::poll(Duration::from_millis(100)).unwrap() { - let ev = crossterm::event::read().unwrap(); - - if ev - == Event::Key(KeyEvent { - code: KeyCode::Char('c'), - modifiers: KeyModifiers::CONTROL, - kind: KeyEventKind::Press, - state: KeyEventState::NONE, - }) - { - let mut status = client_status.lock().unwrap(); - *status = ClientStatus::ShuttingDown(String::from( - "Shutting Down Gracefully...", - )); - Mutex::unlock(status); - disable_raw_mode(); - } - } + drop(status); })); }