From 82a4c4a14af9aefb96d4b3a6604c8cc85220ab90 Mon Sep 17 00:00:00 2001 From: Peyton Date: Thu, 19 Feb 2026 15:48:28 -0800 Subject: [PATCH] feat: support concurrent KVP telemetry with configurable event prefix --- libazureinit/src/kvp.rs | 232 +++++++++++++++++++++++++++++++----- libazureinit/src/logging.rs | 14 ++- 2 files changed, 215 insertions(+), 31 deletions(-) diff --git a/libazureinit/src/kvp.rs b/libazureinit/src/kvp.rs index ae73be50..1f1b19a2 100644 --- a/libazureinit/src/kvp.rs +++ b/libazureinit/src/kvp.rs @@ -45,7 +45,9 @@ use uuid::Uuid; const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512; const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048; const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022; -const EVENT_PREFIX: &str = concat!("azure-init-", env!("CARGO_PKG_VERSION")); +/// The default event prefix used when no custom prefix is provided. +pub const EVENT_PREFIX: &str = + concat!("azure-init-", env!("CARGO_PKG_VERSION")); /// Encapsulates the KVP (Key-Value Pair) tracing infrastructure. /// @@ -75,6 +77,7 @@ impl Kvp { pub(crate) fn new( file_path: std::path::PathBuf, vm_id: &str, + event_prefix: &str, graceful_shutdown: CancellationToken, ) -> Result { truncate_guest_pool_file(&file_path)?; @@ -96,6 +99,7 @@ impl Kvp { tracing_layer: EmitKVPLayer { events_tx, vm_id: vm_id.to_string(), + event_prefix: event_prefix.to_string(), }, writer, }) @@ -167,7 +171,10 @@ impl Kvp { /// # Arguments /// * `file` - A mutable reference to the file to write to. /// * `kvps` - A slice of encoded KVP messages to write. - fn write_kvps(file: &mut File, kvps: &[Vec]) -> io::Result<()> { + pub(crate) fn write_kvps( + file: &mut File, + kvps: &[Vec], + ) -> io::Result<()> { FileExt::lock_exclusive(file).map_err(|e| { io::Error::other(format!("Failed to lock KVP file: {e}")) })?; @@ -254,6 +261,7 @@ impl Visit for StringVisitor<'_> { pub struct EmitKVPLayer { events_tx: UnboundedSender>, vm_id: String, + event_prefix: String, } impl EmitKVPLayer { @@ -275,8 +283,13 @@ impl EmitKVPLayer { span_id: &str, event_value: &str, ) { - let event_key = - generate_event_key(&self.vm_id, event_level, event_name, span_id); + let event_key = generate_event_key( + &self.event_prefix, + &self.vm_id, + event_level, + event_name, + span_id, + ); let encoded_kvp = encode_kvp_item(&event_key, event_value); let encoded_kvp_flattened: Vec = encoded_kvp.concat(); self.send_event(encoded_kvp_flattened); @@ -425,19 +438,24 @@ where } } -/// Generates a unique event key by combining the event level, name, and span ID. +/// Generates a unique event key by combining the event prefix, VM ID, level, +/// name, and span ID. /// /// # Arguments +/// * `event_prefix` - A prefix identifying the emitting application or library +/// (e.g., `"azure-init-0.1.1"` or `"my-library-2.0"`). +/// * `vm_id` - The unique identifier for the VM. /// * `event_level` - The logging level (e.g., "INFO", "DEBUG"). /// * `event_name` - The name of the event. /// * `span_id` - A unique identifier for the span. fn generate_event_key( + event_prefix: &str, vm_id: &str, event_level: &str, event_name: &str, span_id: &str, ) -> String { - format!("{EVENT_PREFIX}|{vm_id}|{event_level}|{event_name}|{span_id}") + format!("{event_prefix}|{vm_id}|{event_level}|{event_name}|{span_id}") } /// Encodes a key-value pair (KVP) into one or more byte slices. If the value @@ -452,7 +470,7 @@ fn generate_event_key( /// # Arguments /// * `key` - The key as a string slice. /// * `value` - The value associated with the key. -fn encode_kvp_item(key: &str, value: &str) -> Vec> { +pub(crate) fn encode_kvp_item(key: &str, value: &str) -> Vec> { let key_buf = key .as_bytes() .iter() @@ -532,37 +550,50 @@ pub fn decode_kvp_item( } /// Truncates the guest pool KVP file if it contains stale data (i.e., data -/// older than the system's boot time). Logs whether the file was truncated -/// or no action was needed. +/// older than the system's boot time). +/// +/// An exclusive `flock` is held while checking metadata and truncating so +/// that concurrent processes don't race on the same check-then-truncate +/// sequence. fn truncate_guest_pool_file(kvp_file: &Path) -> Result<(), anyhow::Error> { let boot_time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - get_uptime().as_secs(); - match kvp_file.metadata() { - Ok(metadata) => { - if metadata.mtime() < boot_time as i64 { - OpenOptions::new() - .write(true) - .truncate(true) - .open(kvp_file)?; - println!("Truncated the KVP file due to stale data."); - } else { - println!( - "File has been truncated since boot, no action taken." - ); - } - } + // Try to open the file; if it doesn't exist there is nothing to truncate. + let file = match OpenOptions::new().read(true).write(true).open(kvp_file) { + Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { println!("File not found: {kvp_file:?}"); return Ok(()); } Err(e) => { return Err(anyhow::Error::from(e) - .context("Failed to access file metadata")); + .context("Failed to open KVP file for truncation")); } - } + }; + + // Hold an exclusive lock for the metadata-check + truncate window so + // that two concurrent callers cannot both decide the file is stale and + // truncate data the other has already written. + FileExt::lock_exclusive(&file).map_err(|e| { + anyhow::Error::from(e).context("Failed to lock KVP file for truncation") + })?; + + let result = (|| -> Result<(), anyhow::Error> { + let metadata = file.metadata()?; + if metadata.mtime() < boot_time as i64 { + file.set_len(0)?; + println!("Truncated the KVP file due to stale data."); + } else { + println!("File has been truncated since boot, no action taken."); + } + Ok(()) + })(); - Ok(()) + // Always release the lock, even if the inner operation failed. + let _ = FileExt::unlock(&file); + + result } /// Retrieves the system's uptime using the `sysinfo` crate, returning the duration @@ -686,9 +717,13 @@ mod tests { let test_vm_id = "00000000-0000-0000-0000-000000000001"; let graceful_shutdown = CancellationToken::new(); - let kvp = - Kvp::new(temp_path.clone(), test_vm_id, graceful_shutdown.clone()) - .expect("Failed to create Kvp"); + let kvp = Kvp::new( + temp_path.clone(), + test_vm_id, + EVENT_PREFIX, + graceful_shutdown.clone(), + ) + .expect("Failed to create Kvp"); let subscriber = Registry::default().with(kvp.tracing_layer); let default_guard = tracing::subscriber::set_default(subscriber); @@ -871,6 +906,7 @@ mod tests { Kvp::new( temp_path.clone(), test_vm_id, + EVENT_PREFIX, graceful_shutdown.clone(), ) .expect("Failed to create Kvp") @@ -899,4 +935,142 @@ mod tests { println!("KVP file is empty as expected because kvp_diagnostics is disabled."); } + + /// Helper: verify that a file contains exactly `expected` well-formed KVP + /// records (each `HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE` + /// bytes). + fn assert_kvp_record_count(path: &std::path::Path, expected: usize) { + let contents = std::fs::read(path).expect("Failed to read KVP file"); + let record_size = + HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; + + assert_eq!( + contents.len() % record_size, + 0, + "File size ({}) is not a multiple of the record size ({record_size})", + contents.len() + ); + + let actual = contents.len() / record_size; + assert_eq!( + actual, expected, + "Expected {expected} KVP records but found {actual}" + ); + + // Validate every record is decodable. + for i in 0..actual { + let start = i * record_size; + let end = start + record_size; + decode_kvp_item(&contents[start..end]) + .unwrap_or_else(|e| panic!("Record {i} failed to decode: {e}")); + } + } + + /// 4 threads × 5,000 iterations writing to the same file via separate FDs. + #[test] + fn test_multi_thread_kvp_concurrent_writes() { + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + + let num_threads: usize = 4; + let iterations: usize = 5_000; + + let handles: Vec<_> = (0..num_threads) + .map(|tid| { + let path = temp_path.clone(); + std::thread::spawn(move || { + // Each thread opens its own file descriptor. + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open(&path) + .expect("Failed to open KVP file"); + + for i in 0..iterations { + let key = format!("thread-{tid}-iter-{i}"); + let value = format!("value-{tid}-{i}"); + let encoded = encode_kvp_item(&key, &value).concat(); + Kvp::write_kvps(&mut file, &[encoded]) + .expect("write_kvps failed"); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("Thread panicked"); + } + + let expected_records = num_threads * iterations; + assert_kvp_record_count(&temp_path, expected_records); + println!( + "Multi-thread test passed: {expected_records} records verified." + ); + } + + /// 4 child processes × 5,000 iterations writing to the same file. + /// + /// When the env var `__KVP_CHILD_WORKER` is set the process acts as a + /// worker (encode + write); otherwise it orchestrates the children. + #[test] + fn test_multi_process_kvp_concurrent_writes() { + let num_processes: usize = 4; + let iterations: usize = 5_000; + + // --- Child worker path --- + if let Ok(path) = std::env::var("__KVP_CHILD_WORKER_PATH") { + let pid = std::process::id(); + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open(&path) + .expect("Child: failed to open KVP file"); + + for i in 0..iterations { + let key = format!("proc-{pid}-iter-{i}"); + let value = format!("value-{pid}-{i}"); + let encoded = encode_kvp_item(&key, &value).concat(); + Kvp::write_kvps(&mut file, &[encoded]) + .expect("Child: write_kvps failed"); + } + return; // done – the parent will verify the file. + } + + // --- Parent orchestrator path --- + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + + let test_exe = std::env::current_exe() + .expect("Failed to determine test executable path"); + + let children: Vec<_> = (0..num_processes) + .map(|_| { + std::process::Command::new(&test_exe) + .env("__KVP_CHILD_WORKER_PATH", temp_path.to_str().unwrap()) + .arg("--exact") + .arg("kvp::tests::test_multi_process_kvp_concurrent_writes") + .arg("--nocapture") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .expect("Failed to spawn child process") + }) + .collect(); + + for mut child in children { + let status = child.wait().expect("Failed to wait on child"); + assert!( + status.success(), + "Child process exited with failure: {status}" + ); + } + + let expected_records = num_processes * iterations; + assert_kvp_record_count(&temp_path, expected_records); + println!( + "Multi-process test passed: {expected_records} records verified." + ); + } } diff --git a/libazureinit/src/logging.rs b/libazureinit/src/logging.rs index fb5c543a..c762b227 100644 --- a/libazureinit/src/logging.rs +++ b/libazureinit/src/logging.rs @@ -16,6 +16,7 @@ use tracing_subscriber::{ }; use crate::config::Config; +pub use crate::kvp::EVENT_PREFIX; use crate::kvp::{EmitKVPLayer, Kvp as KvpInternal}; pub type LoggingSetup = ( @@ -153,7 +154,7 @@ struct KvpLayer(Filtered); /// /// # #[tokio::main] /// # async fn main() -> anyhow::Result<()> { -/// let mut kvp = Kvp::new("a-unique-id")?; +/// let mut kvp = Kvp::new("a-unique-id", None)?; /// let registry = tracing_subscriber::Registry::default().with(kvp.layer()); /// /// // When it's time to shut down, doing this ensures all writes are flushed @@ -173,12 +174,20 @@ pub struct Kvp { impl LookupSpan<'lookup>> Kvp { /// Create a new tracing layer for KVP. /// + /// When `event_prefix` is `None`, the default [`EVENT_PREFIX`] is used. + /// Pass `Some("my-library-1.0")` to identify a different emitting library. + /// /// Refer to [`libazureinit::get_vm_id`] to retrieve the VM's unique identifier. - pub fn new>(vm_id: T) -> Result { + pub fn new>( + vm_id: T, + event_prefix: Option<&str>, + ) -> Result { + let event_prefix = event_prefix.unwrap_or(EVENT_PREFIX); let shutdown = CancellationToken::new(); let inner = KvpInternal::new( std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), vm_id.as_ref(), + event_prefix, shutdown.clone(), )?; @@ -244,6 +253,7 @@ pub fn setup_layers( match KvpInternal::new( std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), vm_id, + EVENT_PREFIX, graceful_shutdown, ) { Ok(kvp) => {