Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/archivers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ mod tests {
ip: String::new(),
port: 0,
},
robust_query: config::RobustQueryConfig {
enabled: false,
redundancy: 3,
max_retries: 5,
verbose_logs: false,
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,11 @@
"notifier": {
"ip": "127.0.0.1",
"port": 4701
},
"robust_query": {
"enabled": true,
"redundancy": 3,
"max_retries": 5,
"verbose_logs": true
}
}
17 changes: 17 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct Config {
pub local_source: LocalSource,

pub notifier: NotifierConfig,

pub robust_query: RobustQueryConfig,
}

#[derive(Debug, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -95,6 +97,21 @@ pub struct NotifierConfig {
pub port: u16,
}

/// Settings controlling the redundant ("robust") query path for validator GET requests.
#[derive(Clone, Debug, serde::Deserialize)]
pub struct RobustQueryConfig {
    /// Global on/off flag. While `false`, validator GET routes keep going through
    /// the regular single-forward handler instead of the robust one.
    pub enabled: bool,

    /// How many agreeing responses are needed before a result is treated as trustworthy.
    /// Intended to be capped at the number of available nodes at runtime
    /// (NOTE(review): the clamp is applied by the robust-query code, not here — confirm).
    pub redundancy: usize,

    /// Upper bound on follow-up rounds when the first batch yields no consensus.
    /// Each round is meant to query `redundancy - highest_tally_count` further nodes
    /// (NOTE(review): behaviour of the robust-query loop — confirm against that module).
    pub max_retries: usize,

    /// Turns on detailed robust-query logging, such as node IDs, tally counts,
    /// and whether consensus was reached.
    pub verbose_logs: bool,
}

