From 547d023eb2e37d2520fe5eea50655f8e66274812 Mon Sep 17 00:00:00 2001 From: krabat-l Date: Mon, 9 Feb 2026 16:50:17 +0800 Subject: [PATCH] feat: activate metrics --- bin/debug-trace-server/src/data_provider.rs | 18 ++++- bin/debug-trace-server/src/main.rs | 6 ++ bin/debug-trace-server/src/metrics.rs | 24 +++++-- bin/debug-trace-server/src/rpc_service.rs | 78 +++++++++++++-------- bin/stateless-validator/src/main.rs | 3 +- crates/validator-core/src/chain_sync.rs | 17 ++++- 6 files changed, 108 insertions(+), 38 deletions(-) diff --git a/bin/debug-trace-server/src/data_provider.rs b/bin/debug-trace-server/src/data_provider.rs index dcdfe8c..1ac6d58 100644 --- a/bin/debug-trace-server/src/data_provider.rs +++ b/bin/debug-trace-server/src/data_provider.rs @@ -29,7 +29,7 @@ use tokio::sync::broadcast; use tracing::{debug, instrument, trace, warn}; use validator_core::{withdrawals::MptWitness, LightWitness, RpcClient, ValidatorDB}; -use crate::metrics::UpstreamMetrics; +use crate::metrics::{ChainSyncMetrics, UpstreamMetrics}; /// Block data bundle containing all information needed for stateless execution. /// @@ -154,6 +154,7 @@ impl DataProvider { elapsed_ms = start.elapsed().as_millis() as u64, "Block data retrieved from local DB" ); + self.record_block_distance(data.block.header.number); return Ok(data); } } @@ -173,6 +174,7 @@ impl DataProvider { "Block data fetched from RPC" ); + self.record_block_distance(data.block.header.number); Ok(data) } @@ -245,6 +247,16 @@ impl DataProvider { } } + /// Records the distance of a requested block from the local chain tip. + fn record_block_distance(&self, block_number: u64) { + if let Some(db) = &self.validator_db { + if let Ok(Some((tip, _))) = db.get_local_tip() { + let distance = tip.saturating_sub(block_number); + ChainSyncMetrics::create().record_block_distance(distance); + } + } + } + /// Gets block data from the local database using LightWitness. 
async fn get_block_data_from_db( &self, @@ -256,8 +268,12 @@ impl DataProvider { // Get block data from database using light witness (fast deserialization) let start = std::time::Instant::now(); let (block, witness) = db.get_block_and_witness(block_hash)?; + let db_read_secs = start.elapsed().as_secs_f64(); let db_read_ms = start.elapsed().as_millis(); + // Record DB read duration metric + ChainSyncMetrics::create().record_db_read(db_read_secs); + // Extract code hashes and get contracts let start = std::time::Instant::now(); let code_hashes = validator_core::extract_code_hashes(&witness); diff --git a/bin/debug-trace-server/src/main.rs b/bin/debug-trace-server/src/main.rs index ccaec6d..1a39552 100644 --- a/bin/debug-trace-server/src/main.rs +++ b/bin/debug-trace-server/src/main.rs @@ -287,6 +287,7 @@ async fn main() -> Result<()> { // Clone response_cache for the callback let cache_for_reorg = response_cache.clone(); let chain_sync_metrics = metrics::ChainSyncMetrics::create(); + let fetch_metrics = metrics::ChainSyncMetrics::create(); task::spawn(remote_chain_tracker( Arc::clone(&rpc_client), Arc::clone(db), @@ -301,6 +302,11 @@ async fn main() -> Result<()> { cache_for_reorg.invalidate_blocks(reverted_hashes); } }), + Some(move |result: &validator_core::FetchResult| { + if let Some(height) = result.remote_chain_height { + fetch_metrics.set_remote_height(height); + } + }), )); // Spawn history pruner to prevent unbounded database growth diff --git a/bin/debug-trace-server/src/metrics.rs b/bin/debug-trace-server/src/metrics.rs index c608d07..15fdba9 100644 --- a/bin/debug-trace-server/src/metrics.rs +++ b/bin/debug-trace-server/src/metrics.rs @@ -68,7 +68,6 @@ impl RpcMethodMetrics { } /// Records an RPC error. - #[allow(dead_code)] pub fn record_error(&self) { self.rpc_errors_total.increment(1); } @@ -226,19 +225,16 @@ impl ChainSyncMetrics { } /// Sets the remote chain height. 
- #[allow(dead_code)] pub fn set_remote_height(&self, height: u64) { self.remote_chain_height.set(height as f64); } /// Records a DB read duration. - #[allow(dead_code)] pub fn record_db_read(&self, duration_secs: f64) { self.db_read_duration_seconds.record(duration_secs); } /// Records block distance from tip. - #[allow(dead_code)] pub fn record_block_distance(&self, distance: u64) { self.block_distance_from_tip.record(distance as f64); } @@ -355,3 +351,23 @@ pub fn record_rpc_request(method: &str, duration_secs: f64) { } } } + +/// Records an RPC error for a specific method (backward-compatible helper). +pub fn record_rpc_error(method: &str) { + match method { + METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER => { + RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER).record_error() + } + METHOD_DEBUG_TRACE_BLOCK_BY_HASH => { + RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH).record_error() + } + METHOD_DEBUG_TRACE_TRANSACTION => { + RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION).record_error() + } + METHOD_TRACE_BLOCK => RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK).record_error(), + METHOD_TRACE_TRANSACTION => { + RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION).record_error() + } + _ => {} + } +} diff --git a/bin/debug-trace-server/src/rpc_service.rs b/bin/debug-trace-server/src/rpc_service.rs index 1ed6a6f..959c296 100644 --- a/bin/debug-trace-server/src/rpc_service.rs +++ b/bin/debug-trace-server/src/rpc_service.rs @@ -417,11 +417,11 @@ impl DebugTraceRpcServer for RpcContext { // Stage 1: Resolve block number let t0 = Instant::now(); - let block_num = self - .data_provider - .resolve_block_number(block_number) - .await - .map_err(|e| rpc_err(format!("Failed to resolve block number: {e}")))?; + let block_num = + self.data_provider.resolve_block_number(block_number).await.map_err(|e| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + rpc_err(format!("Failed to resolve block number: 
{e}")) + })?; let resolve_ms = t0.elapsed().as_millis(); let variant = ResponseVariant::from_geth_options(&opts); @@ -440,18 +440,20 @@ impl DebugTraceRpcServer for RpcContext { // Stage 3: Fetch block data (DB -> RPC fallback) let t2 = Instant::now(); - let data = self - .data_provider - .get_block_data(block_num) - .await - .map_err(|e| block_data_err(block_num, e))?; + let data = self.data_provider.get_block_data(block_num).await.map_err(|e| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + block_data_err(block_num, e) + })?; let fetch_ms = t2.elapsed().as_millis(); let block_hash = data.block.header.hash; let tx_count = data.block.transactions.len(); // Stage 4: Execute trace let t3 = Instant::now(); - let result = compute_debug_trace_block(&self.chain_spec, &data, opts).await?; + let result = + compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + })?; let trace_ms = t3.elapsed().as_millis(); // Stage 5: Cache result @@ -514,13 +516,15 @@ impl DebugTraceRpcServer for RpcContext { } // Fetch block data (DB -> RPC fallback) - let data = self - .data_provider - .get_block_data_by_hash(block_hash) - .await - .map_err(|e| block_data_err_by_hash(block_hash, e))?; + let data = self.data_provider.get_block_data_by_hash(block_hash).await.map_err(|e| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + block_data_err_by_hash(block_hash, e) + })?; let block_num = data.block.header.number; - let result = compute_debug_trace_block(&self.chain_spec, &data, opts).await?; + let result = + compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + })?; // Cache and record metrics self.response_cache.insert( @@ -547,7 +551,10 @@ impl DebugTraceRpcServer for RpcContext { let opts = opts.unwrap_or_default(); let (data, tx_index) = - 
self.data_provider.get_block_data_for_tx(tx_hash).await.map_err(tx_data_err)?; + self.data_provider.get_block_data_for_tx(tx_hash).await.map_err(|e| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_TRANSACTION); + tx_data_err(e) + })?; let result = validator_core::trace_transaction( &self.chain_spec, @@ -557,7 +564,10 @@ impl DebugTraceRpcServer for RpcContext { &data.contracts, opts, ) - .map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?; + .map_err(|e| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_TRANSACTION); + rpc_err(format!("Trace execution failed: {e}")) + })?; let elapsed = start.elapsed(); metrics::record_rpc_request(METHOD_DEBUG_TRACE_TRANSACTION, elapsed.as_secs_f64()); @@ -604,11 +614,11 @@ impl TraceRpcServer for RpcContext { let _guard = self.watch_dog.start_request(METHOD_TRACE_BLOCK, format!("{block_number}")); let start = Instant::now(); - let block_num = self - .data_provider - .resolve_block_number(block_number) - .await - .map_err(|e| rpc_err(format!("Failed to resolve block number: {e}")))?; + let block_num = + self.data_provider.resolve_block_number(block_number).await.map_err(|e| { + metrics::record_rpc_error(METHOD_TRACE_BLOCK); + rpc_err(format!("Failed to resolve block number: {e}")) + })?; tracing::Span::current().record("block_number", block_num); @@ -625,14 +635,16 @@ impl TraceRpcServer for RpcContext { } // Fetch block data (DB -> RPC fallback) - let data = self - .data_provider - .get_block_data(block_num) - .await - .map_err(|e| block_data_err(block_num, e))?; + let data = self.data_provider.get_block_data(block_num).await.map_err(|e| { + metrics::record_rpc_error(METHOD_TRACE_BLOCK); + block_data_err(block_num, e) + })?; let block_hash = data.block.header.hash; - let result = compute_parity_trace_block(&self.chain_spec, &data).await?; + let result = + compute_parity_trace_block(&self.chain_spec, &data).await.inspect_err(|_| { + metrics::record_rpc_error(METHOD_TRACE_BLOCK); + })?; // Cache and record metrics 
self.response_cache.insert( @@ -663,6 +675,7 @@ impl TraceRpcServer for RpcContext { { return Ok(serde_json::Value::Null); } + metrics::record_rpc_error(METHOD_TRACE_TRANSACTION); return Err(rpc_err("internal error".to_string())); } }; @@ -674,7 +687,10 @@ impl TraceRpcServer for RpcContext { data.witness.clone(), &data.contracts, ) - .map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?; + .map_err(|e| { + metrics::record_rpc_error(METHOD_TRACE_TRANSACTION); + rpc_err(format!("Trace execution failed: {e}")) + })?; let elapsed = start.elapsed(); metrics::record_rpc_request(METHOD_TRACE_TRANSACTION, elapsed.as_secs_f64()); diff --git a/bin/stateless-validator/src/main.rs b/bin/stateless-validator/src/main.rs index 388462d..b5ffb9c 100644 --- a/bin/stateless-validator/src/main.rs +++ b/bin/stateless-validator/src/main.rs @@ -17,7 +17,7 @@ use stateless_common::logging::{LogArgs, migrate_legacy_env_vars}; use tokio::{signal, task}; use tracing::{debug, error, info, warn}; use validator_core::{ - ChainSyncConfig, RpcClient, RpcClientConfig, ValidatorDB, + ChainSyncConfig, FetchResult, RpcClient, RpcClientConfig, ValidatorDB, chain_spec::ChainSpec, data_types::{PlainKey, PlainValue}, executor::{ValidationResult, validate_block}, @@ -289,6 +289,7 @@ async fn chain_sync( Arc::clone(&validator_db), Arc::clone(&config), Some(metrics::on_chain_reorg), + None::, )); // Step 3: Spawn validation reporter (optional, based on config) diff --git a/crates/validator-core/src/chain_sync.rs b/crates/validator-core/src/chain_sync.rs index bb81fd9..b3ac0d9 100644 --- a/crates/validator-core/src/chain_sync.rs +++ b/crates/validator-core/src/chain_sync.rs @@ -89,6 +89,8 @@ pub struct FetchResult { pub had_error: bool, /// Block hashes that were reverted due to reorg (empty if no reorg). pub reverted_hashes: Vec, + /// Latest block number on the remote chain (if fetched). + pub remote_chain_height: Option, } /// Fetches a batch of blocks from RPC and stores them in the database. 
@@ -164,6 +166,7 @@ pub async fn fetch_blocks_batch( should_wait: false, had_error: false, reverted_hashes: Vec::new(), + remote_chain_height: Some(chain_latest), }); } } @@ -203,6 +206,7 @@ pub async fn fetch_blocks_batch( should_wait: false, had_error: false, reverted_hashes, + remote_chain_height: None, }); } Err(e) => { @@ -241,6 +245,7 @@ pub async fn fetch_blocks_batch( should_wait: true, had_error: false, reverted_hashes: Vec::new(), + remote_chain_height: None, }); } @@ -271,6 +276,7 @@ pub async fn fetch_blocks_batch( should_wait: true, had_error: false, reverted_hashes: Vec::new(), + remote_chain_height: Some(chain_latest), }); } @@ -410,6 +416,7 @@ pub async fn fetch_blocks_batch( should_wait: false, had_error, reverted_hashes: Vec::new(), + remote_chain_height: Some(chain_latest), }) } @@ -425,17 +432,20 @@ pub async fn fetch_blocks_batch( /// * `config` - Configuration for tracker behavior /// * `on_reorg` - Optional callback invoked when a chain reorg is detected, receives reverted block /// hashes +/// * `on_fetch` - Optional callback invoked with every fetch result, even when `had_error` is set /// /// # Returns /// * Never returns under normal operation - runs indefinitely until externally terminated -pub async fn remote_chain_tracker( +pub async fn remote_chain_tracker( client: Arc, db: Arc, config: Arc, on_reorg: Option, + on_fetch: Option, ) -> Result<()> where F: Fn(&[B256]) + Send + Sync, + G: Fn(&FetchResult) + Send + Sync, { info!(lookahead_blocks = config.tracker_lookahead_blocks, "Starting remote chain tracker"); @@ -452,6 +462,11 @@ where callback(&result.reverted_hashes); } + // Call fetch callback with the result + if let Some(ref callback) = on_fetch { + callback(&result); + } + if result.had_error { tokio::time::sleep(config.tracker_error_sleep).await; } else if result.should_wait || result.blocks_fetched == 0 {