Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bin/debug-trace-server/src/data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::broadcast;
use tracing::{debug, instrument, trace, warn};
use validator_core::{withdrawals::MptWitness, LightWitness, RpcClient, ValidatorDB};

use crate::metrics::UpstreamMetrics;
use crate::metrics::{ChainSyncMetrics, UpstreamMetrics};

/// Block data bundle containing all information needed for stateless execution.
///
Expand Down Expand Up @@ -154,6 +154,7 @@ impl DataProvider {
elapsed_ms = start.elapsed().as_millis() as u64,
"Block data retrieved from local DB"
);
self.record_block_distance(data.block.header.number);
return Ok(data);
}
}
Expand All @@ -173,6 +174,7 @@ impl DataProvider {
"Block data fetched from RPC"
);

self.record_block_distance(data.block.header.number);
Ok(data)
}

Expand Down Expand Up @@ -245,6 +247,16 @@ impl DataProvider {
}
}

/// Reports how far the served block lies behind the local chain tip.
///
/// This is best-effort instrumentation: nothing is recorded when no local
/// database is configured, or when the tip lookup does not return `Ok(Some(..))`.
fn record_block_distance(&self, block_number: u64) {
    let Some(db) = &self.validator_db else {
        return;
    };
    let Ok(Some((tip, _))) = db.get_local_tip() else {
        return;
    };

    // Saturating: a block number above the local tip is reported as distance 0.
    let blocks_behind_tip = tip.saturating_sub(block_number);
    ChainSyncMetrics::create().record_block_distance(blocks_behind_tip);
}

/// Gets block data from the local database using LightWitness.
async fn get_block_data_from_db(
&self,
Expand All @@ -256,8 +268,12 @@ impl DataProvider {
// Get block data from database using light witness (fast deserialization)
let start = std::time::Instant::now();
let (block, witness) = db.get_block_and_witness(block_hash)?;
let db_read_secs = start.elapsed().as_secs_f64();
let db_read_ms = start.elapsed().as_millis();

// Record DB read duration metric
ChainSyncMetrics::create().record_db_read(db_read_secs);

// Extract code hashes and get contracts
let start = std::time::Instant::now();
let code_hashes = validator_core::extract_code_hashes(&witness);
Expand Down
6 changes: 6 additions & 0 deletions bin/debug-trace-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ async fn main() -> Result<()> {
// Clone response_cache for the callback
let cache_for_reorg = response_cache.clone();
let chain_sync_metrics = metrics::ChainSyncMetrics::create();
let fetch_metrics = metrics::ChainSyncMetrics::create();
task::spawn(remote_chain_tracker(
Arc::clone(&rpc_client),
Arc::clone(db),
Expand All @@ -301,6 +302,11 @@ async fn main() -> Result<()> {
cache_for_reorg.invalidate_blocks(reverted_hashes);
}
}),
Some(move |result: &validator_core::FetchResult| {
if let Some(height) = result.remote_chain_height {
fetch_metrics.set_remote_height(height);
}
}),
));

// Spawn history pruner to prevent unbounded database growth
Expand Down
24 changes: 20 additions & 4 deletions bin/debug-trace-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl RpcMethodMetrics {
}

/// Records an RPC error.
///
/// Increments the `rpc_errors_total` counter by one.
pub fn record_error(&self) {
    self.rpc_errors_total.increment(1);
}
Expand Down Expand Up @@ -226,19 +225,16 @@ impl ChainSyncMetrics {
}

/// Sets the remote chain height.
///
/// Stores `height` in the `remote_chain_height` metric, overwriting the previous value.
pub fn set_remote_height(&self, height: u64) {
    // The metric is fed an `f64`; the cast is exact for heights below 2^53.
    self.remote_chain_height.set(height as f64);
}

/// Records a DB read duration.
///
/// `duration_secs` is the elapsed read time in seconds; it is added as one
/// observation to the `db_read_duration_seconds` metric.
pub fn record_db_read(&self, duration_secs: f64) {
    self.db_read_duration_seconds.record(duration_secs);
}

/// Records block distance from tip.
///
/// `distance` is a block count; it is added as one observation to the
/// `block_distance_from_tip` metric.
pub fn record_block_distance(&self, distance: u64) {
    // The metric is fed an `f64`; the cast is exact for distances below 2^53.
    self.block_distance_from_tip.record(distance as f64);
}
Expand Down Expand Up @@ -355,3 +351,23 @@ pub fn record_rpc_request(method: &str, duration_secs: f64) {
}
}
}

