From 9e5e929c3d6c7ec8d3faa974479b789a5ce7b666 Mon Sep 17 00:00:00 2001 From: Guillaume Lagrange Date: Wed, 15 Oct 2025 11:57:00 +0200 Subject: [PATCH 1/2] feat: automatically compress archive if profile folder is above a certain threshold --- src/run/uploader/profile_archive.rs | 25 +++++++- src/run/uploader/upload.rs | 89 +++++++++++++++++++++++------ 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/src/run/uploader/profile_archive.rs b/src/run/uploader/profile_archive.rs index 163363a6..5484b96c 100644 --- a/src/run/uploader/profile_archive.rs +++ b/src/run/uploader/profile_archive.rs @@ -13,6 +13,7 @@ pub struct ProfileArchive { pub enum ProfileArchiveContent { CompressedInMemory { data: Vec<u8> }, UncompressedOnDisk { path: PathBuf }, + CompressedOnDisk { path: PathBuf }, } impl ProfileArchive { @@ -39,13 +40,30 @@ impl ProfileArchive { content: ProfileArchiveContent::UncompressedOnDisk { path }, }) } + + pub fn new_compressed_on_disk(path: PathBuf) -> Result<Self> { + let metadata = std::fs::metadata(&path)?; + if !metadata.is_file() { + return Err(anyhow!("The provided path is not a file")); + } + let mut file = std::fs::File::open(&path)?; + let mut buffer = Vec::new(); + use std::io::Read; + file.read_to_end(&mut buffer)?; + let hash = general_purpose::STANDARD.encode(md5::compute(&buffer).0); + Ok(ProfileArchive { + hash, + content: ProfileArchiveContent::CompressedOnDisk { path }, + }) + } } impl ProfileArchiveContent { pub async fn size(&self) -> Result<u64> { match &self { ProfileArchiveContent::CompressedInMemory { data } => Ok(data.len() as u64), - ProfileArchiveContent::UncompressedOnDisk { path } => { + ProfileArchiveContent::UncompressedOnDisk { path } + | ProfileArchiveContent::CompressedOnDisk { path } => { let metadata = tokio::fs::metadata(path).await?; Ok(metadata.len()) } @@ -55,6 +73,7 @@ impl ProfileArchiveContent { pub fn encoding(&self) -> Option<String> { match self { ProfileArchiveContent::CompressedInMemory { .. 
} => Some("gzip".to_string()), + ProfileArchiveContent::CompressedOnDisk { .. } => Some("gzip".to_string()), _ => None, } } @@ -62,7 +81,9 @@ impl ProfileArchiveContent { impl Drop for ProfileArchiveContent { fn drop(&mut self) { - if let ProfileArchiveContent::UncompressedOnDisk { path } = self { + if let ProfileArchiveContent::UncompressedOnDisk { path } + | ProfileArchiveContent::CompressedOnDisk { path } = self + { if path.exists() { let _ = std::fs::remove_file(path); } diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index 18dbe87f..ba436e6e 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -19,10 +19,39 @@ use tokio_tar::Builder; use super::interfaces::{UploadData, UploadMetadata}; use super::profile_archive::ProfileArchive; +fn bytes_to_mib(bytes: u64) -> u64 { + bytes / (1024 * 1024) +} + +/// Maximum uncompressed profile folder size in bytes before compression is required +const MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES: u64 = 1024 * 1024 * 1024 * 5; // 5 GiB + +/// Calculate the total size of a directory in bytes +async fn calculate_folder_size(path: &std::path::Path) -> Result<u64> { + let mut total_size = 0u64; + let mut dirs_to_process = vec![path.to_path_buf()]; + + while let Some(current_dir) = dirs_to_process.pop() { + let mut entries = tokio::fs::read_dir(&current_dir).await?; + + while let Some(entry) = entries.next_entry().await? { + let metadata = entry.metadata().await?; + if metadata.is_file() { + total_size += metadata.len(); + } else if metadata.is_dir() { + dirs_to_process.push(entry.path()); + } + } + } + + Ok(total_size) +} + /// Create a profile archive from the profile folder and return its md5 hash encoded in base64 /// /// For Valgrind, we create a gzip-compressed tar archive of the entire profile folder. -/// For WallTime, we create an uncompressed tar archive of the entire profile folder. 
+/// For WallTime, we check the folder size and create either a compressed or uncompressed tar archive +/// based on the MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES threshold. async fn create_profile_archive( run_data: &RunData, executor_name: ExecutorName, @@ -41,23 +70,47 @@ async fn create_profile_archive( ProfileArchive::new_compressed_in_memory(data) } ExecutorName::WallTime => { - debug!("Creating uncompressed tar archive for WallTime on disk"); + // Check folder size to decide on compression + let folder_size_bytes = calculate_folder_size(&run_data.profile_folder).await?; + let should_compress = folder_size_bytes >= MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES; + let temp_file = tempfile::NamedTempFile::new()?; let temp_path = temp_file.path().to_path_buf(); // Create a tokio File handle to the temporary file let file = File::create(&temp_path).await?; - { + + // Persist the temporary file to prevent deletion when temp_file goes out of scope + let persistent_path = temp_file.into_temp_path().keep()?; + + if should_compress { + debug!( + "Profile folder size ({} MiB) exceeds threshold ({} MiB), creating compressed tar.gz archive on disk", + bytes_to_mib(folder_size_bytes), + bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES) + ); + let enc = GzipEncoder::new(file); + let mut tar = Builder::new(enc); + tar.append_dir_all(".", run_data.profile_folder.clone()) + .await?; + let mut gzip_encoder = tar.into_inner().await?; + gzip_encoder.shutdown().await?; + gzip_encoder.into_inner().sync_all().await?; + + ProfileArchive::new_compressed_on_disk(persistent_path)? 
+ } else { + debug!( + "Profile folder size ({} MiB) is below threshold ({} MiB), creating uncompressed tar archive on disk", + bytes_to_mib(folder_size_bytes), + bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES) + ); let mut tar = Builder::new(file); tar.append_dir_all(".", run_data.profile_folder.clone()) .await?; tar.into_inner().await?.sync_all().await?; - } - - // Persist the temporary file to prevent deletion when temp_file goes out of scope - let persistent_path = temp_file.into_temp_path().keep()?; - ProfileArchive::new_uncompressed_on_disk(persistent_path)? + ProfileArchive::new_uncompressed_on_disk(persistent_path)? + } } }; @@ -130,7 +183,7 @@ async fn upload_profile_archive( let archive_hash = profile_archive.hash; let response = match &profile_archive.content { - ProfileArchiveContent::CompressedInMemory { data } => { + content @ ProfileArchiveContent::CompressedInMemory { data } => { // Use regular client with retry middleware for compressed data let mut request = REQUEST_CLIENT .put(upload_data.upload_url.clone()) @@ -138,13 +191,14 @@ async fn upload_profile_archive( .header("Content-Length", archive_size) .header("Content-MD5", archive_hash); - if let Some(encoding) = profile_archive.content.encoding() { + if let Some(encoding) = content.encoding() { request = request.header("Content-Encoding", encoding); } request.body(data.clone()).send().await? 
} - ProfileArchiveContent::UncompressedOnDisk { path } => { + content @ ProfileArchiveContent::UncompressedOnDisk { path } + | content @ ProfileArchiveContent::CompressedOnDisk { path } => { // Use streaming client without retry middleware for file streams let file = File::open(path) .await @@ -152,14 +206,17 @@ async fn upload_profile_archive( let stream = tokio_util::io::ReaderStream::new(file); let body = reqwest::Body::wrap_stream(stream); - STREAMING_CLIENT + let mut request = STREAMING_CLIENT .put(upload_data.upload_url.clone()) .header("Content-Type", "application/x-tar") .header("Content-Length", archive_size) - .header("Content-MD5", archive_hash) - .body(body) - .send() - .await? + .header("Content-MD5", archive_hash); + + if let Some(encoding) = content.encoding() { + request = request.header("Content-Encoding", encoding); + } + + request.body(body).send().await? } }; From 1bef5250c85a84bf4939871d8ee40eddfaff4b2a Mon Sep 17 00:00:00 2001 From: Guillaume Lagrange Date: Wed, 15 Oct 2025 14:14:13 +0200 Subject: [PATCH 2/2] chore: make fifo command dump trace level With the introduction of markers, we were getting spammed because each iteration has a Start and a Stop marker. --- src/run/runner/wall_time/perf/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/run/runner/wall_time/perf/mod.rs b/src/run/runner/wall_time/perf/mod.rs index c9386bc7..b8dedc26 100644 --- a/src/run/runner/wall_time/perf/mod.rs +++ b/src/run/runner/wall_time/perf/mod.rs @@ -306,7 +306,7 @@ impl PerfRunner { } Err(_) => continue, }; - debug!("Received command: {cmd:?}"); + trace!("Received command: {cmd:?}"); match cmd { FifoCommand::CurrentBenchmark { pid, uri } => {