From 053bd0fc936e893c51cc0016f17987308845d272 Mon Sep 17 00:00:00 2001 From: Adrien Cacciaguerra Date: Tue, 16 Sep 2025 16:18:42 +0000 Subject: [PATCH 1/4] feat(upload): do not compress profile archive for walltime runs --- .gitignore | 1 + Cargo.lock | 5 +- Cargo.toml | 1 + src/request_client.rs | 6 + src/run/run_environment/local/provider.rs | 2 +- src/run/run_environment/provider.rs | 2 +- src/run/uploader/upload.rs | 172 ++++++++++++++++++---- src/run/uploader/upload_metadata.rs | 2 +- 8 files changed, 159 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index 05923927..f164cd97 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .DS_Store +samples diff --git a/Cargo.lock b/Cargo.lock index 725e2438..2879f846 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,6 +329,7 @@ dependencies = [ "test-with", "tokio", "tokio-tar", + "tokio-util", "url", ] @@ -2424,9 +2425,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6ae66872..ccd18597 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ url = "2.4.1" sha256 = "1.4.0" tokio = { version = "1", features = ["macros", "rt"] } tokio-tar = "0.3.1" +tokio-util = "0.7.16" md5 = "0.7.0" base64 = "0.21.0" async-compression = { version = "0.4.5", features = ["tokio", "gzip"] } diff --git a/src/request_client.rs b/src/request_client.rs index f8dde6ea..20af51e9 100644 --- a/src/request_client.rs +++ b/src/request_client.rs @@ -16,4 +16,10 @@ lazy_static! { ExponentialBackoff::builder().build_with_max_retries(UPLOAD_RETRY_COUNT) )) .build(); + + // Client without retry middleware for streaming uploads (can't be cloned) + pub static ref STREAMING_CLIENT: reqwest::Client = ClientBuilder::new() + .user_agent("codspeed-runner") + .build() + .unwrap(); } diff --git a/src/run/run_environment/local/provider.rs b/src/run/run_environment/local/provider.rs index da9ce256..a74e3e9e 100644 --- a/src/run/run_environment/local/provider.rs +++ b/src/run/run_environment/local/provider.rs @@ -153,7 +153,7 @@ impl RunEnvironmentProvider for LocalProvider { let run_environment_metadata = self.get_run_environment_metadata()?; Ok(UploadMetadata { - version: Some(6), + version: Some(7), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), commit_hash: run_environment_metadata.ref_.clone(), diff --git a/src/run/run_environment/provider.rs b/src/run/run_environment/provider.rs index c236db93..df4fd134 100644 --- a/src/run/run_environment/provider.rs +++ b/src/run/run_environment/provider.rs @@ -74,7 +74,7 @@ pub trait RunEnvironmentProvider { let commit_hash = get_commit_hash(&run_environment_metadata.repository_root_path)?; Ok(UploadMetadata { - version: Some(6), + version: Some(7), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), run_environment_metadata, diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index eb34aec8..7a39d7ad 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -6,30 +6,123 @@ use crate::run::{ runner::RunData, uploader::UploadError, }; -use crate::{prelude::*, request_client::REQUEST_CLIENT}; +use crate::{ + prelude::*, + request_client::{REQUEST_CLIENT, STREAMING_CLIENT}, +}; use async_compression::tokio::write::GzipEncoder; use base64::{Engine as _, engine::general_purpose}; use console::style; use reqwest::StatusCode; -use tokio::io::AsyncWriteExt; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_tar::Builder; use super::interfaces::{UploadData, UploadMetadata}; -/// Create a tar.gz archive buffer of the profile folder and return its md5 hash encoded in base64 -async fn get_profile_archive_buffer(run_data: &RunData) -> Result<(Vec, String)> { - let enc = GzipEncoder::new(Vec::new()); - let mut tar = Builder::new(enc); - tar.append_dir_all(".", run_data.profile_folder.clone()) - .await?; - let mut gzip_encoder = tar.into_inner().await?; - gzip_encoder.shutdown().await?; - - let archive_buffer = gzip_encoder.into_inner(); - let archive_digest = md5::compute(archive_buffer.as_slice()); +#[derive(Debug)] +enum ProfileArchive { + CompressedInMemory(Vec), + UncompressedOnDisk(PathBuf), +} + +impl ProfileArchive { + async fn size(&self) -> Result { + match self { + ProfileArchive::CompressedInMemory(data) => Ok(data.len() as u64), + ProfileArchive::UncompressedOnDisk(path) => { + let metadata = tokio::fs::metadata(path).await?; + Ok(metadata.len()) + } + } + } +} + +impl Drop for ProfileArchive { + fn drop(&mut self) { + if let ProfileArchive::UncompressedOnDisk(path) = self { + if path.exists() { + let _ = std::fs::remove_file(path); + } + } + } +} + +/// Create a profile archive from the profile folder and return its md5 hash encoded in base64 +/// +/// For Valgrind, we create a gzip-compressed tar archive of the entire profile folder. +/// For WallTime, we create an uncompressed tar archive of the entire profile folder. +async fn create_profile_archive( + run_data: &RunData, + executor_name: ExecutorName, +) -> Result<(ProfileArchive, String)> { + let time_start = std::time::Instant::now(); + let profile_archive = match executor_name { + ExecutorName::Valgrind => { + debug!("Creating compressed tar archive for Valgrind"); + let enc = GzipEncoder::new(Vec::new()); + let mut tar = Builder::new(enc); + tar.append_dir_all(".", run_data.profile_folder.clone()) + .await?; + let mut gzip_encoder = tar.into_inner().await?; + gzip_encoder.shutdown().await?; + ProfileArchive::CompressedInMemory(gzip_encoder.into_inner()) + } + ExecutorName::WallTime => { + debug!("Creating uncompressed tar archive for WallTime on disk"); + let temp_file = tempfile::NamedTempFile::new()?; + let temp_path = temp_file.path().to_path_buf(); + + // Create a tokio File handle to the temporary file + let file = File::create(&temp_path).await?; + { + let mut tar = Builder::new(file); + tar.append_dir_all(".", run_data.profile_folder.clone()) + .await?; + tar.into_inner().await?.sync_all().await?; + } + + // Persist the temporary file to prevent deletion when temp_file goes out of scope + let persistent_path = temp_file.into_temp_path().keep()?; + ProfileArchive::UncompressedOnDisk(persistent_path) + } + }; + + let (archive_digest, archive_size) = match &profile_archive { + ProfileArchive::CompressedInMemory(data) => { + let digest = md5::compute(data.as_slice()); + (digest, data.len() as u64) + } + ProfileArchive::UncompressedOnDisk(path) => { + let mut file = File::open(path).await.context(format!( + "Failed to open uncompressed file at path: {}", + path.display() + ))?; + let mut hasher = md5::Context::new(); + let mut buffer = [0; 8192]; + let mut total_size = 0u64; + + loop { + let bytes_read = file.read(&mut buffer).await?; + if bytes_read == 0 { + break; + } + hasher.consume(&buffer[..bytes_read]); + total_size += bytes_read as u64; + } + (hasher.compute(), total_size) + } + }; + let archive_hash = general_purpose::STANDARD.encode(archive_digest.0); + debug!( + "Created archive ({} bytes) in {:.2?}", + archive_size, + time_start.elapsed() + ); - Ok((archive_buffer, archive_hash)) + Ok((profile_archive, archive_hash)) } async fn retrieve_upload_data( @@ -84,19 +177,43 @@ async fn retrieve_upload_data( } } -async fn upload_archive_buffer( +async fn upload_profile_archive( upload_data: &UploadData, - archive_buffer: Vec, + profile_archive: ProfileArchive, archive_hash: &String, ) -> Result<()> { - let response = REQUEST_CLIENT - .put(upload_data.upload_url.clone()) - .header("Content-Type", "application/gzip") - .header("Content-Length", archive_buffer.len()) - .header("Content-MD5", archive_hash) - .body(archive_buffer) - .send() - .await?; + let archive_size = profile_archive.size().await?; + + let response = match &profile_archive { + ProfileArchive::CompressedInMemory(data) => { + // Use regular client with retry middleware for compressed data + let request = REQUEST_CLIENT + .put(upload_data.upload_url.clone()) + .header("Content-Type", "application/x-tar") + .header("Content-Length", archive_size) + .header("Content-MD5", archive_hash) + .header("Content-Encoding", "gzip"); + + request.body(data.clone()).send().await? + } + ProfileArchive::UncompressedOnDisk(path) => { + // Use streaming client without retry middleware for file streams + let file = File::open(path) + .await + .context(format!("Failed to open file at path: {}", path.display()))?; + let stream = tokio_util::io::ReaderStream::new(file); + let body = reqwest::Body::wrap_stream(stream); + + STREAMING_CLIENT + .put(upload_data.upload_url.clone()) + .header("Content-Type", "application/x-tar") + .header("Content-Length", archive_size) + .header("Content-MD5", archive_hash) + .body(body) + .send() + .await? + } + }; if !response.status().is_success() { let status = response.status(); @@ -124,7 +241,8 @@ pub async fn upload( run_data: &RunData, executor_name: ExecutorName, ) -> Result { - let (archive_buffer, archive_hash) = get_profile_archive_buffer(run_data).await?; + let (profile_archive, archive_hash) = + create_profile_archive(run_data, executor_name.clone()).await?; debug!( "Run Environment provider detected: {:?}", @@ -153,8 +271,8 @@ pub async fn upload( debug!("runId: {}", upload_data.run_id); info!("Uploading performance data..."); - debug!("Uploading {} bytes...", archive_buffer.len()); - upload_archive_buffer(&upload_data, archive_buffer, &archive_hash).await?; + debug!("Uploading {} bytes...", profile_archive.size().await?); + upload_profile_archive(&upload_data, profile_archive, &archive_hash).await?; info!("Performance data uploaded"); Ok(UploadResult { diff --git a/src/run/uploader/upload_metadata.rs b/src/run/uploader/upload_metadata.rs index 1c7eb0c5..818cc429 100644 --- a/src/run/uploader/upload_metadata.rs +++ b/src/run/uploader/upload_metadata.rs @@ -30,7 +30,7 @@ mod tests { fn test_get_metadata_hash() { let upload_metadata = UploadMetadata { repository_provider: RepositoryProvider::GitHub, - version: Some(6), + version: Some(7), tokenless: true, profile_md5: "jp/k05RKuqP3ERQuIIvx4Q==".into(), runner: Runner { From 723fcec00339115fe09ee4045fda8a2e9c4cb26a Mon Sep 17 00:00:00 2001 From: Adrien Cacciaguerra Date: Tue, 16 Sep 2025 17:03:44 +0000 Subject: [PATCH 2/4] feat(upload): add content encoding to upload metadata --- src/run/run_environment/local/provider.rs | 2 ++ src/run/run_environment/provider.rs | 2 ++ src/run/uploader/interfaces.rs | 1 + src/run/uploader/upload.rs | 16 ++++++++++++++-- src/run/uploader/upload_metadata.rs | 1 + 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/run/run_environment/local/provider.rs b/src/run/run_environment/local/provider.rs index a74e3e9e..a3b0fe1f 100644 --- a/src/run/run_environment/local/provider.rs +++ b/src/run/run_environment/local/provider.rs @@ -148,6 +148,7 @@ impl RunEnvironmentProvider for LocalProvider { config: &Config, system_info: &SystemInfo, archive_hash: &str, + content_encoding: Option, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; @@ -159,6 +160,7 @@ impl RunEnvironmentProvider for LocalProvider { commit_hash: run_environment_metadata.ref_.clone(), run_environment_metadata, profile_md5: archive_hash.into(), + content_encoding, runner: Runner { name: "codspeed-runner".into(), version: crate::VERSION.into(), diff --git a/src/run/run_environment/provider.rs b/src/run/run_environment/provider.rs index df4fd134..f9cf7968 100644 --- a/src/run/run_environment/provider.rs +++ b/src/run/run_environment/provider.rs @@ -67,6 +67,7 @@ pub trait RunEnvironmentProvider { config: &Config, system_info: &SystemInfo, archive_hash: &str, + content_encoding: Option, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; @@ -79,6 +80,7 @@ pub trait RunEnvironmentProvider { repository_provider: self.get_repository_provider(), run_environment_metadata, profile_md5: archive_hash.into(), + content_encoding, commit_hash, runner: Runner { name: "codspeed-runner".into(), diff --git a/src/run/uploader/interfaces.rs b/src/run/uploader/interfaces.rs index ad43fd94..d76ff149 100644 --- a/src/run/uploader/interfaces.rs +++ b/src/run/uploader/interfaces.rs @@ -14,6 +14,7 @@ pub struct UploadMetadata { pub version: Option, pub tokenless: bool, pub profile_md5: String, + pub content_encoding: Option, pub runner: Runner, pub run_environment: RunEnvironment, pub run_part: Option, diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index 7a39d7ad..ef8faac2 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -37,6 +37,13 @@ impl ProfileArchive { } } } + + fn to_content_encoding(&self) -> Option { + match self { + ProfileArchive::CompressedInMemory(_) => Some("gzip".to_string()), + ProfileArchive::UncompressedOnDisk(_) => None, + } + } } impl Drop for ProfileArchive { @@ -249,8 +256,13 @@ pub async fn upload( provider.get_run_environment() ); - let upload_metadata = - provider.get_upload_metadata(config, system_info, &archive_hash, executor_name)?; + let upload_metadata = provider.get_upload_metadata( + config, + system_info, + &archive_hash, + profile_archive.to_content_encoding(), + executor_name, + )?; debug!("Upload metadata: {upload_metadata:#?}"); info!( "Linked repository: {}\n", diff --git a/src/run/uploader/upload_metadata.rs b/src/run/uploader/upload_metadata.rs index 818cc429..1886af67 100644 --- a/src/run/uploader/upload_metadata.rs +++ b/src/run/uploader/upload_metadata.rs @@ -33,6 +33,7 @@ mod tests { version: Some(7), tokenless: true, profile_md5: "jp/k05RKuqP3ERQuIIvx4Q==".into(), + content_encoding: Some("gzip".into()), runner: Runner { name: "codspeed-runner".into(), version: "2.1.0".into(), From 08a8a5dc296a79a828591b28ee5344904877b0f4 Mon Sep 17 00:00:00 2001 From: Adrien Cacciaguerra Date: Tue, 16 Sep 2025 17:04:06 +0000 Subject: [PATCH 3/4] refactor(upload): refactor profile-archive --- src/run/run_environment/local/provider.rs | 9 +- src/run/run_environment/provider.rs | 9 +- src/run/uploader/interfaces.rs | 2 +- src/run/uploader/mod.rs | 2 + src/run/uploader/profile_archive.rs | 71 +++++++++++++ src/run/uploader/upload.rs | 120 ++++++---------------- src/run/uploader/upload_metadata.rs | 4 +- 7 files changed, 113 insertions(+), 104 deletions(-) create mode 100644 src/run/uploader/profile_archive.rs diff --git a/src/run/run_environment/local/provider.rs b/src/run/run_environment/local/provider.rs index a3b0fe1f..fcafe4fe 100644 --- a/src/run/run_environment/local/provider.rs +++ b/src/run/run_environment/local/provider.rs @@ -8,7 +8,7 @@ use crate::run::config::RepositoryOverride; use crate::run::helpers::{GitRemote, parse_git_remote}; use crate::run::run_environment::{RunEnvironment, RunPart}; use crate::run::runner::ExecutorName; -use crate::run::uploader::{Runner, UploadMetadata}; +use crate::run::uploader::{ProfileArchive, Runner, UploadMetadata}; use crate::run::{ config::Config, helpers::find_repository_root, @@ -147,8 +147,7 @@ impl RunEnvironmentProvider for LocalProvider { &self, config: &Config, system_info: &SystemInfo, - archive_hash: &str, - content_encoding: Option, + profile_archive: &ProfileArchive, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; @@ -159,8 +158,8 @@ impl RunEnvironmentProvider for LocalProvider { repository_provider: self.get_repository_provider(), commit_hash: run_environment_metadata.ref_.clone(), run_environment_metadata, - profile_md5: archive_hash.into(), - content_encoding, + profile_md5: profile_archive.hash.clone(), + profile_encoding: profile_archive.content.encoding(), runner: Runner { name: "codspeed-runner".into(), version: crate::VERSION.into(), diff --git a/src/run/run_environment/provider.rs b/src/run/run_environment/provider.rs index f9cf7968..fc65b4e9 100644 --- a/src/run/run_environment/provider.rs +++ b/src/run/run_environment/provider.rs @@ -5,7 +5,7 @@ use crate::prelude::*; use crate::run::check_system::SystemInfo; use crate::run::config::Config; use crate::run::runner::ExecutorName; -use crate::run::uploader::{Runner, UploadMetadata}; +use crate::run::uploader::{ProfileArchive, Runner, UploadMetadata}; use super::interfaces::{RepositoryProvider, RunEnvironment, RunEnvironmentMetadata, RunPart}; @@ -66,8 +66,7 @@ pub trait RunEnvironmentProvider { &self, config: &Config, system_info: &SystemInfo, - archive_hash: &str, - content_encoding: Option, + profile_archive: &ProfileArchive, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; @@ -79,8 +78,8 @@ pub trait RunEnvironmentProvider { tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), run_environment_metadata, - profile_md5: archive_hash.into(), - content_encoding, + profile_md5: profile_archive.hash.clone(), + profile_encoding: profile_archive.content.encoding(), commit_hash, runner: Runner { name: "codspeed-runner".into(), diff --git a/src/run/uploader/interfaces.rs b/src/run/uploader/interfaces.rs index d76ff149..accf4bde 100644 --- a/src/run/uploader/interfaces.rs +++ b/src/run/uploader/interfaces.rs @@ -14,7 +14,7 @@ pub struct UploadMetadata { pub version: Option, pub tokenless: bool, pub profile_md5: String, - pub content_encoding: Option, + pub profile_encoding: Option, pub runner: Runner, pub run_environment: RunEnvironment, pub run_part: Option, diff --git a/src/run/uploader/mod.rs b/src/run/uploader/mod.rs index 0c015257..7b60a8f5 100644 --- a/src/run/uploader/mod.rs +++ b/src/run/uploader/mod.rs @@ -1,6 +1,8 @@ mod interfaces; +mod profile_archive; mod upload; mod upload_metadata; pub use interfaces::*; +pub use profile_archive::ProfileArchive; pub use upload::upload; diff --git a/src/run/uploader/profile_archive.rs b/src/run/uploader/profile_archive.rs new file mode 100644 index 00000000..163363a6 --- /dev/null +++ b/src/run/uploader/profile_archive.rs @@ -0,0 +1,71 @@ +use base64::{Engine, engine::general_purpose}; + +use crate::prelude::*; +use std::path::PathBuf; + +#[derive(Debug)] +pub struct ProfileArchive { + pub hash: String, + pub content: ProfileArchiveContent, +} + +#[derive(Debug)] +pub enum ProfileArchiveContent { + CompressedInMemory { data: Vec }, + UncompressedOnDisk { path: PathBuf }, +} + +impl ProfileArchive { + pub fn new_compressed_in_memory(data: Vec) -> Self { + let hash = general_purpose::STANDARD.encode(md5::compute(&data).0); + ProfileArchive { + hash, + content: ProfileArchiveContent::CompressedInMemory { data }, + } + } + + pub fn new_uncompressed_on_disk(path: PathBuf) -> Result { + let metadata = std::fs::metadata(&path)?; + if !metadata.is_file() { + return Err(anyhow!("The provided path is not a file")); + } + let mut file = std::fs::File::open(&path)?; + let mut buffer = Vec::new(); + use std::io::Read; + file.read_to_end(&mut buffer)?; + let hash = general_purpose::STANDARD.encode(md5::compute(&buffer).0); + Ok(ProfileArchive { + hash, + content: ProfileArchiveContent::UncompressedOnDisk { path }, + }) + } +} + +impl ProfileArchiveContent { + pub async fn size(&self) -> Result { + match &self { + ProfileArchiveContent::CompressedInMemory { data } => Ok(data.len() as u64), + ProfileArchiveContent::UncompressedOnDisk { path } => { + let metadata = tokio::fs::metadata(path).await?; + Ok(metadata.len()) + } + } + } + + pub fn encoding(&self) -> Option { + match self { + ProfileArchiveContent::CompressedInMemory { .. } => Some("gzip".to_string()), + _ => None, + } + } +} + +impl Drop for ProfileArchiveContent { + fn drop(&mut self) { + if let ProfileArchiveContent::UncompressedOnDisk { path } = self { + if path.exists() { + let _ = std::fs::remove_file(path); + } + } + } +} diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index ef8faac2..18dbe87f 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -2,59 +2,22 @@ use crate::run::{ check_system::SystemInfo, config::Config, run_environment::{RunEnvironment, RunEnvironmentProvider}, - runner::ExecutorName, - runner::RunData, - uploader::UploadError, + runner::{ExecutorName, RunData}, + uploader::{UploadError, profile_archive::ProfileArchiveContent}, }; use crate::{ prelude::*, request_client::{REQUEST_CLIENT, STREAMING_CLIENT}, }; use async_compression::tokio::write::GzipEncoder; -use base64::{Engine as _, engine::general_purpose}; use console::style; use reqwest::StatusCode; -use std::path::PathBuf; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use tokio_tar::Builder; use super::interfaces::{UploadData, UploadMetadata}; - -#[derive(Debug)] -enum ProfileArchive { - CompressedInMemory(Vec), - UncompressedOnDisk(PathBuf), -} - -impl ProfileArchive { - async fn size(&self) -> Result { - match self { - ProfileArchive::CompressedInMemory(data) => Ok(data.len() as u64), - ProfileArchive::UncompressedOnDisk(path) => { - let metadata = tokio::fs::metadata(path).await?; - Ok(metadata.len()) - } - } - } - - fn to_content_encoding(&self) -> Option { - match self { - ProfileArchive::CompressedInMemory(_) => Some("gzip".to_string()), - ProfileArchive::UncompressedOnDisk(_) => None, - } - } -} - -impl Drop for ProfileArchive { - fn drop(&mut self) { - if let ProfileArchive::UncompressedOnDisk(path) = self { - if path.exists() { - let _ = std::fs::remove_file(path); - } - } - } -} +use super::profile_archive::ProfileArchive; /// Create a profile archive from the profile folder and return its md5 hash encoded in base64 /// @@ -63,7 +26,7 @@ impl Drop for ProfileArchive { async fn create_profile_archive( run_data: &RunData, executor_name: ExecutorName, -) -> Result<(ProfileArchive, String)> { +) -> Result { let time_start = std::time::Instant::now(); let profile_archive = match executor_name { ExecutorName::Valgrind => { @@ -74,7 +37,8 @@ async fn create_profile_archive( .await?; let mut gzip_encoder = tar.into_inner().await?; gzip_encoder.shutdown().await?; - ProfileArchive::CompressedInMemory(gzip_encoder.into_inner()) + let data = gzip_encoder.into_inner(); + ProfileArchive::new_compressed_in_memory(data) } ExecutorName::WallTime => { debug!("Creating uncompressed tar archive for WallTime on disk"); @@ -92,44 +56,18 @@ async fn create_profile_archive( // Persist the temporary file to prevent deletion when temp_file goes out of scope let persistent_path = temp_file.into_temp_path().keep()?; - ProfileArchive::UncompressedOnDisk(persistent_path) - } - }; - let (archive_digest, archive_size) = match &profile_archive { - ProfileArchive::CompressedInMemory(data) => { - let digest = md5::compute(data.as_slice()); - (digest, data.len() as u64) - } - ProfileArchive::UncompressedOnDisk(path) => { - let mut file = File::open(path).await.context(format!( - "Failed to open uncompressed file at path: {}", - path.display() - ))?; - let mut hasher = md5::Context::new(); - let mut buffer = [0; 8192]; - let mut total_size = 0u64; - - loop { - let bytes_read = file.read(&mut buffer).await?; - if bytes_read == 0 { - break; - } - hasher.consume(&buffer[..bytes_read]); - total_size += bytes_read as u64; - } - (hasher.compute(), total_size) + ProfileArchive::new_uncompressed_on_disk(persistent_path)? } }; - let archive_hash = general_purpose::STANDARD.encode(archive_digest.0); debug!( "Created archive ({} bytes) in {:.2?}", - archive_size, + profile_archive.content.size().await?, time_start.elapsed() ); - Ok((profile_archive, archive_hash)) + Ok(profile_archive) } async fn retrieve_upload_data( @@ -187,23 +125,26 @@ async fn retrieve_upload_data( async fn upload_profile_archive( upload_data: &UploadData, profile_archive: ProfileArchive, - archive_hash: &String, ) -> Result<()> { - let archive_size = profile_archive.size().await?; + let archive_size = profile_archive.content.size().await?; + let archive_hash = profile_archive.hash; - let response = match &profile_archive { - ProfileArchive::CompressedInMemory(data) => { + let response = match &profile_archive.content { + ProfileArchiveContent::CompressedInMemory { data } => { // Use regular client with retry middleware for compressed data - let request = REQUEST_CLIENT + let mut request = REQUEST_CLIENT .put(upload_data.upload_url.clone()) .header("Content-Type", "application/x-tar") .header("Content-Length", archive_size) - .header("Content-MD5", archive_hash) - .header("Content-Encoding", "gzip"); + .header("Content-MD5", archive_hash); + + if let Some(encoding) = profile_archive.content.encoding() { + request = request.header("Content-Encoding", encoding); + } request.body(data.clone()).send().await? } - ProfileArchive::UncompressedOnDisk(path) => { + ProfileArchiveContent::UncompressedOnDisk { path } => { // Use streaming client without retry middleware for file streams let file = File::open(path) .await @@ -248,21 +189,15 @@ pub async fn upload( run_data: &RunData, executor_name: ExecutorName, ) -> Result { - let (profile_archive, archive_hash) = - create_profile_archive(run_data, executor_name.clone()).await?; + let profile_archive = create_profile_archive(run_data, executor_name.clone()).await?; debug!( "Run Environment provider detected: {:?}", provider.get_run_environment() ); - let upload_metadata = provider.get_upload_metadata( - config, - system_info, - &archive_hash, - profile_archive.to_content_encoding(), - executor_name, - )?; + let upload_metadata = + provider.get_upload_metadata(config, system_info, &profile_archive, executor_name)?; debug!("Upload metadata: {upload_metadata:#?}"); info!( "Linked repository: {}\n", @@ -283,8 +218,11 @@ pub async fn upload( debug!("runId: {}", upload_data.run_id); info!("Uploading performance data..."); - debug!("Uploading {} bytes...", profile_archive.size().await?); - upload_profile_archive(&upload_data, profile_archive, &archive_hash).await?; + debug!( + "Uploading {} bytes...", + profile_archive.content.size().await? + ); + upload_profile_archive(&upload_data, profile_archive).await?; info!("Performance data uploaded"); Ok(UploadResult { diff --git a/src/run/uploader/upload_metadata.rs b/src/run/uploader/upload_metadata.rs index 1886af67..c9e8d4e8 100644 --- a/src/run/uploader/upload_metadata.rs +++ b/src/run/uploader/upload_metadata.rs @@ -33,7 +33,7 @@ mod tests { version: Some(7), tokenless: true, profile_md5: "jp/k05RKuqP3ERQuIIvx4Q==".into(), - content_encoding: Some("gzip".into()), + profile_encoding: Some("gzip".into()), runner: Runner { name: "codspeed-runner".into(), version: "2.1.0".into(), @@ -77,7 +77,7 @@ mod tests { hash, // Caution: when changing this value, we need to ensure that // the related backend snapshot remains the same - @"f827f6a834c26d39900c0a9e2dddfaaf22956494c8db911fc06fef72878b0c70" + @"7275243b4457a8fa70215c084210bea7518a3994b863e4437fa33c5ae2bd219e" ); assert_json_snapshot!(upload_metadata); } From 65a2980475069de7dbd85ade13c6849cedf3b80b Mon Sep 17 00:00:00 2001 From: Adrien Cacciaguerra Date: Wed, 17 Sep 2025 09:56:39 +0000 Subject: [PATCH 4/4] refactor(run): store upload metadata latest version in a const --- src/run/run_environment/local/provider.rs | 6 ++++-- src/run/run_environment/provider.rs | 6 ++++-- src/run/uploader/interfaces.rs | 2 ++ ...loader__upload_metadata__tests__get_metadata_hash-2.snap | 3 ++- src/run/uploader/upload_metadata.rs | 4 ++-- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/run/run_environment/local/provider.rs b/src/run/run_environment/local/provider.rs index fcafe4fe..85999fab 100644 --- a/src/run/run_environment/local/provider.rs +++ b/src/run/run_environment/local/provider.rs @@ -8,7 +8,9 @@ use crate::run::config::RepositoryOverride; use crate::run::helpers::{GitRemote, parse_git_remote}; use crate::run::run_environment::{RunEnvironment, RunPart}; use crate::run::runner::ExecutorName; -use crate::run::uploader::{ProfileArchive, Runner, UploadMetadata}; +use crate::run::uploader::{ + LATEST_UPLOAD_METADATA_VERSION, ProfileArchive, Runner, UploadMetadata, +}; use crate::run::{ config::Config, helpers::find_repository_root, @@ -153,7 +155,7 @@ impl RunEnvironmentProvider for LocalProvider { let run_environment_metadata = self.get_run_environment_metadata()?; Ok(UploadMetadata { - version: Some(7), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), commit_hash: run_environment_metadata.ref_.clone(), diff --git a/src/run/run_environment/provider.rs b/src/run/run_environment/provider.rs index fc65b4e9..2e17ddaa 100644 --- a/src/run/run_environment/provider.rs +++ b/src/run/run_environment/provider.rs @@ -5,7 +5,9 @@ use crate::prelude::*; use crate::run::check_system::SystemInfo; use crate::run::config::Config; use crate::run::runner::ExecutorName; -use crate::run::uploader::{ProfileArchive, Runner, UploadMetadata}; +use crate::run::uploader::{ + LATEST_UPLOAD_METADATA_VERSION, ProfileArchive, Runner, UploadMetadata, +}; use super::interfaces::{RepositoryProvider, RunEnvironment, RunEnvironmentMetadata, RunPart}; @@ -74,7 +76,7 @@ pub trait RunEnvironmentProvider { let commit_hash = get_commit_hash(&run_environment_metadata.repository_root_path)?; Ok(UploadMetadata { - version: Some(7), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), run_environment_metadata, diff --git a/src/run/uploader/interfaces.rs b/src/run/uploader/interfaces.rs index accf4bde..a25446af 100644 --- a/src/run/uploader/interfaces.rs +++ b/src/run/uploader/interfaces.rs @@ -7,6 +7,8 @@ use crate::run::{ runner::ExecutorName, }; +pub const LATEST_UPLOAD_METADATA_VERSION: u32 = 7; + #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct UploadMetadata { diff --git a/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap b/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap index cec82e18..07374191 100644 --- a/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap +++ b/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap @@ -4,9 +4,10 @@ expression: upload_metadata --- { "repositoryProvider": "GITHUB", - "version": 6, + "version": 7, "tokenless": true, "profileMd5": "jp/k05RKuqP3ERQuIIvx4Q==", + "profileEncoding": "gzip", "runner": { "name": "codspeed-runner", "version": "2.1.0", diff --git a/src/run/uploader/upload_metadata.rs b/src/run/uploader/upload_metadata.rs index c9e8d4e8..56ba4a7b 100644 --- a/src/run/uploader/upload_metadata.rs +++ b/src/run/uploader/upload_metadata.rs @@ -23,14 +23,14 @@ mod tests { Sender, }, runner::ExecutorName, - uploader::{Runner, UploadMetadata}, + uploader::{LATEST_UPLOAD_METADATA_VERSION, Runner, UploadMetadata}, }; #[test] fn test_get_metadata_hash() { let upload_metadata = UploadMetadata { repository_provider: RepositoryProvider::GitHub, - version: Some(7), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: true, profile_md5: "jp/k05RKuqP3ERQuIIvx4Q==".into(), profile_encoding: Some("gzip".into()),