diff --git a/Cargo.toml b/Cargo.toml index 546bcce..862350a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,6 @@ tracing-subscriber = "0.3" tracing-appender = "0.2" tracing-log = "0.1" unimock = "0.6" -parking_lot = "0.12" \ No newline at end of file +parking_lot = "0.12" +tokio = { version = "1.0", features = ["time", "sync", "macros", "rt", "rt-multi-thread"] } +tokio-util = "0.7" \ No newline at end of file diff --git a/src/core/context/examples.rs b/src/core/context/examples.rs new file mode 100644 index 0000000..2e79b02 --- /dev/null +++ b/src/core/context/examples.rs @@ -0,0 +1,157 @@ +//! Examples demonstrating ThrowableContext usage patterns +//! +//! This module provides examples of how to use ThrowableContext in various scenarios, +//! similar to Go's context usage patterns but with Rust-specific adaptations. + +use crate::core::context::ThrowableContext; +use anyhow::{anyhow, Result}; +use tokio::time::{sleep, Duration}; +use tracing::Span; + +/// Example: Using ThrowableContext for startup operations, +/// This demonstrates how to handle irrecoverable startup errors +pub async fn startup_with_context_example(parent_span: &Span) -> Result<()> { + let ctx = ThrowableContext::with_timeout(parent_span, Duration::from_secs(30)); + + // Simulate multiple startup operations + let operations = vec![ + "initialize_network", + "load_configuration", + "setup_routing_table", + "register_services", + ]; + + for operation in operations { + // Use run_or_throw for critical startup operations + // If any operation fails, the program will terminate with an error message + ctx.run_or_throw(simulate_startup_operation(operation)).await; + tracing::info!("Completed startup operation: {}", operation); + } + + tracing::info!("All startup operations completed successfully"); + Ok(()) +} + +/// Example: Using context hierarchy with value propagation +pub async fn hierarchical_context_example(parent_span: &Span) -> Result<()> { + let root_ctx = ThrowableContext::new(parent_span); + + // Add configuration values to context + let config_ctx = root_ctx + .with_value("service_name".to_string()) + .with_value(42u16); // port number + + // Create child context for specific operation + let operation_ctx = config_ctx.with_parent(); + + // Child can access parent values + if let Some(service_name) = operation_ctx.value::() { + tracing::info!("Running operation for service: {}", service_name); + } + + if let Some(port) = operation_ctx.value::() { + tracing::info!("Using port: {}", port); + } + + // Use the context for timeout-sensitive operations + operation_ctx.run(async { + sleep(Duration::from_millis(100)).await; + Ok(()) + }).await?; + + Ok(()) +} + +/// Example: Context with cancellation +pub async fn cancellation_example(parent_span: &Span) -> Result<()> { + let ctx = ThrowableContext::new(parent_span); + let (child_ctx, cancel) = ctx.with_cancel(); + + // Spawn a background task + let background_task = tokio::spawn(async move { + child_ctx.run(async { + // Long running operation + sleep(Duration::from_secs(10)).await; + Ok(()) + }).await + }); + + // Cancel after a short time + tokio::spawn(async move { + sleep(Duration::from_millis(100)).await; + cancel(); + }); + + // The background task will be cancelled + match background_task.await { + Ok(Ok(())) => tracing::info!("Task completed successfully"), + Ok(Err(e)) => tracing::info!("Task cancelled: {}", e), + Err(e) => tracing::error!("Task panicked: {}", e), + } + + Ok(()) +} + +/// Example: Error propagation up context chain +pub async fn error_propagation_example(parent_span: &Span) -> Result<()> { + let root_ctx = ThrowableContext::new(parent_span); + let child_ctx = root_ctx.with_parent(); + let grandchild_ctx = child_ctx.with_parent(); + + // In a real scenario, this would be called from a deeply nested operation + // When throw_irrecoverable is called, it will propagate up to the root and terminate + // For demo purposes, we'll just show the pattern without actually calling it + + let critical_error = anyhow!("Database connection failed during critical operation"); + + // In production, this would terminate the program: + // grandchild_ctx.throw_irrecoverable(critical_error); + + // Instead, we'll just log what would happen + tracing::info!("Would propagate irrecoverable error: {}", critical_error); + + Ok(()) +} + +// Helper function to simulate startup operations +async fn simulate_startup_operation(operation_name: &str) -> Result<()> { + // Simulate some work + sleep(Duration::from_millis(10)).await; + + // Simulate potential failure + if operation_name == "fail_example" { + return Err(anyhow!("Simulated failure in {}", operation_name)); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::testutil::fixtures::span_fixture; + + #[tokio::test] + async fn test_startup_example() { + let result = startup_with_context_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_hierarchical_context_example() { + let result = hierarchical_context_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_cancellation_example() { + let result = cancellation_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_error_propagation_example() { + let result = error_propagation_example(&span_fixture()).await; + assert!(result.is_ok()); + } +} \ No newline at end of file diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs new file mode 100644 index 0000000..27089d6 --- /dev/null +++ b/src/core/context/mod.rs @@ -0,0 +1,322 @@ +//! Throwable context implementation for skipgraph, similar to Go's context with irrecoverable error propagation. +//! +//! This module provides a context implementation that combines: +//! - Cancellation support via tokio's CancellationToken +//! - Timeout/deadline functionality +//! - Value storage and retrieval +//! - Parent-child context hierarchies +//! - Irrecoverable error propagation that terminates the application + +pub mod examples; + +use anyhow::{anyhow, Result}; +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tokio::time::{Duration, Instant}; +use tokio_util::sync::CancellationToken; +use tracing::Span; + +/// A context that can propagate irrecoverable errors up the context chain, +/// similar to Go's context.Context but with throwable error handling. +/// +/// If an irrecoverable error is thrown, it will propagate to the parent context if it exists. +/// If there is no parent context, it will log the error and terminate the program. +/// This is useful for components that need to signal fatal errors that should stop the entire application. +#[derive(Clone)] +pub struct ThrowableContext { + inner: Arc, +} + +struct ContextInner { + token: CancellationToken, + deadline: Option, + values: Arc>>>, + parent: Option, + span: Span, +} + +impl ThrowableContext { + /// Create a new root context. + pub fn new(parent_span: &Span) -> Self { + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "throwable_context"); + + Self { + inner: Arc::new(ContextInner { + token: CancellationToken::new(), + deadline: None, + values: Arc::new(RwLock::new(HashMap::new())), + parent: None, + span, + }), + } + } + + /// Create a new context with a timeout deadline. + pub fn with_timeout(parent_span: &Span, timeout: Duration) -> Self { + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "throwable_context_timeout"); + + Self { + inner: Arc::new(ContextInner { + token: CancellationToken::new(), + deadline: Some(Instant::now() + timeout), + values: Arc::new(RwLock::new(HashMap::new())), + parent: None, + span, + }), + } + } + + /// Create a child context that inherits from a parent context. + pub fn with_parent(&self) -> Self { + // Create a new span for the child context with the parent context's span as its parent + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "throwable_context_child"); + + Self { + inner: Arc::new(ContextInner { + // Create a child token that inherits parent's cancellation, a child cancellation + // token that will be automatically cancelled when the parent token is cancelled + token: self.inner.token.child_token(), + // Inherit the parent's deadline + deadline: self.inner.deadline, + // Share the same values map as the parent + values: Arc::clone(&self.inner.values), + // Set the parent context + parent: Some(self.clone()), + span, + }), + } + } + + /// Create a child context with a value added. + pub fn with_value(&self, value: T) -> Self { + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "throwable_context_value"); + let values: Arc>>> = Arc::new(RwLock::new(HashMap::new())); + { + let mut values_guard = values.write().unwrap(); + values_guard.insert( + TypeId::of::(), // uses type as a key + Box::new(value) as Box // type-erases and makes thread-safe + ); + } + + Self { + inner: Arc::new(ContextInner { + // Create a child token that inherits parent's cancellation + // a child cancellation token that will be automatically cancelled when the parent token is cancelled + token: self.inner.token.child_token(), + // Inherit the parent's deadline + deadline: self.inner.deadline, + values, + parent: Some(self.clone()), + span, + }), + } + } + + /// Propagates an irrecoverable error up the context chain. + /// When it reaches the top-level context, it logs the error and terminates the program. + /// This function does not return normally, it always terminates the program. + pub fn throw_irrecoverable(&self, err: anyhow::Error) -> ! { + let _enter = self.inner.span.enter(); + + // Propagate the error to the parent context if it exists + if let Some(parent) = &self.inner.parent { + tracing::trace!("propagating irrecoverable error to parent context"); + parent.throw_irrecoverable(err); + } + + // If there is no parent context, log and terminate the program + tracing::error!("irrecoverable error: {}", err); + std::process::exit(1); + } + + /// Returns true if the context has been cancelled. + /// non-blocking check + pub fn is_cancelled(&self) -> bool { + self.inner.token.is_cancelled() + } + + /// Cancels the context and all its children. + pub fn cancel(&self) { + let _enter = self.inner.span.enter(); + tracing::trace!("cancelling context"); + // Cancelling the token will also cancel all child tokens + self.inner.token.cancel(); + } + + /// Wait for the context to be cancelled. + /// This is an async method that blocks until cancellation. + pub async fn cancelled(&self) { + self.inner.token.cancelled().await; + } + + /// Returns the deadline for this context, if any. + pub fn deadline(&self) -> Option { + self.inner.deadline + } + + /// Returns true if the context has exceeded its deadline. + pub fn is_deadline_exceeded(&self) -> bool { + self.inner.deadline.map_or(false, |d| Instant::now() >= d) + } + + /// Gets a value from the context by type, searching up the parent chain. + /// Note: This method returns a cloned value, not a reference, due to lifetime constraints. + pub fn value(&self) -> Option { + { + let values_guard = self.inner.values.read().unwrap(); + if let Some(boxed_value) = values_guard.get(&TypeId::of::()) { + if let Some(typed_value) = boxed_value.downcast_ref::() { + return Some(typed_value.clone()); + } + } + } + + // Search in parent if not found in the current context + self.inner + .parent + .as_ref() + .and_then(|parent| parent.value::()) + } + + /// Returns the current context error if canceled or deadline exceeded. + pub fn err(&self) -> Option { + if self.is_cancelled() { + Some(anyhow!("context cancelled")) + } else if self.is_deadline_exceeded() { + Some(anyhow!("context deadline exceeded")) + } else { + None + } + } + + /// Runs a future with context cancellation and timeout support. + /// Returns an error if the context is canceled or the deadline is exceeded. + pub async fn run(&self, future: F) -> Result + where + F: std::future::Future>, + { + let _enter = self.inner.span.enter(); + + if let Some(deadline) = self.inner.deadline { + let timeout_duration = deadline.saturating_duration_since(Instant::now()); + + tokio::select! { + result = future => result, + _ = tokio::time::sleep(timeout_duration) => { + Err(anyhow!("context deadline exceeded")) + } + _ = self.cancelled() => { + Err(anyhow!("context cancelled")) + } + } + } else { + tokio::select! { + result = future => result, + _ = self.cancelled() => { + Err(anyhow!("context cancelled")) + } + } + } + } + + /// Runs a future with context support, throwing irrecoverable error on failure. + /// This combines `run` with `throw_irrecoverable` for convenience. + pub async fn run_or_throw(&self, future: F) -> T + where + F: std::future::Future>, + { + match self.run(future).await { + Ok(value) => value, + Err(err) => self.throw_irrecoverable(err), + } + } + + /// Helper method similar to Go's context.WithCancel - returns (context, cancel_fn). + pub fn with_cancel(&self) -> (Self, impl Fn()) { + let child = self.with_parent(); + let token = child.inner.token.clone(); + let cancel_fn = move || token.cancel(); + (child, cancel_fn) + } +} + +impl std::fmt::Debug for ThrowableContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ThrowableContext") + .field("is_cancelled", &self.is_cancelled()) + .field("deadline", &self.deadline()) + .field("has_parent", &self.inner.parent.is_some()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::testutil::fixtures::span_fixture; + use tokio::time::sleep; + + #[tokio::test] + async fn test_context_cancellation() { + let ctx = ThrowableContext::new(&span_fixture()); + let child = ctx.with_parent(); + + assert!(!child.is_cancelled()); + ctx.cancel(); + + // Small delay to allow cancellation to propagate + sleep(Duration::from_millis(1)).await; + assert!(child.is_cancelled()); + } + + #[tokio::test] + async fn test_context_timeout() { + let ctx = ThrowableContext::with_timeout(&span_fixture(), Duration::from_millis(10)); + + let result = ctx.run(async { + sleep(Duration::from_millis(50)).await; + Ok::<(), anyhow::Error>(()) + }).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("deadline exceeded")); + } + + #[tokio::test] + async fn test_context_values() { + let ctx = ThrowableContext::new(&span_fixture()); + let ctx_with_value = ctx.with_value("test_key".to_string()); + let child = ctx_with_value.with_parent(); + + assert_eq!(child.value::(), Some("test_key".to_string())); + assert_eq!(ctx.value::(), None); + } + + #[tokio::test] + async fn test_context_with_cancel() { + let ctx = ThrowableContext::new(&span_fixture()); + let (child, cancel) = ctx.with_cancel(); + + assert!(!child.is_cancelled()); + cancel(); + + // Small delay to allow cancellation to propagate + sleep(Duration::from_millis(1)).await; + assert!(child.is_cancelled()); + } + + #[tokio::test] + async fn test_successful_operation() { + let ctx = ThrowableContext::new(&span_fixture()); + + let result = ctx.run(async { + Ok(42) as Result + }).await; + + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } +} \ No newline at end of file diff --git a/src/core/mod.rs b/src/core/mod.rs index 15e7d84..f3c3009 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,8 +1,10 @@ +pub mod context; mod lookup; pub mod model; #[cfg(test)] pub mod testutil; +pub use crate::core::context::ThrowableContext; pub use crate::core::lookup::array_lookup_table::ArrayLookupTable; pub use crate::core::lookup::array_lookup_table::LOOKUP_TABLE_LEVELS; pub use crate::core::lookup::LookupTable; diff --git a/src/node/base_node.rs b/src/node/base_node.rs index 4139f9d..aa4c4fc 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -1,6 +1,6 @@ use crate::core::model::direction::Direction; use crate::core::{ - Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, + Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, ThrowableContext, }; #[cfg(test)] // TODO: Remove once BaseNode is used in production code. use crate::network::MessageProcessor; @@ -21,6 +21,7 @@ pub(crate) struct BaseNode { lt: Box, net: Box, span: Span, + ctx: ThrowableContext, } impl Node for BaseNode { @@ -217,6 +218,11 @@ impl EventProcessorCore for BaseNode { } impl BaseNode { + /// Get a reference to the throwable context for this node. + pub fn context(&self) -> &ThrowableContext { + &self.ctx + } + /// Create a new `BaseNode` with the provided identifier, membership vector /// and lookup table. #[cfg(test)] // TODO: Remove once BaseNode is used in production code. @@ -231,6 +237,9 @@ impl BaseNode { let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "base_node"); let _enter = span.enter(); + // Create a throwable context for this node + let ctx = ThrowableContext::new(&span); + tracing::trace!( "creating BaseNode with id {:?}, mem_vec {:?}", id, @@ -243,6 +252,7 @@ impl BaseNode { lt, net, span: span.clone(), + ctx, }; // Create a MessageProcessor from this node, instead of casting directly @@ -253,9 +263,11 @@ impl BaseNode { id ); - clone_net - .register_processor(processor) - .map_err(|e| anyhow!("could not register node in network: {}", e))?; + if let Err(e) = clone_net.register_processor(processor) { + let error = anyhow!("could not register node in network: {}", e); + // Use throw_irrecoverable for critical startup failures - this will terminate the program + node.ctx.throw_irrecoverable(error); + } tracing::trace!( "successfully created and registered BaseNode {:?}", @@ -272,7 +284,7 @@ impl BaseNode { impl PartialEq for BaseNode { fn eq(&self, other: &Self) -> bool { self.id == other.id && self.mem_vec == other.mem_vec - // ignore lt for equality check as comparing trait objects is non-trivial + // ignore lt and ctx for equality check as comparing trait objects is non-trivial } } @@ -293,6 +305,7 @@ impl Clone for BaseNode { lt: self.lt.clone(), net: self.net.clone(), span: self.span.clone(), + ctx: self.ctx.clone(), } } } @@ -310,12 +323,14 @@ mod tests { fn test_base_node() { let id = random_identifier(); let mem_vec = random_membership_vector(); + let span = span_fixture(); let node = BaseNode { id, mem_vec, - lt: Box::new(ArrayLookupTable::new(&span_fixture())), + lt: Box::new(ArrayLookupTable::new(&span)), net: Box::new(Unimock::new(())), // No expectations needed for direct struct construction - span: span_fixture(), + span: span.clone(), + ctx: ThrowableContext::new(&span), }; assert_eq!(node.get_identifier(), &id); assert_eq!(node.get_membership_vector(), &mem_vec);