/// Records an RPC error for a specific method (backward-compatible helper).
pub fn record_rpc_error(method: &str) {
match method {
METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER => {
RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER).record_error()
}
METHOD_DEBUG_TRACE_BLOCK_BY_HASH => {
RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH).record_error()
}
METHOD_DEBUG_TRACE_TRANSACTION => {
RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION).record_error()
}
METHOD_TRACE_BLOCK => RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK).record_error(),
METHOD_TRACE_TRANSACTION => {
RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION).record_error()
}
_ => {}
}
}
78 changes: 47 additions & 31 deletions bin/debug-trace-server/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,11 @@ impl DebugTraceRpcServer for RpcContext {

// Stage 1: Resolve block number
let t0 = Instant::now();
let block_num = self
.data_provider
.resolve_block_number(block_number)
.await
.map_err(|e| rpc_err(format!("Failed to resolve block number: {e}")))?;
let block_num =
self.data_provider.resolve_block_number(block_number).await.map_err(|e| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER);
rpc_err(format!("Failed to resolve block number: {e}"))
})?;
let resolve_ms = t0.elapsed().as_millis();

let variant = ResponseVariant::from_geth_options(&opts);
Expand All @@ -440,18 +440,20 @@ impl DebugTraceRpcServer for RpcContext {

// Stage 3: Fetch block data (DB -> RPC fallback)
let t2 = Instant::now();
let data = self
.data_provider
.get_block_data(block_num)
.await
.map_err(|e| block_data_err(block_num, e))?;
let data = self.data_provider.get_block_data(block_num).await.map_err(|e| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER);
block_data_err(block_num, e)
})?;
let fetch_ms = t2.elapsed().as_millis();
let block_hash = data.block.header.hash;
let tx_count = data.block.transactions.len();

// Stage 4: Execute trace
let t3 = Instant::now();
let result = compute_debug_trace_block(&self.chain_spec, &data, opts).await?;
let result =
compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER);
})?;
let trace_ms = t3.elapsed().as_millis();

// Stage 5: Cache result
Expand Down Expand Up @@ -514,13 +516,15 @@ impl DebugTraceRpcServer for RpcContext {
}

// Fetch block data (DB -> RPC fallback)
let data = self
.data_provider
.get_block_data_by_hash(block_hash)
.await
.map_err(|e| block_data_err_by_hash(block_hash, e))?;
let data = self.data_provider.get_block_data_by_hash(block_hash).await.map_err(|e| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH);
block_data_err_by_hash(block_hash, e)
})?;
let block_num = data.block.header.number;
let result = compute_debug_trace_block(&self.chain_spec, &data, opts).await?;
let result =
compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH);
})?;

// Cache and record metrics
self.response_cache.insert(
Expand All @@ -547,7 +551,10 @@ impl DebugTraceRpcServer for RpcContext {
let opts = opts.unwrap_or_default();

let (data, tx_index) =
self.data_provider.get_block_data_for_tx(tx_hash).await.map_err(tx_data_err)?;
self.data_provider.get_block_data_for_tx(tx_hash).await.map_err(|e| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_TRANSACTION);
tx_data_err(e)
})?;