/// Loads the configuration from the JSON config file.
/// The path is `src/config.json`.
impl Config {
Expand Down
44 changes: 35 additions & 9 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ where
.await
{
Ok(Ok(())) => {
let (_method, route) = get_route(&req_buf).unwrap();
let (_method, route) = match get_route(&req_buf) {
Some(r) => r,
None => {
eprintln!("Failed to parse HTTP method/route from request");
respond_with_bad_request(&mut client_stream).await?;
continue;
}
};

match get_application(route.as_str()) {
Application::Monitor => {
Expand Down Expand Up @@ -161,14 +168,25 @@ where
}
}
Application::Validator => {
if let Err(e) = liberdus::handle_request(
req_buf,
&mut client_stream,
liberdus.clone(),
config.clone(),
)
.await
{
let handler_result = if _method == "GET" && config.robust_query.enabled {
liberdus::handle_request_robust(
req_buf,
&mut client_stream,
liberdus.clone(),
config.clone(),
)
.await
} else {
liberdus::handle_request(
req_buf,
&mut client_stream,
liberdus.clone(),
config.clone(),
)
.await
};

if let Err(e) = handler_result {
eprintln!("Error handling validator request: {}", e);
}
continue;
Expand Down Expand Up @@ -271,6 +289,14 @@ where
client_stream.write_all(response.as_bytes()).await
}

/// Writes a body-less `400 Bad Request` reply to `client_stream`.
///
/// The reply advertises `Content-Length: 0` and `Connection: close`.
///
/// # Errors
/// Returns the I/O error produced by the underlying write, if any.
pub async fn respond_with_bad_request<S>(client_stream: &mut S) -> Result<(), std::io::Error>
where
    S: AsyncWrite + Unpin + Send,
{
    // Complete wire-format response: status line, headers, terminating blank line.
    const BAD_REQUEST_REPLY: &[u8] =
        b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";

    client_stream.write_all(BAD_REQUEST_REPLY).await
}

pub async fn respond_with_notfound<S>(client_stream: &mut S) -> Result<(), std::io::Error>
where
S: AsyncWrite + Unpin + Send,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod crypto;
pub mod http;
pub mod liberdus;
pub mod notifier;
pub mod robust_query;
pub mod rpc;
pub mod shardus_monitor;
pub mod subscription;
Expand Down
105 changes: 101 additions & 4 deletions src/liberdus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{
Arc,
},
time::Duration,
u128,
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::time::sleep;
Expand Down Expand Up @@ -209,7 +208,7 @@ impl Liberdus {
/// For a node with:
/// - `timetaken_ms = 100`
/// - `max_timeout = 500`
/// The bias is calculated as:
/// The bias is calculated as:
/// ```math
/// normalized_rtt = (100 - 0.01) / (500 - 0.01) ≈ 0.19996
/// bias = 1.0 - 0.19996 ≈ 0.80004
Expand Down Expand Up @@ -297,7 +296,7 @@ impl Liberdus {
guard.clone()
};

let max_timeout = self.config.max_http_timeout_ms.try_into().unwrap_or(4000); // 3 seconds
let max_timeout = self.config.max_http_timeout_ms; // 3 seconds
let mut sorted_nodes = nodes.as_ref().clone();

sorted_nodes.sort_by(|a, b| {
Expand Down Expand Up @@ -433,6 +432,41 @@ impl Liberdus {
}
}

/// Picks up to `n` distinct consensors, skipping any whose ID is in `exclude`.
///
/// Candidates are drawn one at a time from
/// `get_next_appropriate_consensor_with_retry`, so selection follows whatever
/// strategy that method applies. A candidate that is excluded, or that was
/// already picked during this call, is discarded and another one is drawn.
///
/// The number of draws is capped so the loop always terminates. The cap scales
/// with `n + exclude.len()` rather than `n` alone: every draw that lands on an
/// excluded node consumes an attempt, so a budget that ignores the size of
/// `exclude` can run out before reaching nodes that are still available.
///
/// # Parameters
/// - `n`: maximum number of consensors to return.
/// - `exclude`: node IDs that must not appear in the result.
///
/// # Returns
/// Between `0` and `n` consensors with pairwise-distinct IDs, none of them in
/// `exclude`. The result is shorter than `n` when the active node list is
/// empty, when the selector yields `None`, or when the attempt budget runs out.
pub async fn get_n_distinct_consensors(
    &self,
    n: usize,
    exclude: &std::collections::HashSet<String>,
) -> Vec<Consensor> {
    // Nothing to pick, or nothing to pick from.
    if n == 0 || self.active_nodelist.load().is_empty() {
        return Vec::new();
    }

    let mut result = Vec::with_capacity(n);
    let mut seen = exclude.clone();

    // Safety cap to avoid an infinite loop. Saturating arithmetic keeps an
    // oversized `n` from overflowing the multiplication.
    let max_attempts = n.saturating_add(exclude.len()).saturating_mul(3);

    for _ in 0..max_attempts {
        if result.len() >= n {
            break;
        }
        match self.get_next_appropriate_consensor_with_retry(3).await {
            Some((_, node)) => {
                // `insert` returns false for IDs that are excluded or already picked.
                if seen.insert(node.id.clone()) {
                    result.push(node);
                }
            }
            None => break,
        }
    }
    result
}

pub fn set_consensor_trip_ms(&self, node_id: String, trip_ms: u128) {
// list already prepared on the first round robin, no need to keep recording rtt for nodes
if self
Expand Down Expand Up @@ -789,6 +823,12 @@ mod tests {
ip: String::new(),
port: 0,
},
robust_query: config::RobustQueryConfig {
enabled: true,
redundancy: 3,
max_retries: 5,
verbose_logs: false,
},
}
}

Expand Down Expand Up @@ -1179,7 +1219,13 @@ mod tests {
.await
.expect_err("connection should fail");
let err_text = format!("{}", err);
assert!(err_text.contains("Connection refused") || err_text.contains("Error connecting"));
assert!(
err_text.contains("Connection refused")
|| err_text.contains("Error connecting")
|| err_text.contains("Timeout connecting"),
"unexpected error: {}",
err_text
);

let mut buf = vec![0u8; 128];
let n = tokio::time::timeout(std::time::Duration::from_secs(1), peer.read(&mut buf))
Expand Down Expand Up @@ -1348,6 +1394,57 @@ where
Ok(())
}

/// Robust counterpart of `handle_request`, used for read-only GET routes.
///
/// Runs `robust_query::robust_query` for the incoming request and relays the
/// response it returns to the client after adding the keep-alive and CORS headers.
///
/// # Errors
/// - If the robust query fails, an error response is written to the client
///   (a timeout response when the error text mentions a timeout or a refused
///   connection, an internal-error response otherwise) and the query error is returned.
/// - If relaying the response fails, an internal-error response is attempted and
///   the write error is returned.
/// - A failure while writing any of those error responses is returned instead.
pub async fn handle_request_robust<S>(
    request_buffer: Vec<u8>,
    client_stream: &mut S,
    liberdus: Arc<Liberdus>,
    config: Arc<config::Config>,
) -> Result<(), Box<dyn std::error::Error>>
where
    S: AsyncWrite + AsyncRead + Unpin + Send,
{
    let outcome = match crate::robust_query::robust_query(&liberdus, request_buffer, &config).await
    {
        Ok(outcome) => outcome,
        Err(query_err) => {
            eprintln!("Robust query failed: {}", query_err);

            // Classify the failure by its message text to choose the client-facing status.
            let reason = query_err.to_string();
            let looks_like_timeout = ["Timeout", "timeout", "Connection refused"]
                .iter()
                .any(|needle| reason.contains(*needle));

            if looks_like_timeout {
                http::respond_with_timeout(client_stream).await?;
            } else {
                http::respond_with_internal_error(client_stream).await?;
            }
            return Err(query_err);
        }
    };

    if config.robust_query.verbose_logs && !outcome.is_robust {
        eprintln!("Warning: returning non-robust result (best-effort)");
    }

    // Headers added before relaying: connection keep-alive, the configured
    // keep-alive timeout, and a permissive CORS origin.
    let mut payload = outcome.response_data;
    let keep_alive = format!("timeout={}", config.tcp_keepalive_time_sec);
    http::set_http_header(&mut payload, "Connection", "keep-alive");
    http::set_http_header(&mut payload, "Keep-Alive", keep_alive.as_str());
    http::set_http_header(&mut payload, "Access-Control-Allow-Origin", "*");

    match client_stream.write_all(&payload).await {
        Ok(()) => Ok(()),
        Err(write_err) => {
            eprintln!("Error relaying robust response to client: {}", write_err);
            http::respond_with_internal_error(client_stream).await?;
            Err(Box::new(write_err))
        }
    }
}

/// Returns `(true, <tx_hash>)` when `route` is exactly
/// `/old_receipt/<64-char hex>` with no extra path segments.
/// Otherwise returns `(false, String::new())`.
Expand Down
10 changes: 8 additions & 2 deletions src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ mod tests {
use tokio::net::TcpListener;

use crate::config::{
Config, LocalSource, NodeFilteringConfig, NotifierConfig, ShardusMonitorProxyConfig,
StandaloneNetworkConfig, TLSConfig,
Config, LocalSource, NodeFilteringConfig, NotifierConfig, RobustQueryConfig,
ShardusMonitorProxyConfig, StandaloneNetworkConfig, TLSConfig,
};

fn test_config(port: u16) -> Config {
Expand Down Expand Up @@ -172,6 +172,12 @@ mod tests {
ip: "127.0.0.1".into(),
port,
},
robust_query: RobustQueryConfig {
enabled: false,
redundancy: 3,
max_retries: 5,
verbose_logs: false,
},
}
}

Expand Down
Loading