diff --git a/.gitignore b/.gitignore index ce33184..5cddde3 100644 --- a/.gitignore +++ b/.gitignore @@ -397,5 +397,8 @@ FodyWeavers.xsd # JetBrains Rider *.sln.iml +# Cargo build artifacts +target/ + # Cargo lock Cargo.lock diff --git a/examples/sample-app/src/main.rs b/examples/sample-app/src/main.rs index e48ac74..58c92a7 100644 --- a/examples/sample-app/src/main.rs +++ b/examples/sample-app/src/main.rs @@ -41,6 +41,7 @@ async fn main() { interval_secs: 1, timeout: 5, export_severity: Some(Severity::Error), + target_filters: None, ca_cert_path: args.ca_cert_path, bearer_token_provider_fn: Some(get_dummy_bearer_token), }]; @@ -65,6 +66,7 @@ async fn main() { }; let mut otel_component = Otel::new(config); + let shutdown_handle = otel_component.shutdown_handle(); // Start the otel running task let otel_long_running_task = otel_component.run(); // initialize static metrics @@ -95,7 +97,7 @@ async fn main() { }); let _ = join!(instrumentation_task, otel_long_running_task); - otel_component.shutdown().await; + let _ = shutdown_handle.shutdown().await; } #[derive(Parser, Debug)] diff --git a/src/lib.rs b/src/lib.rs index 2cc0cef..59e232c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,19 @@ pub struct Otel { shutdown_rx: mpsc::Receiver<()>, } +pub struct ShutdownHandle { + sender: mpsc::Sender<()>, +} + +impl ShutdownHandle { + /// Send a shutdown signal + /// # Errors + /// * `mpsc::error::SendError` - If the shutdown signal could not be sent + pub async fn shutdown(&self) -> Result<(), mpsc::error::SendError<()>> { + self.sender.send(()).await + } +} + impl Otel { #[must_use] pub fn new(config: Config) -> Otel { @@ -144,28 +157,41 @@ impl Otel { return Err(OtelError::PrometheusServerStopped) } _ = self.shutdown_rx.recv() => { + // Graceful shutdown that flushes any pending metrics and logs to the exporter. info!("shutting down otel component"); + + if let Err(metrics_error) = self.meter_provider.force_flush() { + warn!("encountered error while flushing metrics: {metrics_error:?}"); + } + if let Err(metrics_error) = self.meter_provider.shutdown() { + warn!("encountered error while shutting down meter provider: {metrics_error:?}"); + } + + if let Some(logger_provider) = self.logger_provider.clone() { + logger_provider.force_flush(); + let _ = logger_provider.shutdown(); + } + } } Ok(()) } - /// Graceful shutdown that flushes any pending metrics and logs to the exporter. - pub async fn shutdown(&self) { - if let Err(metrics_error) = self.meter_provider.force_flush() { - warn!("ecountered error while flushing metrics: {metrics_error:?}"); - } - if let Err(metrics_error) = self.meter_provider.shutdown() { - warn!("ecountered error while shutting down meter provider: {metrics_error:?}"); - } - - if let Some(logger_provider) = self.logger_provider.clone() { - logger_provider.force_flush(); - let _ = logger_provider.shutdown(); + /// Get a shutdown handle that can be used to trigger shutdown from other contexts. + #[must_use] + pub fn shutdown_handle(&self) -> ShutdownHandle { + ShutdownHandle { + sender: self.shutdown_tx.clone(), } + } - let _ = self.shutdown_tx.send(()).await; + /// Convenience function to trigger shutdown from the Otel struct directly. + /// + /// # Errors + /// * `mpsc::error::SendError` - If the shutdown signal could not be sent + pub async fn shutdown(&self) -> Result<(), mpsc::error::SendError<()>> { + self.shutdown_tx.send(()).await // Just sends signal, run() handles the rest } } @@ -508,3 +534,51 @@ impl Interceptor for AuthIntercepter { Ok(modified_request) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tokio::time::timeout; + + #[tokio::test] + async fn test_shutdown_approaches_both_work() { + // Test that both shutdown approaches trigger the same graceful shutdown behavior + { + // Arrange + let config = Config::default(); + let mut otel = Otel::new(config); + let shutdown_handle = otel.shutdown_handle(); + + let run_task = tokio::spawn(async move { otel.run().await }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Act + shutdown_handle + .shutdown() + .await + .expect("Should be able to send shutdown signal"); + + // Assert + let result = timeout(Duration::from_secs(1), run_task).await; + assert!( + result.is_ok(), + "Run task should complete after direct shutdown" + ); + assert!(result.unwrap().is_ok(), "Run task should exit cleanly"); + } + + { + // Arrange + let config = Config::default(); + let otel = Otel::new(config); + + // Act + let shutdown_result = otel.shutdown().await; + + // Assert + assert!(shutdown_result.is_ok(), "Convenience shutdown should work"); + } + } +}