let result = validator_core::trace_transaction(
&self.chain_spec,
Expand All @@ -557,7 +564,10 @@ impl DebugTraceRpcServer for RpcContext {
&data.contracts,
opts,
)
.map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?;
.map_err(|e| {
metrics::record_rpc_error(METHOD_DEBUG_TRACE_TRANSACTION);
rpc_err(format!("Trace execution failed: {e}"))
})?;

let elapsed = start.elapsed();
metrics::record_rpc_request(METHOD_DEBUG_TRACE_TRANSACTION, elapsed.as_secs_f64());
Expand Down Expand Up @@ -604,11 +614,11 @@ impl TraceRpcServer for RpcContext {
let _guard = self.watch_dog.start_request(METHOD_TRACE_BLOCK, format!("{block_number}"));
let start = Instant::now();

let block_num = self
.data_provider
.resolve_block_number(block_number)
.await
.map_err(|e| rpc_err(format!("Failed to resolve block number: {e}")))?;
let block_num =
self.data_provider.resolve_block_number(block_number).await.map_err(|e| {
metrics::record_rpc_error(METHOD_TRACE_BLOCK);
rpc_err(format!("Failed to resolve block number: {e}"))
})?;

tracing::Span::current().record("block_number", block_num);

Expand All @@ -625,14 +635,16 @@ impl TraceRpcServer for RpcContext {
}

// Fetch block data (DB -> RPC fallback)
let data = self
.data_provider
.get_block_data(block_num)
.await
.map_err(|e| block_data_err(block_num, e))?;
let data = self.data_provider.get_block_data(block_num).await.map_err(|e| {
metrics::record_rpc_error(METHOD_TRACE_BLOCK);
block_data_err(block_num, e)
})?;

let block_hash = data.block.header.hash;
let result = compute_parity_trace_block(&self.chain_spec, &data).await?;
let result =
compute_parity_trace_block(&self.chain_spec, &data).await.inspect_err(|_| {
metrics::record_rpc_error(METHOD_TRACE_BLOCK);
})?;

// Cache and record metrics
self.response_cache.insert(
Expand Down Expand Up @@ -663,6 +675,7 @@ impl TraceRpcServer for RpcContext {
{
return Ok(serde_json::Value::Null);
}
metrics::record_rpc_error(METHOD_TRACE_TRANSACTION);
return Err(rpc_err("internal error".to_string()));
}
};
Expand All @@ -674,7 +687,10 @@ impl TraceRpcServer for RpcContext {
data.witness.clone(),
&data.contracts,
)
.map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?;
.map_err(|e| {
metrics::record_rpc_error(METHOD_TRACE_TRANSACTION);
rpc_err(format!("Trace execution failed: {e}"))
})?;

let elapsed = start.elapsed();
metrics::record_rpc_request(METHOD_TRACE_TRANSACTION, elapsed.as_secs_f64());
Expand Down
3 changes: 2 additions & 1 deletion bin/stateless-validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use stateless_common::logging::{LogArgs, migrate_legacy_env_vars};
use tokio::{signal, task};
use tracing::{debug, error, info, warn};
use validator_core::{
ChainSyncConfig, RpcClient, RpcClientConfig, ValidatorDB,
ChainSyncConfig, FetchResult, RpcClient, RpcClientConfig, ValidatorDB,
chain_spec::ChainSpec,
data_types::{PlainKey, PlainValue},
executor::{ValidationResult, validate_block},
Expand Down Expand Up @@ -289,6 +289,7 @@ async fn chain_sync(
Arc::clone(&validator_db),
Arc::clone(&config),
Some(metrics::on_chain_reorg),
None::<fn(&FetchResult)>,
));

// Step 3: Spawn validation reporter (optional, based on config)
Expand Down
17 changes: 16 additions & 1 deletion crates/validator-core/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct FetchResult {
pub had_error: bool,
/// Block hashes that were reverted due to reorg (empty if no reorg).
pub reverted_hashes: Vec<B256>,
/// Latest block number on the remote chain (if fetched).
pub remote_chain_height: Option<u64>,
}

/// Fetches a batch of blocks from RPC and stores them in the database.
Expand Down Expand Up @@ -164,6 +166,7 @@ pub async fn fetch_blocks_batch(
should_wait: false,
had_error: false,
reverted_hashes: Vec::new(),
remote_chain_height: Some(chain_latest),
});
}
}
Expand Down Expand Up @@ -203,6 +206,7 @@ pub async fn fetch_blocks_batch(
should_wait: false,
had_error: false,
reverted_hashes,
remote_chain_height: None,
});
}
Err(e) => {
Expand Down Expand Up @@ -241,6 +245,7 @@ pub async fn fetch_blocks_batch(
should_wait: true,
had_error: false,
reverted_hashes: Vec::new(),
remote_chain_height: None,
});
}

Expand Down Expand Up @@ -271,6 +276,7 @@ pub async fn fetch_blocks_batch(
should_wait: true,
had_error: false,
reverted_hashes: Vec::new(),
remote_chain_height: Some(chain_latest),
});
}

Expand Down Expand Up @@ -410,6 +416,7 @@ pub async fn fetch_blocks_batch(
should_wait: false,
had_error,
reverted_hashes: Vec::new(),
remote_chain_height: Some(chain_latest),
})
}

Expand All @@ -425,17 +432,20 @@ pub async fn fetch_blocks_batch(
/// * `config` - Configuration for tracker behavior
/// * `on_reorg` - Optional callback invoked when a chain reorg is detected, receives reverted block
/// hashes
/// * `on_fetch` - Optional callback invoked with the result of each fetch batch, before the
///   error/wait back-off is applied (results flagged with `had_error` are delivered too)
///
/// # Returns
/// * Never returns under normal operation - runs indefinitely until externally terminated
pub async fn remote_chain_tracker<F>(
pub async fn remote_chain_tracker<F, G>(
client: Arc<RpcClient>,
db: Arc<ValidatorDB>,
config: Arc<ChainSyncConfig>,
on_reorg: Option<F>,
on_fetch: Option<G>,
) -> Result<()>
where
F: Fn(&[B256]) + Send + Sync,
G: Fn(&FetchResult) + Send + Sync,
{
info!(lookahead_blocks = config.tracker_lookahead_blocks, "Starting remote chain tracker");

Expand All @@ -452,6 +462,11 @@ where
callback(&result.reverted_hashes);
}

// Call fetch callback with the result
if let Some(ref callback) = on_fetch {
callback(&result);
}

if result.had_error {
tokio::time::sleep(config.tracker_error_sleep).await;
} else if result.should_wait || result.blocks_fetched == 0 {
Expand Down