diff --git a/examples/sample-app/src/main.rs b/examples/sample-app/src/main.rs index 9f8f461..e48ac74 100644 --- a/examples/sample-app/src/main.rs +++ b/examples/sample-app/src/main.rs @@ -88,7 +88,7 @@ async fn main() { STATIC_METRICS.updown_counter.add(val * -1.0, &[]); } STATIC_METRICS.observable_gauge.observe(iteration, &[]); - info!("iteration: {iteration}"); + info!("iteration: {}", iteration); sleep(Duration::from_micros(100)).await; } info!("Done. Please hit [Ctrl+C] to exit"); diff --git a/src/config.rs b/src/config.rs index e6683c4..e33270c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -88,12 +88,28 @@ pub struct LogsExportTarget { pub timeout: u64, /// export severity - severity >= which to export pub export_severity: Option, + /// target name filters - only export logs that match these exact target names. If None, exports all logs. + pub target_filters: Option>, /// path to root ca cert pub ca_cert_path: Option, /// a fn that provides the bearer token, which will be called to get the token for each export request pub bearer_token_provider_fn: Option String>, } +impl Default for LogsExportTarget { + fn default() -> Self { + Self { + url: String::new(), + interval_secs: 30, + timeout: 5, + export_severity: None, + target_filters: None, + ca_cert_path: None, + bearer_token_provider_fn: None, + } + } +} + #[derive(Clone, Debug)] pub struct Attribute { pub key: String, @@ -147,8 +163,8 @@ mod tests { interval_secs: 1, timeout: 5, export_severity: Some(Severity::Error), - ca_cert_path: None, bearer_token_provider_fn: Some(get_dummy_auth_token), + ..Default::default() }]; let config = Config { diff --git a/src/filtered_log_processor.rs b/src/filtered_log_processor.rs index a8ac726..e621808 100644 --- a/src/filtered_log_processor.rs +++ b/src/filtered_log_processor.rs @@ -16,7 +16,7 @@ use futures_util::{ use opentelemetry::{ global, - logs::{LogError, LogResult, Severity}, + logs::{AnyValue, LogError, LogResult, Severity}, }; use opentelemetry_sdk::{ export::logs::{ExportResult, LogData, LogExporter}, @@ -125,17 +125,13 @@ impl FilteredBatchLogProcessor { while let Some(message) = messages.next().await { match message { BatchMessage::ExportLog(log) => { - // add log only if the severity is >= export_severity - if let Some(severity) = log.record.severity_number { - if severity >= config.export_severity { - logs.push(Cow::Owned(log)); - } else { - continue; - } - } else { - continue; + // Apply filtering using the dedicated function + if !should_export_log(&log, &config) { + continue; // skip logs that do not match the filters } + logs.push(Cow::Owned(log)); + if logs.len() == config.max_export_batch_size { let result = export_with_timeout( config.max_export_timeout, @@ -216,6 +212,44 @@ impl FilteredBatchLogProcessor { } } +/// Check if a log should be exported based on severity and target filtering rules +fn should_export_log(log: &LogData, config: &FilteredBatchConfig) -> bool { + // Apply severity filtering + let severity_matches = log + .record + .severity_number + .is_some_and(|sev| sev >= config.export_severity); + + if !severity_matches { + return false; // skip logs that do not match the severity filter + } + + // Apply target filtering + let target_matches = if let Some(ref target_filters) = config.target_filters { + // Check if the log has a "target" attribute that matches any of our filters + log.record.attributes.as_ref().is_some_and(|attrs| { + attrs.iter().any(|(key, value)| { + if key.as_str() == "target" { + // Extract string value from AnyValue by matching on the enum + if let AnyValue::String(target_value) = value { + let target_str = target_value.as_str(); + // Check if target matches any of the configured filters (exact match only) + target_filters.iter().any(|filter| target_str == filter) + } else { + false + } + } else { + false + } + }) + }) + } else { + true // if no target filters specified, accept all logs + }; + + target_matches +} + async fn export_with_timeout( time_out: Duration, exporter: &mut E, @@ -240,7 +274,7 @@ where } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct FilteredBatchConfig { /// The maximum queue size to buffer logs for delayed processing. If the /// queue gets full it drops the logs. The default value of is 2048. @@ -261,6 +295,9 @@ pub(crate) struct FilteredBatchConfig { /// export level - levels >= which to export pub export_severity: Severity, + + /// target filters - only export logs from targets matching these exact names. If None, exports all logs. + pub target_filters: Option>, } impl Default for FilteredBatchConfig { @@ -271,6 +308,7 @@ impl Default for FilteredBatchConfig { max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT), export_severity: Severity::Error, + target_filters: None, } } } @@ -317,3 +355,325 @@ enum BatchMessage { /// Set the resource for the exporter. SetResource(Arc), } + +#[cfg(test)] +#[allow(clippy::default_trait_access)] +mod tests { + use super::*; + use opentelemetry::{logs::AnyValue, logs::LogRecord}; + use opentelemetry_sdk::logs::LogRecord as SdkLogRecord; + + /// Helper function to create a `LogData` with specified severity and target + fn create_log_data(severity: Option, target: Option<&str>) -> LogData { + let mut log_record = SdkLogRecord::default(); + + if let Some(sev) = severity { + log_record.set_severity_number(sev); + } + + if let Some(target_str) = target { + log_record.add_attribute("target", AnyValue::from(target_str.to_string())); + } + + LogData { + record: log_record, + instrumentation: Default::default(), + } + } + + /// Helper function to create `FilteredBatchConfig` for testing + fn create_test_config( + export_severity: Severity, + target_filters: Option>, + ) -> FilteredBatchConfig { + FilteredBatchConfig { + max_queue_size: 100, + scheduled_delay: Duration::from_millis(100), + max_export_batch_size: 10, + max_export_timeout: Duration::from_millis(1000), + export_severity, + target_filters, + } + } + + #[test] + fn test_should_export_log_severity_filtering_basic() { + let config = create_test_config(Severity::Error, None); + + // Test logs with different severities + let error_log = create_log_data(Some(Severity::Error), None); + let warn_log = create_log_data(Some(Severity::Warn), None); + let info_log = create_log_data(Some(Severity::Info), None); + let debug_log = create_log_data(Some(Severity::Debug), None); + + // Only Error and above should pass + assert!(should_export_log(&error_log, &config)); + assert!(!should_export_log(&warn_log, &config)); + assert!(!should_export_log(&info_log, &config)); + assert!(!should_export_log(&debug_log, &config)); + } + + #[test] + fn test_should_export_log_severity_filtering_warn_level() { + let config = create_test_config(Severity::Warn, None); + + let error_log = create_log_data(Some(Severity::Error), None); + let warn_log = create_log_data(Some(Severity::Warn), None); + let info_log = create_log_data(Some(Severity::Info), None); + + // Warn and above should pass + assert!(should_export_log(&error_log, &config)); + assert!(should_export_log(&warn_log, &config)); + assert!(!should_export_log(&info_log, &config)); + } + + #[test] + fn test_should_export_log_severity_filtering_no_severity() { + let config = create_test_config(Severity::Error, None); + let log_without_severity = create_log_data(None, None); + + // Logs without severity should not pass + assert!(!should_export_log(&log_without_severity, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_exact_match() { + let config = create_test_config( + Severity::Info, + Some(vec![ + "critical_service".to_string(), + "payment_processor".to_string(), + ]), + ); + + let matching_log1 = create_log_data(Some(Severity::Info), Some("critical_service")); + let matching_log2 = create_log_data(Some(Severity::Info), Some("payment_processor")); + let non_matching_log = create_log_data(Some(Severity::Info), Some("other_service")); + + assert!(should_export_log(&matching_log1, &config)); + assert!(should_export_log(&matching_log2, &config)); + assert!(!should_export_log(&non_matching_log, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_no_target_attribute() { + let config = create_test_config(Severity::Info, Some(vec!["critical_service".to_string()])); + + let log_without_target = create_log_data(Some(Severity::Info), None); + + // Logs without target attribute should not pass when target filters are configured + assert!(!should_export_log(&log_without_target, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_no_filters_configured() { + let config = create_test_config(Severity::Info, None); + + let log_with_target = create_log_data(Some(Severity::Info), Some("any_target")); + let log_without_target = create_log_data(Some(Severity::Info), None); + + // When no target filters are configured, all logs should pass target filtering + assert!(should_export_log(&log_with_target, &config)); + assert!(should_export_log(&log_without_target, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_case_sensitive() { + let config = create_test_config(Severity::Info, Some(vec!["CriticalService".to_string()])); + + let matching_log = create_log_data(Some(Severity::Info), Some("CriticalService")); + let non_matching_log = create_log_data(Some(Severity::Info), Some("criticalservice")); + + // Target matching should be case-sensitive + assert!(should_export_log(&matching_log, &config)); + assert!(!should_export_log(&non_matching_log, &config)); + } + + #[test] + fn test_should_export_log_combined_severity_and_target_filtering() { + let config = + create_test_config(Severity::Error, Some(vec!["critical_service".to_string()])); + + // Both severity and target match + let matching_log = create_log_data(Some(Severity::Error), Some("critical_service")); + + // Severity matches but target doesn't + let severity_only_log = create_log_data(Some(Severity::Error), Some("other_service")); + + // Target matches but severity doesn't + let target_only_log = create_log_data(Some(Severity::Warn), Some("critical_service")); + + // Neither matches + let no_match_log = create_log_data(Some(Severity::Warn), Some("other_service")); + + assert!(should_export_log(&matching_log, &config)); + assert!(!should_export_log(&severity_only_log, &config)); + assert!(!should_export_log(&target_only_log, &config)); + assert!(!should_export_log(&no_match_log, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_empty_filter_list() { + let config = create_test_config(Severity::Info, Some(vec![])); + + let log_with_target = create_log_data(Some(Severity::Info), Some("any_target")); + + // Empty filter list should reject all logs (no targets match) + assert!(!should_export_log(&log_with_target, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_special_characters() { + let config = create_test_config( + Severity::Info, + Some(vec![ + "service-with-dashes".to_string(), + "service_with_underscores".to_string(), + "service::with::colons".to_string(), + ]), + ); + + let dash_log = create_log_data(Some(Severity::Info), Some("service-with-dashes")); + let underscore_log = + create_log_data(Some(Severity::Info), Some("service_with_underscores")); + let colon_log = create_log_data(Some(Severity::Info), Some("service::with::colons")); + + assert!(should_export_log(&dash_log, &config)); + assert!(should_export_log(&underscore_log, &config)); + assert!(should_export_log(&colon_log, &config)); + } + + #[test] + fn test_should_export_log_target_filtering_non_string_attribute() { + let config = create_test_config(Severity::Info, Some(vec!["critical_service".to_string()])); + + // Create a log with target attribute that's not a string + let mut log_record = SdkLogRecord::default(); + log_record.set_severity_number(Severity::Info); + log_record.add_attribute("target", AnyValue::Int(123)); // Non-string value + + let log_data = LogData { + record: log_record, + instrumentation: Default::default(), + }; + + // Non-string target attributes should not match + assert!(!should_export_log(&log_data, &config)); + } + + #[test] + fn test_should_export_log_multiple_attributes_with_target() { + let config = create_test_config(Severity::Info, Some(vec!["critical_service".to_string()])); + + let mut log_record = SdkLogRecord::default(); + log_record.set_severity_number(Severity::Info); + log_record.add_attribute("service", AnyValue::from("some_service".to_string())); + log_record.add_attribute("target", AnyValue::from("critical_service".to_string())); + log_record.add_attribute("component", AnyValue::from("auth".to_string())); + + let log_data = LogData { + record: log_record, + instrumentation: Default::default(), + }; + + // Should find the target attribute among multiple attributes + assert!(should_export_log(&log_data, &config)); + } + + #[test] + fn test_should_export_log_severity_levels_comprehensive() { + // Test all severity levels with different thresholds + let trace_config = create_test_config(Severity::Trace, None); + let debug_config = create_test_config(Severity::Debug, None); + let info_config = create_test_config(Severity::Info, None); + let warn_config = create_test_config(Severity::Warn, None); + let error_config = create_test_config(Severity::Error, None); + + let trace_log = create_log_data(Some(Severity::Trace), None); + let debug_log = create_log_data(Some(Severity::Debug), None); + let info_log = create_log_data(Some(Severity::Info), None); + let warn_log = create_log_data(Some(Severity::Warn), None); + let error_log = create_log_data(Some(Severity::Error), None); + + // Trace threshold - should accept all + assert!(should_export_log(&trace_log, &trace_config)); + assert!(should_export_log(&debug_log, &trace_config)); + assert!(should_export_log(&info_log, &trace_config)); + assert!(should_export_log(&warn_log, &trace_config)); + assert!(should_export_log(&error_log, &trace_config)); + + // Debug threshold - should accept debug and above + assert!(!should_export_log(&trace_log, &debug_config)); + assert!(should_export_log(&debug_log, &debug_config)); + assert!(should_export_log(&info_log, &debug_config)); + assert!(should_export_log(&warn_log, &debug_config)); + assert!(should_export_log(&error_log, &debug_config)); + + // Info threshold - should accept info and above + assert!(!should_export_log(&trace_log, &info_config)); + assert!(!should_export_log(&debug_log, &info_config)); + assert!(should_export_log(&info_log, &info_config)); + assert!(should_export_log(&warn_log, &info_config)); + assert!(should_export_log(&error_log, &info_config)); + + // Warn threshold - should accept warn and above + assert!(!should_export_log(&trace_log, &warn_config)); + assert!(!should_export_log(&debug_log, &warn_config)); + assert!(!should_export_log(&info_log, &warn_config)); + assert!(should_export_log(&warn_log, &warn_config)); + assert!(should_export_log(&error_log, &warn_config)); + + // Error threshold - should accept only error + assert!(!should_export_log(&trace_log, &error_config)); + assert!(!should_export_log(&debug_log, &error_config)); + assert!(!should_export_log(&info_log, &error_config)); + assert!(!should_export_log(&warn_log, &error_config)); + assert!(should_export_log(&error_log, &error_config)); + } + + #[test] + fn test_should_export_log_edge_cases() { + // Test various edge cases + + // Empty string target + let config = create_test_config(Severity::Info, Some(vec![String::new()])); + let empty_target_log = create_log_data(Some(Severity::Info), Some("")); + assert!(should_export_log(&empty_target_log, &config)); + + // Very long target name + let long_target = "a".repeat(1000); + let long_config = create_test_config(Severity::Info, Some(vec![long_target.clone()])); + let long_target_log = create_log_data(Some(Severity::Info), Some(&long_target)); + assert!(should_export_log(&long_target_log, &long_config)); + + // Unicode characters in target + let unicode_config = create_test_config( + Severity::Info, + Some(vec!["服务器".to_string(), "🚀rocket".to_string()]), + ); + let unicode_log1 = create_log_data(Some(Severity::Info), Some("服务器")); + let unicode_log2 = create_log_data(Some(Severity::Info), Some("🚀rocket")); + assert!(should_export_log(&unicode_log1, &unicode_config)); + assert!(should_export_log(&unicode_log2, &unicode_config)); + } + + #[test] + fn test_should_export_log_multiple_target_attributes() { + // Test log with multiple target-related attributes (only "target" should match) + let config = create_test_config(Severity::Info, Some(vec!["service1".to_string()])); + + let mut log_record = SdkLogRecord::default(); + log_record.set_severity_number(Severity::Info); + log_record.add_attribute("target", AnyValue::from("service1".to_string())); + log_record.add_attribute("target_service", AnyValue::from("service2".to_string())); + log_record.add_attribute("service_target", AnyValue::from("service3".to_string())); + + let log_data = LogData { + record: log_record, + instrumentation: Default::default(), + }; + + // Should match because "target" attribute has "service1" + assert!(should_export_log(&log_data, &config)); + } +} diff --git a/src/loggers.rs b/src/loggers.rs index db38608..b2011d1 100644 --- a/src/loggers.rs +++ b/src/loggers.rs @@ -64,6 +64,9 @@ where log_record.set_severity_text(record.level().as_str().into()); log_record.set_timestamp(timestamp); + // Add target as an attribute so it can be filtered + log_record.add_attribute("target", AnyValue::from(record.target().to_string())); + self.logger.emit(log_record); } @@ -163,6 +166,7 @@ pub(crate) fn init_logs(config: Config) -> Result) -> Result, Status> { let header_value = request .metadata()