From 2ea499cef6c1e4c38e32d7ae1b18a9a8c0dbbabc Mon Sep 17 00:00:00 2001 From: not-matthias Date: Mon, 5 Jan 2026 15:13:18 +0100 Subject: [PATCH 1/3] feat(memtrack): add zstd compression support --- Cargo.lock | 29 ++++++++ crates/memtrack/src/main.rs | 73 +++++++++++++------ crates/runner-shared/Cargo.toml | 1 + .../runner-shared/src/artifacts/memtrack.rs | 53 ++++++++++++-- crates/runner-shared/src/artifacts/mod.rs | 12 ++- 5 files changed, 137 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69509ff5..a861d002 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2477,6 +2477,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "zstd", ] [[package]] @@ -4101,3 +4102,31 @@ dependencies = [ "quote", "syn 2.0.111", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/crates/memtrack/src/main.rs b/crates/memtrack/src/main.rs index e3ead412..ec23fbb2 100644 --- a/crates/memtrack/src/main.rs +++ b/crates/memtrack/src/main.rs @@ -3,12 +3,14 @@ use clap::Parser; use ipc_channel::ipc::{self}; use log::{debug, info}; use memtrack::{MemtrackIpcMessage, Tracker, handle_ipc_message}; -use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent}; -use std::path::PathBuf; +use runner_shared::artifacts::{ArtifactExt, MemtrackArtifact, MemtrackEvent, MemtrackWriter}; +use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use std::thread; +use std::time::Duration; #[derive(Parser)] #[command(name = "memtrack")] @@ -51,10 +53,8 @@ fn main() -> Result<()> { } => { debug!("Starting memtrack for command: {command}"); - let (root_pid, events, status) = - track_command(&command, ipc_server).context("Failed to track command")?; - let result = MemtrackArtifact { events }; - result.save_with_pid_to(&out_dir, root_pid as libc::pid_t)?; + let status = + track_command(&command, ipc_server, &out_dir).context("Failed to track command")?; std::process::exit(status.code().unwrap_or(1)); } @@ -64,7 +64,8 @@ fn main() -> Result<()> { fn track_command( cmd_string: &str, ipc_server_name: Option, -) -> anyhow::Result<(u32, Vec, std::process::ExitStatus)> { + out_dir: &Path, +) -> anyhow::Result { let tracker = Tracker::new()?; let tracker_arc = Arc::new(Mutex::new(tracker)); @@ -95,37 +96,65 @@ fn track_command( let event_rx = { tracker_arc.lock().unwrap().track(root_pid)? }; info!("Spawned child with pid {root_pid}"); - // Spawn event processing thread - let process_events = Arc::new(AtomicBool::new(true)); - let process_events_clone = process_events.clone(); - let processing_thread = thread::spawn(move || { - let mut events = Vec::new(); + // Generate output file name and create file for streaming events + let file_name = MemtrackArtifact::file_name(Some(root_pid)); + let out_file = std::fs::File::create(out_dir.join(file_name))?; + + let (write_tx, write_rx) = channel::(); + + // Stage A: Fast drain thread - This is required so that we immediately clear the ring buffer + // because it only has a limited size. + static DRAIN_EVENTS: AtomicBool = AtomicBool::new(true); + let write_tx_clone = write_tx.clone(); + let drain_thread = thread::spawn(move || { loop { - if !process_events_clone.load(Ordering::Relaxed) { + if !DRAIN_EVENTS.load(Ordering::Relaxed) { break; } - - let Ok(event) = event_rx.try_recv() else { + let Ok(event) = event_rx.recv_timeout(Duration::from_millis(100)) else { continue; }; + let _ = write_tx_clone.send(event.into()); + } + }); + + // Stage B: Writer thread - Immediately writes the events to disk + let writer_thread = thread::spawn(move || -> anyhow::Result<()> { + let mut writer = MemtrackWriter::new(out_file)?; - events.push(event.into()); + while let Ok(first) = write_rx.recv() { + writer.write_event(&first)?; + + // Drain any backlog in a tight loop (batching) + while let Ok(ev) = write_rx.try_recv() { + writer.write_event(&ev)?; + } } - events + writer.finish()?; + + Ok(()) }); // Wait for the command to complete let status = child.wait().context("Failed to wait for command")?; info!("Command exited with status: {status}"); - info!("Waiting for the event processing thread to finish"); - process_events.store(false, Ordering::Relaxed); - let events = processing_thread + // Wait for drain thread to finish + info!("Waiting for the drain thread to finish"); + DRAIN_EVENTS.store(false, Ordering::Relaxed); + drain_thread + .join() + .map_err(|_| anyhow::anyhow!("Failed to join drain thread"))?; + + // Wait for writer thread to finish and propagate errors + info!("Waiting for the writer thread to finish"); + drop(write_tx); + writer_thread .join() - .map_err(|_| anyhow::anyhow!("Failed to join event thread"))?; + .map_err(|_| anyhow::anyhow!("Failed to join writer thread"))??; // IPC thread will exit when channel closes drop(ipc_handle); - Ok((root_pid as u32, events, status)) + Ok(status) } diff --git a/crates/runner-shared/Cargo.toml b/crates/runner-shared/Cargo.toml index 242dd56c..99413461 100644 --- a/crates/runner-shared/Cargo.toml +++ b/crates/runner-shared/Cargo.toml @@ -13,3 +13,4 @@ log = { workspace = true } rmp = "0.8.14" rmp-serde = "1.3.0" libc = { workspace = true } +zstd = "0.13" diff --git a/crates/runner-shared/src/artifacts/memtrack.rs b/crates/runner-shared/src/artifacts/memtrack.rs index 484dd234..d6562e06 100644 --- a/crates/runner-shared/src/artifacts/memtrack.rs +++ b/crates/runner-shared/src/artifacts/memtrack.rs @@ -1,6 +1,6 @@ use libc::pid_t; use serde::{Deserialize, Serialize}; -use std::io::{Read, Write}; +use std::io::{BufWriter, Read, Write}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MemtrackArtifact { @@ -8,20 +8,22 @@ pub struct MemtrackArtifact { } impl super::ArtifactExt for MemtrackArtifact { fn encode_to_writer(&self, writer: W) -> anyhow::Result<()> { - // This is required for `decode_streamed`: We can't stream the deserialization of - // the whole artifact, so we have to encode them one by one. - let mut serializer = rmp_serde::Serializer::new(writer); + let mut writer = MemtrackWriter::new(writer)?; for event in &self.events { - event.serialize(&mut serializer)?; + writer.write_event(event)?; } + writer.finish()?; Ok(()) } } impl MemtrackArtifact { - pub fn decode_streamed(reader: R) -> anyhow::Result> { + pub fn decode_streamed( + reader: R, + ) -> anyhow::Result>>> { + let decoder = zstd::Decoder::new(reader)?; Ok(MemtrackEventStream { - deserializer: rmp_serde::Deserializer::new(reader), + deserializer: rmp_serde::Deserializer::new(decoder), }) } } @@ -61,6 +63,43 @@ impl Iterator for MemtrackEventStream { } } +/// Streaming writer for memtrack events with compression +pub struct MemtrackWriter { + serializer: rmp_serde::Serializer>>, +} + +impl MemtrackWriter { + pub fn new(writer: W) -> anyhow::Result { + // We're dealing with a lot of events, so we want to compress as much as possible + // while not taking too much time to compress. + const COMPRESSION_LEVEL: i32 = 1; + const BUFFER_SIZE: usize = 256 * 1024 /* 256 KB */; + + let writer = BufWriter::with_capacity(BUFFER_SIZE, writer); + let encoder = zstd::Encoder::new(writer, COMPRESSION_LEVEL)?; + Ok(Self { + serializer: rmp_serde::Serializer::new(encoder), + }) + } + + /// Write a single event to the stream + pub fn write_event(&mut self, event: &MemtrackEvent) -> anyhow::Result<()> { + event.serialize(&mut self.serializer)?; + Ok(()) + } + + /// Finish writing and flush the compression stream + pub fn finish(self) -> anyhow::Result<()> { + let encoder = self.serializer.into_inner(); + let mut writer = encoder.finish()?; + + // Flush the writer to ensure all data is written to the underlying writer + writer.flush()?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use crate::artifacts::ArtifactExt; diff --git a/crates/runner-shared/src/artifacts/mod.rs b/crates/runner-shared/src/artifacts/mod.rs index 1b3cb7f5..7a95ebca 100644 --- a/crates/runner-shared/src/artifacts/mod.rs +++ b/crates/runner-shared/src/artifacts/mod.rs @@ -17,6 +17,14 @@ where std::any::type_name::().rsplit("::").next().unwrap() } + fn file_name(pid: Option) -> String { + if let Some(pid) = pid { + format!("{pid}.{}.msgpack", Self::name()) + } else { + format!("{}.msgpack", Self::name()) + } + } + fn encode_to_writer(&self, mut writer: W) -> anyhow::Result<()> { let encoded = rmp_serde::to_vec_named(self)?; writer.write_all(&encoded)?; @@ -37,7 +45,7 @@ where } fn save_to>(&self, folder: P) -> anyhow::Result<()> { - self.save_file_to(folder, &format!("{}.msgpack", Self::name())) + self.save_file_to(folder, &Self::file_name(None)) } fn save_with_pid_to>( @@ -45,6 +53,6 @@ where folder: P, pid: pid_t, ) -> anyhow::Result<()> { - self.save_file_to(folder, &format!("{pid}.{}.msgpack", Self::name())) + self.save_file_to(folder, &Self::file_name(Some(pid))) } } From ae8b5c83612906d22ac2416b809f887109088540 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Mon, 5 Jan 2026 17:21:10 +0100 Subject: [PATCH 2/3] fix(memtrack): drain events arriving after process terminated --- crates/memtrack/src/main.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/memtrack/src/main.rs b/crates/memtrack/src/main.rs index ec23fbb2..fa7a64ce 100644 --- a/crates/memtrack/src/main.rs +++ b/crates/memtrack/src/main.rs @@ -107,15 +107,31 @@ fn track_command( static DRAIN_EVENTS: AtomicBool = AtomicBool::new(true); let write_tx_clone = write_tx.clone(); let drain_thread = thread::spawn(move || { - loop { - if !DRAIN_EVENTS.load(Ordering::Relaxed) { - break; - } + // Regular draining loop + while DRAIN_EVENTS.load(Ordering::Relaxed) { let Ok(event) = event_rx.recv_timeout(Duration::from_millis(100)) else { continue; }; let _ = write_tx_clone.send(event.into()); } + + // Final aggressive drain - keep trying until truly empty + loop { + match event_rx.try_recv() { + Ok(event) => { + let _ = write_tx_clone.send(event.into()); + } + Err(_) => { + // Sleep briefly and try once more to catch late arrivals + thread::sleep(Duration::from_millis(50)); + if let Ok(event) = event_rx.try_recv() { + let _ = write_tx_clone.send(event.into()); + } else { + break; + } + } + } + } }); // Stage B: Writer thread - Immediately writes the events to disk From 35e5f3f7d61546db7758c884d346c79c25e85c07 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Mon, 5 Jan 2026 17:24:50 +0100 Subject: [PATCH 3/3] chore(runner-shared): add memtrack serialization benchmark --- .github/workflows/ci.yml | 15 ++++ Cargo.lock | 82 ++++++++++++++++++- crates/runner-shared/Cargo.toml | 8 ++ .../runner-shared/benches/memtrack_writer.rs | 52 ++++++++++++ 4 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 crates/runner-shared/benches/memtrack_writer.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a18e650..bfb80321 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,3 +47,18 @@ jobs: - name: Run tests run: sudo -E $(which cargo) test working-directory: crates/memtrack + + benchmarks: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: moonrepo/setup-rust@v1 + with: + bins: cargo-codspeed + - name: Build benchmarks + run: cargo codspeed build -p runner-shared + - name: Run benchmarks + uses: CodSpeedHQ/action@v4 + with: + mode: instrumentation + run: cargo codspeed run -p runner-shared diff --git a/Cargo.lock b/Cargo.lock index a861d002..7178268b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,6 +335,7 @@ dependencies = [ "anstyle", "clap_lex", "strsim", + "terminal_size", ] [[package]] @@ -357,9 +358,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "codspeed" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3b847e05a34be5c38f3f2a5052178a3bd32e6b5702f3ea775efde95c483a539" +checksum = "eb56923193c76a0e5b6b17b2c2bb1e151ef8a5e06b557e1cbe38c6db467763f9" dependencies = [ "anyhow", "cc", @@ -373,6 +374,48 @@ dependencies = [ "statrs", ] +[[package]] +name = "codspeed-divan-compat" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7558ff5740fbc26a5fc55c4934cfed94dfccee76abc17b57ecf5d0bee3592b5e" +dependencies = [ + "clap", + "codspeed", + "codspeed-divan-compat-macros", + "codspeed-divan-compat-walltime", + "regex", +] + +[[package]] +name = "codspeed-divan-compat-macros" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de343ca0a4fbaabbd3422941fdee24407d00e2fa686a96021c21a78ab2bb895" +dependencies = [ + "divan-macros", + "itertools 0.14.0", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.111", +] + +[[package]] +name = "codspeed-divan-compat-walltime" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d9de586cc7e9752fc232f08e0733c2016122e16065c4adf0c8a8d9e370749ee" +dependencies = [ + "cfg-if", + "clap", + "codspeed", + "condtype", + "divan-macros", + "libc", + "regex-lite", +] + [[package]] name = "codspeed-runner" version = "4.5.2" @@ -469,6 +512,12 @@ version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +[[package]] +name = "condtype" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf0a07a401f374238ab8e2f11a104d2851bf9ce711ec69804834de8af45c7af" + [[package]] name = "console" version = "0.15.11" @@ -629,6 +678,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "divan-macros" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8dc51d98e636f5e3b0759a39257458b22619cac7e96d932da6eeb052891bb67c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "either" version = "1.15.0" @@ -2271,6 +2331,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d942b98df5e658f56f20d592c7f868833fe38115e65c33003d8cd224b0155da" + [[package]] name = "regex-syntax" version = "0.8.8" @@ -2471,8 +2537,10 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "codspeed-divan-compat", "libc", "log", + "rand", "rmp", "rmp-serde", "serde", @@ -2968,6 +3036,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b8cb979cb11c32ce1603f8137b22262a9d131aaa5c37b5678025f22b8becd0" +dependencies = [ + "rustix 1.1.2", + "windows-sys 0.60.2", +] + [[package]] name = "test-log" version = "0.2.19" diff --git a/crates/runner-shared/Cargo.toml b/crates/runner-shared/Cargo.toml index 99413461..5b2feded 100644 --- a/crates/runner-shared/Cargo.toml +++ b/crates/runner-shared/Cargo.toml @@ -14,3 +14,11 @@ rmp = "0.8.14" rmp-serde = "1.3.0" libc = { workspace = true } zstd = "0.13" + +[dev-dependencies] +divan = { version = "4.2.0", package = "codspeed-divan-compat" } +rand = "0.8" + +[[bench]] +name = "memtrack_writer" +harness = false diff --git a/crates/runner-shared/benches/memtrack_writer.rs b/crates/runner-shared/benches/memtrack_writer.rs new file mode 100644 index 00000000..bcd6f9e3 --- /dev/null +++ b/crates/runner-shared/benches/memtrack_writer.rs @@ -0,0 +1,52 @@ +use divan::Bencher; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use runner_shared::artifacts::{MemtrackEvent, MemtrackEventKind, MemtrackWriter}; + +fn main() { + divan::main(); +} + +/// Generate N random memtrack events with a seeded RNG +fn generate_events(n: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(12345); + let mut events = Vec::with_capacity(n); + for _ in 0..n { + let size = rng.gen_range(8..8192); + let kind = match rng.gen_range(0..8) { + 0 => MemtrackEventKind::Malloc { size }, + 1 => MemtrackEventKind::Free, + 2 => MemtrackEventKind::Realloc { size }, + 3 => MemtrackEventKind::Calloc { size }, + 4 => MemtrackEventKind::AlignedAlloc { size }, + 5 => MemtrackEventKind::Mmap { size }, + 6 => MemtrackEventKind::Munmap { size }, + 7 => MemtrackEventKind::Brk { size }, + _ => unreachable!(), + }; + + events.push(MemtrackEvent { + pid: rng.r#gen(), + tid: rng.r#gen(), + timestamp: rng.r#gen(), + addr: rng.r#gen(), + kind, + }); + } + + events +} + +#[divan::bench(args = [10_000, 100_000, 500_000, 1_000_000])] +fn write_events(bencher: Bencher, n: usize) { + let events = generate_events(n); + + bencher.bench_local(|| { + let mut output = Vec::new(); + let mut writer = MemtrackWriter::new(&mut output).unwrap(); + for event in &events { + writer.write_event(event).unwrap(); + } + writer.finish().unwrap(); + }); +}