From 9e9db5b1277e5858d7732f7dc34d570525cea490 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 22 Apr 2025 10:24:22 +0200 Subject: [PATCH 1/7] chore: dont use regex in perf map harvest --- src/run/runner/valgrind/executor.rs | 2 +- src/run/runner/valgrind/helpers/perf_maps.rs | 48 ++++++++++---------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/run/runner/valgrind/executor.rs b/src/run/runner/valgrind/executor.rs index 409fea69..9ecc383e 100644 --- a/src/run/runner/valgrind/executor.rs +++ b/src/run/runner/valgrind/executor.rs @@ -41,7 +41,7 @@ impl Executor for ValgrindExecutor { _system_info: &SystemInfo, run_data: &RunData, ) -> Result<()> { - harvest_perf_maps(&run_data.profile_folder)?; + harvest_perf_maps(&run_data.profile_folder).await?; Ok(()) } diff --git a/src/run/runner/valgrind/helpers/perf_maps.rs b/src/run/runner/valgrind/helpers/perf_maps.rs index 474e8e80..82839da5 100644 --- a/src/run/runner/valgrind/helpers/perf_maps.rs +++ b/src/run/runner/valgrind/helpers/perf_maps.rs @@ -1,15 +1,9 @@ use crate::prelude::*; -use lazy_static::lazy_static; -use regex::Regex; use std::collections::HashSet; use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; -lazy_static! { - static ref PERF_MAP_REGEX: Regex = Regex::new(r"perf-(\d+)\.map").unwrap(); -} - -pub fn harvest_perf_maps(profile_folder: &Path) -> Result<()> { +pub async fn harvest_perf_maps(profile_folder: &Path) -> Result<()> { // Get profile files (files with .out extension) let profile_files = fs::read_dir(profile_folder)? .filter_map(|entry| entry.ok()) @@ -21,27 +15,31 @@ pub fn harvest_perf_maps(profile_folder: &Path) -> Result<()> { .iter() .filter_map(|path| path.file_stem()) .map(|pid| pid.to_str().unwrap()) + .filter_map(|pid| pid.parse().ok()) .collect::>(); - let perf_map_files = fs::read_dir("/tmp")? 
- .filter_map(|entry| entry.ok()) - .map(|entry| entry.path()) - .filter(|path| { - path.file_name() - .and_then(|name| name.to_str()) - .and_then(|name| PERF_MAP_REGEX.captures(name)) - .and_then(|captures| captures.get(1)) - .map(|pid| pids.contains(pid.as_str())) - .unwrap_or(false) - }); + harvest_perf_maps_for_pids(profile_folder, &pids).await +} + +pub async fn harvest_perf_maps_for_pids(profile_folder: &Path, pids: &HashSet) -> Result<()> { + let perf_maps = pids + .iter() + .map(|pid| format!("perf-{}.map", pid)) + .map(|file_name| { + ( + PathBuf::from("/tmp").join(&file_name), + profile_folder.join(&file_name), + ) + }) + .filter(|(src_path, _)| src_path.exists()) + .collect::>(); + debug!("Found {} perf maps", perf_maps.len()); - for perf_map_file in perf_map_files { - let source_path = perf_map_file.clone(); - let dest_path = profile_folder.join(perf_map_file.file_name().unwrap()); - fs::copy(source_path, dest_path).map_err(|e| { + for (src_path, dst_path) in perf_maps { + fs::copy(&src_path, &dst_path).map_err(|e| { anyhow!( - "Failed to copy perf map file: {} to {}: {}", - perf_map_file.display(), + "Failed to copy perf map file: {:?} to {}: {}", + src_path.file_name(), profile_folder.display(), e ) From 0790357d8b21daeb14e7b4464b394d0d3dd8cc44 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 22 Apr 2025 10:37:08 +0200 Subject: [PATCH 2/7] feat: add fifo ipc --- src/main.rs | 2 +- src/run/runner/wall_time/perf/fifo.rs | 133 ++++++++++++++++++++++++ src/run/runner/wall_time/perf/shared.rs | 15 +++ 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/run/runner/wall_time/perf/fifo.rs create mode 100644 src/run/runner/wall_time/perf/shared.rs diff --git a/src/main.rs b/src/main.rs index ddc57bef..4f69f6eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,7 @@ lazy_static! 
{ format!("{VALGRIND_CODSPEED_BASE_VERSION}-0codspeed1"); } -#[tokio::main(flavor = "current_thread")] +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() { let res = crate::app::run().await; if let Err(err) = res { diff --git a/src/run/runner/wall_time/perf/fifo.rs b/src/run/runner/wall_time/perf/fifo.rs new file mode 100644 index 00000000..1020a80d --- /dev/null +++ b/src/run/runner/wall_time/perf/fifo.rs @@ -0,0 +1,133 @@ +use super::{FifoCommand, RUNNER_ACK_FIFO, RUNNER_CTL_FIFO}; +use std::path::PathBuf; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions; +use tokio::net::unix::pipe::Receiver as TokioPipeReader; +use tokio::net::unix::pipe::Sender as TokioPipeSender; + +fn create_fifo>(path: P) -> anyhow::Result<()> { + // Remove the previous FIFO (if it exists) + let _ = nix::unistd::unlink(path.as_ref()); + + // Create the FIFO with RWX permissions for the owner + nix::unistd::mkfifo(path.as_ref(), nix::sys::stat::Mode::S_IRWXU)?; + + Ok(()) +} + +pub struct RunnerFifo { + ack_fifo: TokioPipeSender, + ctl_fifo: TokioPipeReader, +} + +impl RunnerFifo { + pub fn new() -> anyhow::Result { + create_fifo(RUNNER_CTL_FIFO)?; + create_fifo(RUNNER_ACK_FIFO)?; + + let ack_fifo = TokioPipeOpenOptions::new() + .read_write(true) + .open_sender(RUNNER_ACK_FIFO)?; + let ctl_fifo = TokioPipeOpenOptions::new() + .read_write(true) + .open_receiver(RUNNER_CTL_FIFO)?; + + Ok(Self { ctl_fifo, ack_fifo }) + } + + pub async fn recv_cmd(&mut self) -> anyhow::Result { + let mut len_buffer = [0u8; 4]; + self.ctl_fifo.read_exact(&mut len_buffer).await?; + let message_len = u32::from_le_bytes(len_buffer) as usize; + + let mut buffer = vec![0u8; message_len]; + loop { + if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() { + break; + } + } + + let decoded = bincode::deserialize(&buffer)?; + Ok(decoded) + } + + pub async fn send_cmd(&mut self, cmd: FifoCommand) -> anyhow::Result<()> { + let encoded = 
bincode::serialize(&cmd)?; + + self.ack_fifo + .write_all(&(encoded.len() as u32).to_le_bytes()) + .await?; + self.ack_fifo.write_all(&encoded).await?; + Ok(()) + } +} + +pub struct PerfFifo { + ctl_fifo: TokioPipeSender, + ack_fifo: TokioPipeReader, + + pub(crate) ctl_fifo_path: PathBuf, + pub(crate) ack_fifo_path: PathBuf, +} + +impl PerfFifo { + pub fn new() -> anyhow::Result { + let fifo_dir = tempfile::tempdir()?.into_path(); + + let ctl_fifo_path = fifo_dir.join("codspeed_perf.ctl.fifo"); + let ack_fifo_path = fifo_dir.join("codspeed_perf.ack.fifo"); + + create_fifo(&ctl_fifo_path)?; + create_fifo(&ack_fifo_path)?; + + let ack_fifo = TokioPipeOpenOptions::new() + .read_write(true) + .open_receiver(&ack_fifo_path)?; + let ctl_fifo = TokioPipeOpenOptions::new() + .read_write(true) + .open_sender(&ctl_fifo_path)?; + + Ok(Self { + ctl_fifo, + ack_fifo, + ctl_fifo_path, + ack_fifo_path, + }) + } + + pub async fn start_events(&mut self) -> anyhow::Result<()> { + self.ctl_fifo.write_all(b"enable\n\0").await?; + self.wait_for_ack().await; + + Ok(()) + } + + pub async fn stop_events(&mut self) -> anyhow::Result<()> { + self.ctl_fifo.write_all(b"disable\n\0").await?; + self.wait_for_ack().await; + + Ok(()) + } + + pub async fn ping(&mut self) -> anyhow::Result<()> { + self.ctl_fifo.write_all(b"ping\n\0").await?; + self.wait_for_ack().await; + + Ok(()) + } + + async fn wait_for_ack(&mut self) { + const ACK: &[u8] = b"ack\n\0"; + + loop { + let mut buf: [u8; ACK.len()] = [0; ACK.len()]; + if self.ack_fifo.read_exact(&mut buf).await.is_err() { + continue; + } + + if buf == ACK { + break; + } + } + } +} diff --git a/src/run/runner/wall_time/perf/shared.rs b/src/run/runner/wall_time/perf/shared.rs new file mode 100644 index 00000000..8afe630b --- /dev/null +++ b/src/run/runner/wall_time/perf/shared.rs @@ -0,0 +1,15 @@ +//! WARNING: Has to be in sync with `codspeed-rust/codspeed`. 
+ +pub const RUNNER_CTL_FIFO: &str = "/tmp/runner.ctl.fifo"; +pub const RUNNER_ACK_FIFO: &str = "/tmp/runner.ack.fifo"; + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] +pub enum Command { + CurrentBenchmark { pid: u32, uri: String }, + StartBenchmark, + StopBenchmark, + Ack, + PingPerf, + SetIntegration { name: String, version: String }, + Err, +} From 38717876c3d077f36e1af5045f2b82ea47106d14 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 22 Apr 2025 11:00:21 +0200 Subject: [PATCH 3/7] feat(runner): add perf integration for rust --- Cargo.lock | 207 +++++++++++-- Cargo.toml | 11 + rust-toolchain.toml | 2 +- src/main.rs | 2 +- src/run/runner/helpers/mod.rs | 1 + .../helpers/run_command_with_log_pipe.rs | 27 +- src/run/runner/helpers/setup.rs | 32 ++ src/run/runner/mod.rs | 7 +- src/run/runner/valgrind/executor.rs | 2 +- src/run/runner/valgrind/measure.rs | 3 +- src/run/runner/valgrind/mod.rs | 2 +- src/run/runner/valgrind/setup.rs | 39 +-- src/run/runner/wall_time/executor.rs | 56 +++- src/run/runner/wall_time/mod.rs | 1 + src/run/runner/wall_time/perf/helpers.rs | 71 +++++ src/run/runner/wall_time/perf/metadata.rs | 26 ++ src/run/runner/wall_time/perf/mod.rs | 288 ++++++++++++++++++ src/run/runner/wall_time/perf/perf_map.rs | 148 +++++++++ src/run/runner/wall_time/perf/unwind_data.rs | 140 +++++++++ 19 files changed, 993 insertions(+), 72 deletions(-) create mode 100644 src/run/runner/helpers/setup.rs create mode 100644 src/run/runner/wall_time/perf/helpers.rs create mode 100644 src/run/runner/wall_time/perf/metadata.rs create mode 100644 src/run/runner/wall_time/perf/mod.rs create mode 100644 src/run/runner/wall_time/perf/perf_map.rs create mode 100644 src/run/runner/wall_time/perf/unwind_data.rs diff --git a/Cargo.lock b/Cargo.lock index d3e473d0..45738413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,9 +93,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.97" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" [[package]] name = "async-compression" @@ -148,6 +148,15 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -204,6 +213,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.39" @@ -264,17 +279,26 @@ dependencies = [ "async-compression", "async-trait", "base64", + "bincode", "clap", "console", + "debugid", + "futures", "git2", "gql_client", "indicatif", "insta", "itertools", "lazy_static", + "libc", + "linux-perf-data", "log", "md5", + "memmap2", "nestify", + "nix", + "object", + "procfs", "rand", "regex", "reqwest", @@ -381,6 +405,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "uuid", +] + [[package]] name = "deranged" version = "0.3.11" @@ -1024,9 +1057,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.171" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "libgit2-sys" @@ -1079,12 +1112,45 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linear-map" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfae20f6b19ad527b550c223fddc3077a547fc70cda94b9b566575423fd303ee" + [[package]] name = "linked-hash-map" version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-perf-data" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f85f35725e15ad6c62b9db73f3d62439094e616a2f83500f7bcdc01ae5b84d8" +dependencies = [ + "byteorder", + "linear-map", + "linux-perf-event-reader", + "memchr", + "prost", + "prost-derive", + "thiserror 2.0.12", +] + +[[package]] +name = "linux-perf-event-reader" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa8fc7e83909ea3b9e2784591655637d3401f2f16014f9d8d6e23ccd138e665f" +dependencies = [ + "bitflags 2.8.0", + "byteorder", + "memchr", + "thiserror 2.0.12", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1109,9 +1175,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.25" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "md5" @@ -1125,6 +1191,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memmap2" +version = "0.9.5" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" @@ -1190,6 +1265,18 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.8.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -1226,7 +1313,9 @@ version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ + "flate2", "memchr", + "ruzstd", ] [[package]] @@ -1350,7 +1439,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" dependencies = [ "memchr", - "thiserror 2.0.11", + "thiserror 2.0.12", "ucd-trie", ] @@ -1460,6 +1549,53 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" +dependencies = [ + "bitflags 2.8.0", + "chrono", + "flate2", + "hex", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" +dependencies = [ + "bitflags 2.8.0", + "chrono", + "hex", +] + +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "quote" version = "1.0.38" @@ -1701,6 +1837,15 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" +[[package]] +name = "ruzstd" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fad02996bfc73da3e301efe90b1837be9ed8f4a462b6ed410aa35d00381de89f" +dependencies = [ + "twox-hash", +] + [[package]] name = "ryu" version = "1.0.18" @@ -1747,18 +1892,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -1767,9 +1912,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.135" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "indexmap", "itoa", @@ -1881,6 +2026,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -2014,11 +2165,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.11", + "thiserror-impl 2.0.12", ] [[package]] @@ -2034,9 +2185,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", @@ -2086,9 +2237,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -2203,6 +2354,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typenum" version = "1.17.0" @@ -2268,6 +2429,12 @@ 
version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 392a919e..5e96e150 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,17 @@ sysinfo = { version = "0.33.1", features = ["serde"] } indicatif = "0.17.8" console = "0.15.8" async-trait = "0.1.82" +libc = "0.2.171" +bincode = "1.3.3" +object = "0.36.7" +linux-perf-data = "0.11.0" +debugid = "0.8.0" +memmap2 = "0.9.5" +nix = { version = "0.29.0", features = ["fs"] } +futures = "0.3.31" + +[target.'cfg(target_os = "linux")'.dependencies] +procfs = "0.17.0" [dev-dependencies] temp-env = { version = "0.3.6", features = ["async_closure"] } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 58e76ec0..b475f2f9 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.84.0" +channel = "1.85.0" components = ["rustfmt", "clippy"] diff --git a/src/main.rs b/src/main.rs index 4f69f6eb..ddc57bef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,7 @@ lazy_static! 
{ format!("{VALGRIND_CODSPEED_BASE_VERSION}-0codspeed1"); } -#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +#[tokio::main(flavor = "current_thread")] async fn main() { let res = crate::app::run().await; if let Err(err) = res { diff --git a/src/run/runner/helpers/mod.rs b/src/run/runner/helpers/mod.rs index 87c246d9..41234673 100644 --- a/src/run/runner/helpers/mod.rs +++ b/src/run/runner/helpers/mod.rs @@ -2,3 +2,4 @@ pub mod env; pub mod get_bench_command; pub mod profile_folder; pub mod run_command_with_log_pipe; +pub mod setup; diff --git a/src/run/runner/helpers/run_command_with_log_pipe.rs b/src/run/runner/helpers/run_command_with_log_pipe.rs index 98ab5975..2b034450 100644 --- a/src/run/runner/helpers/run_command_with_log_pipe.rs +++ b/src/run/runner/helpers/run_command_with_log_pipe.rs @@ -1,12 +1,30 @@ use crate::local_logger::suspend_progress_bar; use crate::prelude::*; use crate::run::runner::EXECUTOR_TARGET; +use std::future::Future; use std::io::{Read, Write}; use std::process::Command; use std::process::ExitStatus; use std::thread; -pub fn run_command_with_log_pipe(mut cmd: Command) -> Result { +/// Run a command and log its output to stdout and stderr +/// +/// # Arguments +/// - `cmd`: The command to run. +/// - `cb`: A callback function that takes the process ID and returns a result. +/// +/// # Returns +/// +/// The exit status of the command. 
+/// +pub async fn run_command_with_log_pipe_and_callback( + mut cmd: Command, + cb: F, +) -> Result +where + F: FnOnce(u32) -> Fut, + Fut: Future>, +{ fn log_tee( mut reader: impl Read, mut writer: impl Write, @@ -46,5 +64,12 @@ pub fn run_command_with_log_pipe(mut cmd: Command) -> Result { thread::spawn(move || { log_tee(stderr, std::io::stderr(), Some("[stderr]")).unwrap(); }); + + cb(process.id()).await?; + process.wait().context("failed to wait for the process") } + +pub async fn run_command_with_log_pipe(cmd: Command) -> Result { + run_command_with_log_pipe_and_callback(cmd, async |_| Ok(())).await +} diff --git a/src/run/runner/helpers/setup.rs b/src/run/runner/helpers/setup.rs new file mode 100644 index 00000000..762bbcbd --- /dev/null +++ b/src/run/runner/helpers/setup.rs @@ -0,0 +1,32 @@ +use crate::prelude::*; +use log::{debug, info}; +use std::process::{Command, Stdio}; + +/// Run a command with sudo if available +pub fn run_with_sudo(command_args: &[&str]) -> Result<()> { + let use_sudo = Command::new("sudo") + // `sudo true` will fail if sudo does not exist or the current user does not have sudo privileges + .arg("true") + .stdout(Stdio::null()) + .status() + .is_ok_and(|status| status.success()); + let mut command_args: Vec<&str> = command_args.into(); + if use_sudo { + command_args.insert(0, "sudo"); + } + + debug!("Running command: {}", command_args.join(" ")); + let output = Command::new(command_args[0]) + .args(&command_args[1..]) + .stdout(Stdio::piped()) + .output() + .map_err(|_| anyhow!("Failed to execute command: {}", command_args.join(" ")))?; + + if !output.status.success() { + info!("stdout: {}", String::from_utf8_lossy(&output.stdout)); + error!("stderr: {}", String::from_utf8_lossy(&output.stderr)); + bail!("Failed to execute command: {}", command_args.join(" ")); + } + + Ok(()) +} diff --git a/src/run/runner/mod.rs b/src/run/runner/mod.rs index 52800bb7..e02b445e 100644 --- a/src/run/runner/mod.rs +++ b/src/run/runner/mod.rs @@ -30,12 
+30,15 @@ pub const EXECUTOR_TARGET: &str = "executor"; pub fn get_executor_from_mode(mode: &RunnerMode) -> Box { match mode { RunnerMode::Instrumentation => Box::new(ValgrindExecutor), - RunnerMode::Walltime => Box::new(WallTimeExecutor), + RunnerMode::Walltime => Box::new(WallTimeExecutor::new()), } } pub fn get_all_executors() -> Vec> { - vec![Box::new(ValgrindExecutor), Box::new(WallTimeExecutor)] + vec![ + Box::new(ValgrindExecutor), + Box::new(WallTimeExecutor::new()), + ] } pub fn get_run_data() -> Result { diff --git a/src/run/runner/valgrind/executor.rs b/src/run/runner/valgrind/executor.rs index 9ecc383e..9bb41d36 100644 --- a/src/run/runner/valgrind/executor.rs +++ b/src/run/runner/valgrind/executor.rs @@ -30,7 +30,7 @@ impl Executor for ValgrindExecutor { mongo_tracer: &Option, ) -> Result<()> { //TODO: add valgrind version check - measure::measure(config, &run_data.profile_folder, mongo_tracer)?; + measure::measure(config, &run_data.profile_folder, mongo_tracer).await?; Ok(()) } diff --git a/src/run/runner/valgrind/measure.rs b/src/run/runner/valgrind/measure.rs index 6d0386fc..8a7b85a4 100644 --- a/src/run/runner/valgrind/measure.rs +++ b/src/run/runner/valgrind/measure.rs @@ -73,7 +73,7 @@ fn create_run_script() -> anyhow::Result { Ok(script_file.into_temp_path()) } -pub fn measure( +pub async fn measure( config: &Config, profile_folder: &Path, mongo_tracer: &Option, @@ -132,6 +132,7 @@ pub fn measure( debug!("cmd: {:?}", cmd); let status = run_command_with_log_pipe(cmd) + .await .map_err(|e| anyhow!("failed to execute the benchmark process. 
{}", e))?; debug!( "Valgrind exit code = {:?}, Valgrind signal = {:?}", diff --git a/src/run/runner/valgrind/mod.rs b/src/run/runner/valgrind/mod.rs index a3f9ebd0..3db9a666 100644 --- a/src/run/runner/valgrind/mod.rs +++ b/src/run/runner/valgrind/mod.rs @@ -1,4 +1,4 @@ pub mod executor; -mod helpers; +pub mod helpers; mod measure; mod setup; diff --git a/src/run/runner/valgrind/setup.rs b/src/run/runner/valgrind/setup.rs index 41fcd1ad..9b2f23c3 100644 --- a/src/run/runner/valgrind/setup.rs +++ b/src/run/runner/valgrind/setup.rs @@ -1,41 +1,8 @@ -use std::{ - env, - process::{Command, Stdio}, -}; - -use url::Url; - +use crate::run::runner::helpers::setup::run_with_sudo; use crate::{prelude::*, run::helpers::download_file, VALGRIND_CODSPEED_VERSION}; use crate::{run::check_system::SystemInfo, VALGRIND_CODSPEED_DEB_VERSION}; - -/// Run a command with sudo if available -fn run_with_sudo(command_args: &[&str]) -> Result<()> { - let use_sudo = Command::new("sudo") - // `sudo true` will fail if sudo does not exist or the current user does not have sudo privileges - .arg("true") - .stdout(Stdio::null()) - .status() - .is_ok_and(|status| status.success()); - let mut command_args: Vec<&str> = command_args.into(); - if use_sudo { - command_args.insert(0, "sudo"); - } - - debug!("Running command: {}", command_args.join(" ")); - let output = Command::new(command_args[0]) - .args(&command_args[1..]) - .stdout(Stdio::piped()) - .output() - .map_err(|_| anyhow!("Failed to execute command: {}", command_args.join(" ")))?; - - if !output.status.success() { - info!("stdout: {}", String::from_utf8_lossy(&output.stdout)); - error!("stderr: {}", String::from_utf8_lossy(&output.stderr)); - bail!("Failed to execute command: {}", command_args.join(" ")); - } - - Ok(()) -} +use std::{env, process::Command}; +use url::Url; fn get_codspeed_valgrind_filename(system_info: &SystemInfo) -> Result { let (version, architecture) = match ( diff --git a/src/run/runner/wall_time/executor.rs 
b/src/run/runner/wall_time/executor.rs index 51ca1a8c..95ae936b 100644 --- a/src/run/runner/wall_time/executor.rs +++ b/src/run/runner/wall_time/executor.rs @@ -1,5 +1,5 @@ +use super::perf::PerfRunner; use crate::prelude::*; - use crate::run::instruments::mongo_tracer::MongoTracer; use crate::run::runner::executor::Executor; use crate::run::runner::helpers::env::get_base_injected_env; @@ -11,7 +11,24 @@ use async_trait::async_trait; use std::fs::canonicalize; use std::process::Command; -pub struct WallTimeExecutor; +pub struct WallTimeExecutor { + perf: Option, +} + +impl WallTimeExecutor { + pub fn new() -> Self { + let use_perf = if cfg!(target_os = "linux") { + std::env::var("CODSPEED_USE_PERF").is_ok() + } else { + false + }; + debug!("Running the cmd with perf: {}", use_perf); + + Self { + perf: use_perf.then(PerfRunner::new), + } + } +} #[async_trait(?Send)] impl Executor for WallTimeExecutor { @@ -19,6 +36,14 @@ impl Executor for WallTimeExecutor { ExecutorName::WallTime } + async fn setup(&self, _system_info: &SystemInfo) -> Result<()> { + if self.perf.is_some() { + PerfRunner::setup_environment()?; + } + + Ok(()) + } + async fn run( &self, config: &Config, @@ -38,13 +63,22 @@ impl Executor for WallTimeExecutor { cmd.current_dir(abs_cwd); } - cmd.args(["-c", get_bench_command(config)?.as_str()]); + let bench_cmd = get_bench_command(config)?; + let status = if let Some(perf) = &self.perf { + perf.run(cmd, &bench_cmd).await + } else { + cmd.args(["-c", &bench_cmd]); + debug!("cmd: {:?}", cmd); + + run_command_with_log_pipe(cmd).await + }; + + let status = + status.map_err(|e| anyhow!("failed to execute the benchmark process. {}", e))?; + debug!("cmd exit status: {:?}", status); - debug!("cmd: {:?}", cmd); - let status = run_command_with_log_pipe(cmd) - .map_err(|e| anyhow!("failed to execute the benchmark process. 
{}", e))?; if !status.success() { - bail!("failed to execute the benchmark process"); + bail!("failed to execute the benchmark process: {}", status); } Ok(()) @@ -54,8 +88,14 @@ impl Executor for WallTimeExecutor { &self, _config: &Config, _system_info: &SystemInfo, - _run_data: &RunData, + run_data: &RunData, ) -> Result<()> { + debug!("Copying files to the profile folder"); + + if let Some(perf) = &self.perf { + perf.save_files_to(&run_data.profile_folder).await?; + } + Ok(()) } } diff --git a/src/run/runner/wall_time/mod.rs b/src/run/runner/wall_time/mod.rs index 0c95fdab..ca152f25 100644 --- a/src/run/runner/wall_time/mod.rs +++ b/src/run/runner/wall_time/mod.rs @@ -1 +1,2 @@ pub mod executor; +pub mod perf; diff --git a/src/run/runner/wall_time/perf/helpers.rs b/src/run/runner/wall_time/perf/helpers.rs new file mode 100644 index 00000000..3dc58832 --- /dev/null +++ b/src/run/runner/wall_time/perf/helpers.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, bail}; +use linux_perf_data::{linux_perf_event_reader::EventRecord, PerfFileReader, PerfFileRecord}; + +/// Tries to find the pid of the sampled process within a perf.data file. +pub fn find_pid>(perf_file: P) -> anyhow::Result { + let content = std::fs::read(perf_file.as_ref())?; + let reader = std::io::Cursor::new(content); + + let PerfFileReader { + mut record_iter, + mut perf_file, + } = PerfFileReader::parse_file(reader)?; + + let mut pid_freq = HashMap::new(); + + // Only consider the first N events to reduce the performance impact. Certain benchmark libraries can generate + // more than 100k for each benchmark, which can slow down the runner a lot. The highest chance of finding + // different pids is in the first few events, where there's a possible overlap. + const COUNT_FIRST_N: usize = 1000; + let mut i = 0; + + while let Some(record) = record_iter.next_record(&mut perf_file)? { + let PerfFileRecord::EventRecord { record, .. 
} = record else { + continue; + }; + + let Ok(parsed_record) = record.parse() else { + continue; + }; + + let EventRecord::Sample(event) = parsed_record else { + continue; + }; + + // Ignore kernel events + if event.pid == Some(-1) { + continue; + } + + if let Some(pid) = event.pid { + *pid_freq.entry(pid).or_insert(0) += 1; + + i += 1; + if i >= COUNT_FIRST_N { + break; + } + } + } + + // Choose the pid with the highest frequency. However, we can only use a pid if more than N% of the + // events are from that pid. + let total_count = pid_freq.values().sum::(); + let (pid, pid_count) = pid_freq + .iter() + .max_by_key(|&(_, count)| count) + .ok_or_else(|| anyhow!("Couldn't find pid in perf.data"))?; + log::debug!("Pid frequency: {:?}", pid_freq); + + let pid_percentage = (*pid_count as f64 / total_count as f64) * 100.0; + if pid_percentage < 75.0 { + bail!( + "Most common pid {} only has {:.2}% of total events", + pid, + pid_percentage + ); + } + + Ok(*pid) +} diff --git a/src/run/runner/wall_time/perf/metadata.rs b/src/run/runner/wall_time/perf/metadata.rs new file mode 100644 index 00000000..0956f89e --- /dev/null +++ b/src/run/runner/wall_time/perf/metadata.rs @@ -0,0 +1,26 @@ +// !!!!!!!!!!!!!!!!!!!!!!!! +// !! DO NOT TOUCH BELOW !! +// !!!!!!!!!!!!!!!!!!!!!!!! +// Has to be in sync with `perf-parser`. +// + +use std::{collections::HashMap, path::Path}; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct PerfMetadata { + /// Name and version of the integration + pub integration: (String, String), + + /// The URIs of the benchmarks in the order they were executed. 
+ pub bench_order_by_pid: HashMap>, +} + +impl PerfMetadata { + pub fn save_to>(&self, path: P) -> anyhow::Result<()> { + let file = std::fs::File::create(path.as_ref().join("perf.metadata"))?; + serde_json::to_writer(file, self)?; + Ok(()) + } +} diff --git a/src/run/runner/wall_time/perf/mod.rs b/src/run/runner/wall_time/perf/mod.rs new file mode 100644 index 00000000..0866a45d --- /dev/null +++ b/src/run/runner/wall_time/perf/mod.rs @@ -0,0 +1,288 @@ +#![cfg_attr(not(unix), allow(dead_code, unused_mut))] + +use crate::prelude::*; +use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback; +use crate::run::runner::helpers::setup::run_with_sudo; +use anyhow::Context; +use fifo::{PerfFifo, RunnerFifo}; +use futures::stream::FuturesUnordered; +use metadata::PerfMetadata; +use perf_map::ProcessSymbols; +use shared::Command as FifoCommand; +use std::path::PathBuf; +use std::process::Command; +use std::time::Duration; +use std::{cell::OnceCell, collections::HashMap, process::ExitStatus}; +use tempfile::TempDir; +use unwind_data::UnwindData; + +mod metadata; +mod shared; +pub use shared::*; + +pub mod fifo; +pub mod helpers; +pub mod perf_map; +pub mod unwind_data; + +const PERF_DATA_PREFIX: &str = "perf.data."; + +pub struct PerfRunner { + perf_dir: TempDir, + benchmark_data: OnceCell, +} + +impl PerfRunner { + pub fn setup_environment() -> anyhow::Result<()> { + let sysctl_read = |name: &str| -> anyhow::Result { + let output = std::process::Command::new("sysctl").arg(name).output()?; + let output = String::from_utf8(output.stdout)?; + + Ok(output + .split(" = ") + .last() + .context("Couldn't find the value in sysctl output")? + .trim() + .parse::()?) + }; + + // Allow access to kernel symbols + if sysctl_read("kernel.kptr_restrict")? != 0 { + run_with_sudo(&["sysctl", "-w", "kernel.kptr_restrict=0"])?; + } + + // Allow non-root profiling + if sysctl_read("kernel.perf_event_paranoid")? 
!= -1 { + run_with_sudo(&["sysctl", "-w", "kernel.perf_event_paranoid=-1"])?; + } + + Ok(()) + } + + pub fn new() -> Self { + Self { + perf_dir: tempfile::tempdir().expect("Failed to create temporary directory"), + benchmark_data: OnceCell::new(), + } + } + + pub async fn run(&self, mut cmd: Command, bench_cmd: &str) -> anyhow::Result { + let perf_fifo = PerfFifo::new()?; + let runner_fifo = RunnerFifo::new()?; + + // We have to pass a file to perf, which will create `perf.data.` files + // when the output is split. + let perf_file = tempfile::Builder::new() + .keep(true) + .prefix(PERF_DATA_PREFIX) + .tempfile_in(&self.perf_dir)?; + + cmd.args([ + "-c", + &format!( + "perf record --user-callchains --freq=999 --switch-output --control=fifo:{},{} --delay=-1 -g --call-graph=dwarf --output={} -- {bench_cmd}", + perf_fifo.ctl_fifo_path.to_string_lossy(), + perf_fifo.ack_fifo_path.to_string_lossy(), + perf_file.path().to_string_lossy() + ), + ]); + debug!("cmd: {:?}", cmd); + + let on_process_started = async |pid: u32| -> anyhow::Result<()> { + let data = Self::handle_fifo(pid, runner_fifo, perf_fifo).await?; + let _ = self.benchmark_data.set(data); + + Ok(()) + }; + run_command_with_log_pipe_and_callback(cmd, on_process_started).await + } + + pub async fn save_files_to(&self, profile_folder: &PathBuf) -> anyhow::Result<()> { + let start = std::time::Instant::now(); + + // Copy the perf data files to the profile folder + let copy_tasks = std::fs::read_dir(&self.perf_dir)? + .filter_map(|entry| entry.ok()) + .map(|entry| entry.path().to_path_buf()) + .filter(|path| { + path.file_name() + .map(|name| name.to_string_lossy().starts_with(PERF_DATA_PREFIX)) + .unwrap_or(false) + }) + .sorted_by_key(|path| path.file_name().unwrap().to_string_lossy().to_string()) + // The first perf.data will only contain metadata that is not relevant to the benchmarks. We + // capture the symbols and unwind data separately. 
+ .skip(1) + .map(|src_path| { + let profile_folder = profile_folder.clone(); + tokio::task::spawn(async move { + let pid = helpers::find_pid(&src_path)?; + + let dst_file_name = format!( + "{}_{}.perf", + pid, + src_path.file_name().unwrap_or_default().to_string_lossy(), + ); + let dst_path = profile_folder.join(dst_file_name); + tokio::fs::copy(src_path, dst_path).await?; + + Ok::<_, anyhow::Error>(()) + }) + }) + .collect::>(); + + let bench_data = self + .benchmark_data + .get() + .expect("Benchmark order is not available"); + assert_eq!( + copy_tasks.len(), + bench_data.bench_count(), + "Benchmark count mismatch" + ); + let _ = futures::future::try_join_all(copy_tasks).await?; + + // Append perf maps, unwind info and other metadata + bench_data.save_to(profile_folder).unwrap(); + + let elapsed = start.elapsed(); + debug!("Perf teardown took: {:?}", elapsed); + Ok(()) + } + + async fn handle_fifo( + perf_pid: u32, + mut runner_fifo: RunnerFifo, + mut perf_fifo: PerfFifo, + ) -> anyhow::Result { + let mut bench_order_by_pid = HashMap::>::new(); + let mut symbols_by_pid = HashMap::::new(); + let mut unwind_data_by_pid = HashMap::>::new(); + let mut integration = None; + + loop { + let perf_ping = tokio::time::timeout(Duration::from_secs(1), perf_fifo.ping()).await; + if let Ok(Err(_)) | Err(_) = perf_ping { + break; + } + + let result = tokio::time::timeout(Duration::from_secs(1), runner_fifo.recv_cmd()).await; + let Ok(Ok(cmd)) = result else { + continue; + }; + debug!("Received command: {:?}", cmd); + + match cmd { + FifoCommand::CurrentBenchmark { pid, uri } => { + bench_order_by_pid.entry(pid).or_default().push(uri); + + #[cfg(target_os = "linux")] + if !symbols_by_pid.contains_key(&pid) && !unwind_data_by_pid.contains_key(&pid) + { + use procfs::process::MMPermissions; + + let bench_proc = procfs::process::Process::new(pid as _) + .expect("Failed to find benchmark process"); + let exe_path = bench_proc.exe().expect("Failed to read /proc/{pid}/exe"); + let 
exe_maps = bench_proc.maps().expect("Failed to read /proc/{pid}/maps"); + + for map in &exe_maps { + let page_offset = map.offset; + let (base_addr, end_addr) = map.address; + let path = match &map.pathname { + procfs::process::MMapPath::Path(path) => Some(path.clone()), + _ => None, + }; + + if let Some(path) = path { + symbols_by_pid + .entry(pid) + .or_insert(ProcessSymbols::new(pid)) + .add_mapping(pid, &path, base_addr, end_addr); + debug!("Added mapping for module {:?}", path); + } + + if map.perms.contains(MMPermissions::EXECUTE) { + if let Ok(unwind_data) = UnwindData::new( + exe_path.to_string_lossy().as_bytes(), + page_offset, + base_addr, + end_addr - base_addr, + None, + ) { + unwind_data_by_pid.entry(pid).or_default().push(unwind_data); + } + } + } + } + + runner_fifo.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::StartBenchmark => { + unsafe { libc::kill(perf_pid as i32, libc::SIGUSR2) }; + perf_fifo.start_events().await?; + runner_fifo.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::StopBenchmark => { + perf_fifo.stop_events().await?; + runner_fifo.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::PingPerf => { + if perf_fifo.ping().await.is_ok() { + runner_fifo.send_cmd(FifoCommand::Ack).await?; + } else { + runner_fifo.send_cmd(FifoCommand::Err).await?; + } + } + FifoCommand::SetIntegration { name, version } => { + integration = Some((name, version)); + runner_fifo.send_cmd(FifoCommand::Ack).await?; + } + FifoCommand::Ack => unreachable!(), + FifoCommand::Err => unreachable!(), + } + } + + Ok(BenchmarkData { + integration: integration.context("Couldn't find integration metadata")?, + bench_order_by_pid, + symbols_by_pid, + unwind_data_by_pid, + }) + } +} + +pub struct BenchmarkData { + /// Name and version of the integration + integration: (String, String), + + bench_order_by_pid: HashMap>, + symbols_by_pid: HashMap, + unwind_data_by_pid: HashMap>, +} + +impl BenchmarkData { + pub fn save_to>(&self, path: P) -> anyhow::Result<()> { 
+ for proc_sym in self.symbols_by_pid.values() { + proc_sym.save_to(&path).unwrap(); + } + + for (pid, modules) in &self.unwind_data_by_pid { + for module in modules { + module.save_to(&path, *pid).unwrap(); + } + } + + let metadata = PerfMetadata { + integration: self.integration.clone(), + bench_order_by_pid: self.bench_order_by_pid.clone(), + }; + metadata.save_to(&path).unwrap(); + + Ok(()) + } + + pub fn bench_count(&self) -> usize { + self.bench_order_by_pid.values().map(|v| v.len()).sum() + } +} diff --git a/src/run/runner/wall_time/perf/perf_map.rs b/src/run/runner/wall_time/perf/perf_map.rs new file mode 100644 index 00000000..27bf1128 --- /dev/null +++ b/src/run/runner/wall_time/perf/perf_map.rs @@ -0,0 +1,148 @@ +use crate::prelude::*; +use object::{Object, ObjectSymbol, ObjectSymbolTable}; +use std::{ + collections::HashMap, + io::Write, + path::{Path, PathBuf}, +}; + +#[derive(Debug, Hash, PartialEq, Eq, Clone)] +struct Symbol { + offset: u64, + size: u64, + name: String, +} + +#[derive(Debug, Clone)] +pub struct ModuleSymbols { + path: PathBuf, + symbols: Vec, +} + +impl ModuleSymbols { + pub fn new>(path: P) -> anyhow::Result { + let content = std::fs::read(path.as_ref())?; + let object = object::File::parse(&*content)?; + + let mut symbols = Vec::new(); + + if let Some(symbol_table) = object.symbol_table() { + symbols.extend(symbol_table.symbols().filter_map(|symbol| { + Some(Symbol { + offset: symbol.address(), + size: symbol.size(), + name: symbol.name().ok()?.to_string(), + }) + })); + } + + if let Some(symbol_table) = object.dynamic_symbol_table() { + symbols.extend(symbol_table.symbols().filter_map(|symbol| { + Some(Symbol { + offset: symbol.address(), + size: symbol.size(), + name: symbol.name().ok()?.to_string(), + }) + })); + } + + symbols.retain(|symbol| symbol.offset > 0 && symbol.size > 0); + if symbols.is_empty() { + return Err(anyhow::anyhow!("No symbols found")); + } + + Ok(Self { + path: path.as_ref().to_path_buf(), + symbols, + }) + 
} + + fn append_to_file>(&self, path: P, base_addr: u64) -> anyhow::Result<()> { + let mut file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + + for symbol in &self.symbols { + writeln!( + file, + "{:x} {:x} {}", + base_addr + symbol.offset, + symbol.size, + symbol.name + )?; + } + + Ok(()) + } +} + +/// Represents all the modules inside a process and their symbols. +pub struct ProcessSymbols { + pid: u32, + module_mappings: HashMap>, + modules: HashMap, +} + +impl ProcessSymbols { + pub fn new(pid: u32) -> Self { + Self { + pid, + module_mappings: HashMap::new(), + modules: HashMap::new(), + } + } + + pub fn add_mapping>( + &mut self, + pid: u32, + module_path: P, + start_addr: u64, + end_addr: u64, + ) { + if self.pid != pid { + warn!("pid mismatch: {} != {}", self.pid, pid); + return; + } + + let path = module_path.as_ref().to_path_buf(); + match ModuleSymbols::new(module_path) { + Ok(symbol) => { + self.modules.entry(path.clone()).or_insert(symbol); + } + Err(error) => { + debug!( + "Failed to load symbols for module {}: {}", + path.display(), + error + ); + } + } + + self.module_mappings + .entry(path.clone()) + .or_default() + .push((start_addr, end_addr)); + } + + pub fn save_to>(&self, folder: P) -> anyhow::Result<()> { + if self.modules.is_empty() { + return Ok(()); + } + + let symbols_path = folder.as_ref().join(format!("perf-{}.map", self.pid)); + for module in self.modules.values() { + let Some((base_addr, _)) = self + .module_mappings + .get(&module.path) + .and_then(|bounds| bounds.iter().min_by_key(|(start, _)| start)) + else { + warn!("No bounds found for module: {}", module.path.display()); + continue; + }; + module.append_to_file(&symbols_path, *base_addr)?; + } + + Ok(()) + } +} diff --git a/src/run/runner/wall_time/perf/unwind_data.rs b/src/run/runner/wall_time/perf/unwind_data.rs new file mode 100644 index 00000000..d79d27ba --- /dev/null +++ b/src/run/runner/wall_time/perf/unwind_data.rs @@ -0,0 +1,140 @@ +//! 
WARNING: This file has to be in sync with perf-parser! + +use anyhow::{bail, Context}; +use debugid::CodeId; +use serde::{Deserialize, Serialize}; +use std::ops::Range; + +/// Unwind data for a single module. +#[derive(Debug, Serialize, Deserialize)] +pub struct UnwindData { + pub path: String, + + pub avma_range: Range, + pub base_avma: u64, + + pub eh_frame_hdr: Vec, + pub eh_frame_hdr_svma: Range, + + pub eh_frame: Vec, + pub eh_frame_svma: Range, +} + +impl UnwindData { + // Based on this: https://github.com/mstange/linux-perf-stuff/blob/22ca6531b90c10dd2a4519351c843b8d7958a451/src/main.rs#L747-L893 + pub fn new( + path_slice: &[u8], + mapping_start_file_offset: u64, + mapping_start_avma: u64, + mapping_size: u64, + build_id: Option<&[u8]>, + ) -> anyhow::Result { + use object::{Object, ObjectSection, ObjectSegment}; + + let avma_range = mapping_start_avma..(mapping_start_avma + mapping_size); + + let path = String::from_utf8_lossy(path_slice).to_string(); + let Some(file) = std::fs::File::open(&path).ok() else { + bail!("Could not open file {path}"); + }; + + let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? 
}; + let file = object::File::parse(&mmap[..])?; + + // Verify the build id (if we have one) + match (build_id, file.build_id()) { + (Some(build_id), Ok(Some(file_build_id))) => { + if build_id != file_build_id { + let file_build_id = CodeId::from_binary(file_build_id); + let expected_build_id = CodeId::from_binary(build_id); + bail!( + "File {:?} has non-matching build ID {} (expected {})", + path, + file_build_id, + expected_build_id + ); + } + } + (Some(_), Err(_)) | (Some(_), Ok(None)) => { + bail!( + "File {:?} does not contain a build ID, but we expected it to have one", + path + ); + } + _ => { + // No build id to check + } + }; + + let mapping_end_file_offset = mapping_start_file_offset + mapping_size; + let mapped_segment = file + .segments() + .find(|segment| { + let (segment_start_file_offset, segment_size) = segment.file_range(); + let segment_end_file_offset = segment_start_file_offset + segment_size; + mapping_start_file_offset <= segment_start_file_offset + && segment_end_file_offset <= mapping_end_file_offset + }) + .context("Failed to find segment")?; + + let (segment_start_file_offset, _segment_size) = mapped_segment.file_range(); + let segment_start_svma = mapped_segment.address(); + let segment_start_avma = + mapping_start_avma + (segment_start_file_offset - mapping_start_file_offset); + + let base_avma = segment_start_avma - segment_start_svma; + let eh_frame = file.section_by_name(".eh_frame"); + let eh_frame_hdr = file.section_by_name(".eh_frame_hdr"); + + fn section_data<'a>(section: &impl ObjectSection<'a>) -> Option> { + section.data().ok().map(|data| data.to_owned()) + } + + let eh_frame_data = eh_frame.as_ref().and_then(section_data); + let eh_frame_hdr_data = eh_frame_hdr.as_ref().and_then(section_data); + + fn svma_range<'a>(section: &impl ObjectSection<'a>) -> Range { + section.address()..section.address() + section.size() + } + + Ok(Self { + path, + avma_range, + base_avma, + eh_frame_hdr: eh_frame_hdr_data.context("Failed to find 
eh_frame hdr data")?, + eh_frame_hdr_svma: eh_frame_hdr + .as_ref() + .map(svma_range) + .context("Failed to find eh_frame hdr section")?, + eh_frame: eh_frame_data.context("Failed to find eh_frame data")?, + eh_frame_svma: eh_frame + .as_ref() + .map(svma_range) + .context("Failed to find eh_frame section")?, + }) + } + + pub fn save_to>(&self, folder: P, pid: u32) -> anyhow::Result<()> { + let unwind_data_path = folder.as_ref().join(format!( + "{}_{:x}_{:x}.unwind", + pid, self.avma_range.start, self.avma_range.end + )); + self.to_file(unwind_data_path)?; + + Ok(()) + } + + fn to_file>(&self, path: P) -> anyhow::Result<()> { + if let Ok(true) = std::fs::exists(path.as_ref()) { + log::warn!( + "{} already exists, file will be truncated", + path.as_ref().display() + ); + log::warn!("{} {:x?}", self.path, self.avma_range); + } + + let mut writer = std::fs::File::create(path.as_ref())?; + bincode::serialize_into(&mut writer, self)?; + Ok(()) + } +} From 476a551639b14841bc9c1fb0519ed02c3cf5632a Mon Sep 17 00:00:00 2001 From: not-matthias Date: Tue, 22 Apr 2025 11:05:14 +0200 Subject: [PATCH 4/7] feat(runner): add perf integration for python --- src/run/runner/wall_time/perf/metadata.rs | 3 ++ src/run/runner/wall_time/perf/mod.rs | 52 +++++++++++++++++++++-- src/run/runner/wall_time/perf/perf_map.rs | 9 ++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/run/runner/wall_time/perf/metadata.rs b/src/run/runner/wall_time/perf/metadata.rs index 0956f89e..e46441f8 100644 --- a/src/run/runner/wall_time/perf/metadata.rs +++ b/src/run/runner/wall_time/perf/metadata.rs @@ -15,6 +15,9 @@ pub struct PerfMetadata { /// The URIs of the benchmarks in the order they were executed. pub bench_order_by_pid: HashMap>, + + /// Modules that should be ignored and removed from the folded trace and callgraph (e.g. 
python interpreter) + pub ignored_modules: Vec<(String, u64, u64)>, } impl PerfMetadata { diff --git a/src/run/runner/wall_time/perf/mod.rs b/src/run/runner/wall_time/perf/mod.rs index 0866a45d..39abe52d 100644 --- a/src/run/runner/wall_time/perf/mod.rs +++ b/src/run/runner/wall_time/perf/mod.rs @@ -3,12 +3,15 @@ use crate::prelude::*; use crate::run::runner::helpers::run_command_with_log_pipe::run_command_with_log_pipe_and_callback; use crate::run::runner::helpers::setup::run_with_sudo; +use crate::run::runner::valgrind::helpers::ignored_objects_path::get_objects_path_to_ignore; +use crate::run::runner::valgrind::helpers::perf_maps::harvest_perf_maps_for_pids; use anyhow::Context; use fifo::{PerfFifo, RunnerFifo}; use futures::stream::FuturesUnordered; use metadata::PerfMetadata; use perf_map::ProcessSymbols; use shared::Command as FifoCommand; +use std::collections::HashSet; use std::path::PathBuf; use std::process::Command; use std::time::Duration; @@ -77,10 +80,23 @@ impl PerfRunner { .prefix(PERF_DATA_PREFIX) .tempfile_in(&self.perf_dir)?; + // Detect the mode based on the command to be executed + let cg_mode = if bench_cmd.contains("cargo") { + "dwarf" + } else if bench_cmd.contains("pytest") { + "fp" + } else { + panic!( + "Perf not supported. 
Failed to detect call graph mode for command: {}", + bench_cmd + ) + }; + debug!("Using call graph mode: {}", cg_mode); + cmd.args([ "-c", &format!( - "perf record --user-callchains --freq=999 --switch-output --control=fifo:{},{} --delay=-1 -g --call-graph=dwarf --output={} -- {bench_cmd}", + "perf record --user-callchains --freq=999 --switch-output --control=fifo:{},{} --delay=-1 -g --call-graph={cg_mode} --output={} -- {bench_cmd}", perf_fifo.ctl_fifo_path.to_string_lossy(), perf_fifo.ack_fifo_path.to_string_lossy(), perf_file.path().to_string_lossy() @@ -126,7 +142,7 @@ impl PerfRunner { let dst_path = profile_folder.join(dst_file_name); tokio::fs::copy(src_path, dst_path).await?; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(pid) }) }) .collect::>(); @@ -140,7 +156,16 @@ impl PerfRunner { bench_data.bench_count(), "Benchmark count mismatch" ); - let _ = futures::future::try_join_all(copy_tasks).await?; + + // Harvest the perf maps generated by python. This will copy the perf + // maps from /tmp to the profile folder. We have to write our own perf + // maps to these files AFTERWARDS, otherwise it'll be overwritten! + let perf_map_pids = futures::future::try_join_all(copy_tasks) + .await? 
+ .into_iter() + .filter_map(Result::ok) + .collect::>(); + harvest_perf_maps_for_pids(profile_folder, &perf_map_pids).await?; // Append perf maps, unwind info and other metadata bench_data.save_to(profile_folder).unwrap(); @@ -276,6 +301,27 @@ impl BenchmarkData { let metadata = PerfMetadata { integration: self.integration.clone(), bench_order_by_pid: self.bench_order_by_pid.clone(), + ignored_modules: { + let mut to_ignore = vec![]; + + // Check if any of the ignored modules has been loaded in the process + for ignore_path in get_objects_path_to_ignore() { + for proc in self.symbols_by_pid.values() { + if let Some(mapping) = proc.module_mapping(&ignore_path) { + let (Some((base_addr, _)), Some((_, end_addr))) = ( + mapping.iter().min_by_key(|(base_addr, _)| base_addr), + mapping.iter().max_by_key(|(_, end_addr)| end_addr), + ) else { + continue; + }; + + to_ignore.push((ignore_path.clone(), *base_addr, *end_addr)); + } + } + } + + to_ignore + }, }; metadata.save_to(&path).unwrap(); diff --git a/src/run/runner/wall_time/perf/perf_map.rs b/src/run/runner/wall_time/perf/perf_map.rs index 27bf1128..3bc2ba65 100644 --- a/src/run/runner/wall_time/perf/perf_map.rs +++ b/src/run/runner/wall_time/perf/perf_map.rs @@ -125,6 +125,15 @@ impl ProcessSymbols { .push((start_addr, end_addr)); } + pub fn module_mapping>( + &self, + module_path: P, + ) -> Option<&[(u64, u64)]> { + self.module_mappings + .get(module_path.as_ref()) + .map(|bounds| bounds.as_slice()) + } + pub fn save_to>(&self, folder: P) -> anyhow::Result<()> { if self.modules.is_empty() { return Ok(()); From 308bb0f623d2834bef20dff9f77ed754865a4434 Mon Sep 17 00:00:00 2001 From: Adrien Cacciaguerra Date: Mon, 28 Apr 2025 12:28:32 +0200 Subject: [PATCH 5/7] chore: Release codspeed-runner version 3.6.0-beta.2 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45738413..23b9cd46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,7 +273,7 
@@ checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "codspeed-runner" -version = "3.5.0" +version = "3.6.0-beta.2" dependencies = [ "anyhow", "async-compression", diff --git a/Cargo.toml b/Cargo.toml index 5e96e150..58b1b1c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "codspeed-runner" -version = "3.5.0" +version = "3.6.0-beta.2" edition = "2021" repository = "https://github.com/CodSpeedHQ/runner" publish = false From 8e4a390ec7d26fc986a9dac73c8a46092b4bbb95 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Thu, 1 May 2025 14:32:54 +0200 Subject: [PATCH 6/7] feat: install perf on setup --- src/run/runner/wall_time/perf/mod.rs | 5 ++- src/run/runner/wall_time/perf/setup.rs | 54 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 src/run/runner/wall_time/perf/setup.rs diff --git a/src/run/runner/wall_time/perf/mod.rs b/src/run/runner/wall_time/perf/mod.rs index 39abe52d..f69ddcb1 100644 --- a/src/run/runner/wall_time/perf/mod.rs +++ b/src/run/runner/wall_time/perf/mod.rs @@ -20,6 +20,7 @@ use tempfile::TempDir; use unwind_data::UnwindData; mod metadata; +mod setup; mod shared; pub use shared::*; @@ -37,6 +38,8 @@ pub struct PerfRunner { impl PerfRunner { pub fn setup_environment() -> anyhow::Result<()> { + setup::install_perf()?; + let sysctl_read = |name: &str| -> anyhow::Result { let output = std::process::Command::new("sysctl").arg(name).output()?; let output = String::from_utf8(output.stdout)?; @@ -191,7 +194,7 @@ impl PerfRunner { break; } - let result = tokio::time::timeout(Duration::from_secs(1), runner_fifo.recv_cmd()).await; + let result = tokio::time::timeout(Duration::from_secs(5), runner_fifo.recv_cmd()).await; let Ok(Ok(cmd)) = result else { continue; }; diff --git a/src/run/runner/wall_time/perf/setup.rs b/src/run/runner/wall_time/perf/setup.rs new file mode 100644 index 00000000..59e87d55 --- /dev/null +++ 
b/src/run/runner/wall_time/perf/setup.rs
@@ -0,0 +1,77 @@
+use crate::{prelude::*, run::runner::helpers::setup::run_with_sudo};
+use std::process::Command;
+
+/// Returns the `--version` output of `cmd`.
+///
+/// # Errors
+/// Fails when `cmd` cannot be located with `which`, cannot be spawned, or exits
+/// unsuccessfully when asked for its version.
+fn cmd_version(cmd: &str) -> anyhow::Result<String> {
+    let is_installed = Command::new("which")
+        .arg(cmd)
+        .output()
+        .is_ok_and(|output| output.status.success());
+    if !is_installed {
+        bail!("{cmd} is not installed")
+    }
+
+    let output = Command::new(cmd).arg("--version").output()?;
+    // A binary that is on PATH but cannot report its version is treated as
+    // unusable, so that `install_perf` still attempts an installation.
+    if !output.status.success() {
+        bail!("`{} --version` failed: {}", cmd, output.status)
+    }
+
+    Ok(String::from_utf8_lossy(&output.stdout).to_string())
+}
+
+/// Reports whether a working `perf` binary is available, logging its version.
+fn is_perf_installed() -> bool {
+    let version_str = cmd_version("perf");
+    debug!("Perf version: {:?}", version_str);
+
+    version_str.is_ok()
+}
+
+/// Installs `perf` through `apt-get` unless `perf` is already available.
+///
+/// # Errors
+/// Fails when the kernel release cannot be determined with `uname -r`, or when
+/// one of the `apt-get` commands run through `run_with_sudo` fails.
+pub fn install_perf() -> Result<()> {
+    if is_perf_installed() {
+        info!("Perf is already installed, skipping installation");
+        return Ok(());
+    }
+
+    // The kernel-specific tools package is named after the running kernel release.
+    let uname = Command::new("uname")
+        .arg("-r")
+        .output()
+        .map_err(|e| anyhow!("Failed to execute uname: {}", e))?;
+    if !uname.status.success() {
+        bail!("uname -r failed: {}", uname.status);
+    }
+    let kernel_release = String::from_utf8_lossy(&uname.stdout);
+    let kernel_release = kernel_release.trim();
+    if kernel_release.is_empty() {
+        bail!("uname -r returned an empty kernel release");
+    }
+    debug!("Kernel release: {}", kernel_release);
+
+    debug!("Installing perf");
+    run_with_sudo(&["apt-get", "update"])?;
+    run_with_sudo(&[
+        "apt-get",
+        "install",
+        "--allow-downgrades",
+        "-y",
+        "linux-tools-common",
+        "linux-tools-generic",
+        &format!("linux-tools-{}", kernel_release),
+    ])?;
+
+    info!("Perf installation completed successfully");
+
+    Ok(())
+}

From 4d6c0622592d8d0619c6a2f198407f5fcd4a9808 Mon Sep 17 00:00:00 2001
From: not-matthias
Date: Fri, 2 May 2025 13:56:51 +0200
Subject: [PATCH 7/7] fix: use bash to ensure correct behavior across systems

See COD-843 for more information.
--- src/run/runner/wall_time/executor.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/run/runner/wall_time/executor.rs b/src/run/runner/wall_time/executor.rs index 95ae936b..ea0adbb6 100644 --- a/src/run/runner/wall_time/executor.rs +++ b/src/run/runner/wall_time/executor.rs @@ -51,7 +51,9 @@ impl Executor for WallTimeExecutor { run_data: &RunData, _mongo_tracer: &Option, ) -> Result<()> { - let mut cmd = Command::new("sh"); + // IMPORTANT: Don't use `sh` here! We will use this pid to send signals to the + // spawned child process which won't work if we use a different shell. + let mut cmd = Command::new("bash"); cmd.envs(get_base_injected_env( RunnerMode::Walltime,