diff --git a/.gitignore b/.gitignore index 05923927..f164cd97 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .DS_Store +samples diff --git a/Cargo.lock b/Cargo.lock index 725e2438..2879f846 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,6 +329,7 @@ dependencies = [ "test-with", "tokio", "tokio-tar", + "tokio-util", "url", ] @@ -2424,9 +2425,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6ae66872..ccd18597 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ url = "2.4.1" sha256 = "1.4.0" tokio = { version = "1", features = ["macros", "rt"] } tokio-tar = "0.3.1" +tokio-util = "0.7.16" md5 = "0.7.0" base64 = "0.21.0" async-compression = { version = "0.4.5", features = ["tokio", "gzip"] } diff --git a/src/request_client.rs b/src/request_client.rs index f8dde6ea..20af51e9 100644 --- a/src/request_client.rs +++ b/src/request_client.rs @@ -16,4 +16,10 @@ lazy_static! { ExponentialBackoff::builder().build_with_max_retries(UPLOAD_RETRY_COUNT) )) .build(); + + // Client without retry middleware for streaming uploads (can't be cloned) + pub static ref STREAMING_CLIENT: reqwest::Client = ClientBuilder::new() + .user_agent("codspeed-runner") + .build() + .unwrap(); } diff --git a/src/run/run_environment/local/provider.rs b/src/run/run_environment/local/provider.rs index da9ce256..85999fab 100644 --- a/src/run/run_environment/local/provider.rs +++ b/src/run/run_environment/local/provider.rs @@ -8,7 +8,9 @@ use crate::run::config::RepositoryOverride; use crate::run::helpers::{GitRemote, parse_git_remote}; use crate::run::run_environment::{RunEnvironment, RunPart}; use crate::run::runner::ExecutorName; -use crate::run::uploader::{Runner, UploadMetadata}; +use crate::run::uploader::{ + LATEST_UPLOAD_METADATA_VERSION, ProfileArchive, Runner, UploadMetadata, +}; use crate::run::{ config::Config, helpers::find_repository_root, @@ -147,18 +149,19 @@ impl RunEnvironmentProvider for LocalProvider { &self, config: &Config, system_info: &SystemInfo, - archive_hash: &str, + profile_archive: &ProfileArchive, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; Ok(UploadMetadata { - version: Some(6), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), commit_hash: run_environment_metadata.ref_.clone(), run_environment_metadata, - profile_md5: archive_hash.into(), + profile_md5: profile_archive.hash.clone(), + profile_encoding: profile_archive.content.encoding(), runner: Runner { name: "codspeed-runner".into(), version: crate::VERSION.into(), diff --git a/src/run/run_environment/provider.rs b/src/run/run_environment/provider.rs index c236db93..2e17ddaa 100644 --- a/src/run/run_environment/provider.rs +++ b/src/run/run_environment/provider.rs @@ -5,7 +5,9 @@ use crate::prelude::*; use crate::run::check_system::SystemInfo; use crate::run::config::Config; use crate::run::runner::ExecutorName; -use crate::run::uploader::{Runner, UploadMetadata}; +use crate::run::uploader::{ + LATEST_UPLOAD_METADATA_VERSION, ProfileArchive, Runner, UploadMetadata, +}; use super::interfaces::{RepositoryProvider, RunEnvironment, RunEnvironmentMetadata, RunPart}; @@ -66,7 +68,7 @@ pub trait RunEnvironmentProvider { &self, config: &Config, system_info: &SystemInfo, - archive_hash: &str, + profile_archive: &ProfileArchive, executor_name: ExecutorName, ) -> Result { let run_environment_metadata = self.get_run_environment_metadata()?; @@ -74,11 +76,12 @@ pub trait RunEnvironmentProvider { let commit_hash = get_commit_hash(&run_environment_metadata.repository_root_path)?; Ok(UploadMetadata { - version: Some(6), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: config.token.is_none(), repository_provider: self.get_repository_provider(), run_environment_metadata, - profile_md5: archive_hash.into(), + profile_md5: profile_archive.hash.clone(), + profile_encoding: profile_archive.content.encoding(), commit_hash, runner: Runner { name: "codspeed-runner".into(), diff --git a/src/run/uploader/interfaces.rs b/src/run/uploader/interfaces.rs index ad43fd94..a25446af 100644 --- a/src/run/uploader/interfaces.rs +++ b/src/run/uploader/interfaces.rs @@ -7,6 +7,8 @@ use crate::run::{ runner::ExecutorName, }; +pub const LATEST_UPLOAD_METADATA_VERSION: u32 = 7; + #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct UploadMetadata { @@ -14,6 +16,7 @@ pub struct UploadMetadata { pub version: Option, pub tokenless: bool, pub profile_md5: String, + pub profile_encoding: Option, pub runner: Runner, pub run_environment: RunEnvironment, pub run_part: Option, diff --git a/src/run/uploader/mod.rs b/src/run/uploader/mod.rs index 0c015257..7b60a8f5 100644 --- a/src/run/uploader/mod.rs +++ b/src/run/uploader/mod.rs @@ -1,6 +1,8 @@ mod interfaces; +mod profile_archive; mod upload; mod upload_metadata; pub use interfaces::*; +pub use profile_archive::ProfileArchive; pub use upload::upload; diff --git a/src/run/uploader/profile_archive.rs b/src/run/uploader/profile_archive.rs new file mode 100644 index 00000000..163363a6 --- /dev/null +++ b/src/run/uploader/profile_archive.rs @@ -0,0 +1,71 @@ +use base64::{Engine, engine::general_purpose}; + +use crate::prelude::*; +use std::path::PathBuf; + +#[derive(Debug)] +pub struct ProfileArchive { + pub hash: String, + pub content: ProfileArchiveContent, +} + +#[derive(Debug)] +pub enum ProfileArchiveContent { + CompressedInMemory { data: Vec }, + UncompressedOnDisk { path: PathBuf }, +} + +impl ProfileArchive { + pub fn new_compressed_in_memory(data: Vec) -> Self { + let hash = general_purpose::STANDARD.encode(md5::compute(&data).0); + ProfileArchive { + hash, + content: ProfileArchiveContent::CompressedInMemory { data }, + } + } + + pub fn new_uncompressed_on_disk(path: PathBuf) -> Result { + let metadata = std::fs::metadata(&path)?; + if !metadata.is_file() { + return Err(anyhow!("The provided path is not a file")); + } + let mut file = std::fs::File::open(&path)?; + let mut buffer = Vec::new(); + use std::io::Read; + file.read_to_end(&mut buffer)?; + let hash = general_purpose::STANDARD.encode(md5::compute(&buffer).0); + Ok(ProfileArchive { + hash, + content: ProfileArchiveContent::UncompressedOnDisk { path }, + }) + } +} + +impl ProfileArchiveContent { + pub async fn size(&self) -> Result { + match &self { + ProfileArchiveContent::CompressedInMemory { data } => Ok(data.len() as u64), + ProfileArchiveContent::UncompressedOnDisk { path } => { + let metadata = tokio::fs::metadata(path).await?; + Ok(metadata.len()) + } + } + } + + pub fn encoding(&self) -> Option { + match self { + ProfileArchiveContent::CompressedInMemory { .. } => Some("gzip".to_string()), + _ => None, + } + } +} + +impl Drop for ProfileArchiveContent { + fn drop(&mut self) { + if let ProfileArchiveContent::UncompressedOnDisk { path } = self { + if path.exists() { + let _ = std::fs::remove_file(path); + } + } + } +} diff --git a/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap b/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap index cec82e18..07374191 100644 --- a/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap +++ b/src/run/uploader/snapshots/codspeed__run__uploader__upload_metadata__tests__get_metadata_hash-2.snap @@ -4,9 +4,10 @@ expression: upload_metadata --- { "repositoryProvider": "GITHUB", - "version": 6, + "version": 7, "tokenless": true, "profileMd5": "jp/k05RKuqP3ERQuIIvx4Q==", + "profileEncoding": "gzip", "runner": { "name": "codspeed-runner", "version": "2.1.0", diff --git a/src/run/uploader/upload.rs b/src/run/uploader/upload.rs index eb34aec8..18dbe87f 100644 --- a/src/run/uploader/upload.rs +++ b/src/run/uploader/upload.rs @@ -2,34 +2,72 @@ use crate::run::{ check_system::SystemInfo, config::Config, run_environment::{RunEnvironment, RunEnvironmentProvider}, - runner::ExecutorName, - runner::RunData, - uploader::UploadError, + runner::{ExecutorName, RunData}, + uploader::{UploadError, profile_archive::ProfileArchiveContent}, +}; +use crate::{ + prelude::*, + request_client::{REQUEST_CLIENT, STREAMING_CLIENT}, }; -use crate::{prelude::*, request_client::REQUEST_CLIENT}; use async_compression::tokio::write::GzipEncoder; -use base64::{Engine as _, engine::general_purpose}; use console::style; use reqwest::StatusCode; +use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio_tar::Builder; use super::interfaces::{UploadData, UploadMetadata}; +use super::profile_archive::ProfileArchive; + +/// Create a profile archive from the profile folder and return its md5 hash encoded in base64 +/// +/// For Valgrind, we create a gzip-compressed tar archive of the entire profile folder. +/// For WallTime, we create an uncompressed tar archive of the entire profile folder. +async fn create_profile_archive( + run_data: &RunData, + executor_name: ExecutorName, +) -> Result { + let time_start = std::time::Instant::now(); + let profile_archive = match executor_name { + ExecutorName::Valgrind => { + debug!("Creating compressed tar archive for Valgrind"); + let enc = GzipEncoder::new(Vec::new()); + let mut tar = Builder::new(enc); + tar.append_dir_all(".", run_data.profile_folder.clone()) + .await?; + let mut gzip_encoder = tar.into_inner().await?; + gzip_encoder.shutdown().await?; + let data = gzip_encoder.into_inner(); + ProfileArchive::new_compressed_in_memory(data) + } + ExecutorName::WallTime => { + debug!("Creating uncompressed tar archive for WallTime on disk"); + let temp_file = tempfile::NamedTempFile::new()?; + let temp_path = temp_file.path().to_path_buf(); -/// Create a tar.gz archive buffer of the profile folder and return its md5 hash encoded in base64 -async fn get_profile_archive_buffer(run_data: &RunData) -> Result<(Vec, String)> { - let enc = GzipEncoder::new(Vec::new()); - let mut tar = Builder::new(enc); - tar.append_dir_all(".", run_data.profile_folder.clone()) - .await?; - let mut gzip_encoder = tar.into_inner().await?; - gzip_encoder.shutdown().await?; + // Create a tokio File handle to the temporary file + let file = File::create(&temp_path).await?; + { + let mut tar = Builder::new(file); + tar.append_dir_all(".", run_data.profile_folder.clone()) + .await?; + tar.into_inner().await?.sync_all().await?; + } - let archive_buffer = gzip_encoder.into_inner(); - let archive_digest = md5::compute(archive_buffer.as_slice()); - let archive_hash = general_purpose::STANDARD.encode(archive_digest.0); + // Persist the temporary file to prevent deletion when temp_file goes out of scope + let persistent_path = temp_file.into_temp_path().keep()?; + + ProfileArchive::new_uncompressed_on_disk(persistent_path)? + } + }; + + debug!( + "Created archive ({} bytes) in {:.2?}", + profile_archive.content.size().await?, + time_start.elapsed() + ); - Ok((archive_buffer, archive_hash)) + Ok(profile_archive) } async fn retrieve_upload_data( @@ -84,19 +122,46 @@ async fn retrieve_upload_data( } } -async fn upload_archive_buffer( +async fn upload_profile_archive( upload_data: &UploadData, - archive_buffer: Vec, - archive_hash: &String, + profile_archive: ProfileArchive, ) -> Result<()> { - let response = REQUEST_CLIENT - .put(upload_data.upload_url.clone()) - .header("Content-Type", "application/gzip") - .header("Content-Length", archive_buffer.len()) - .header("Content-MD5", archive_hash) - .body(archive_buffer) - .send() - .await?; + let archive_size = profile_archive.content.size().await?; + let archive_hash = profile_archive.hash; + + let response = match &profile_archive.content { + ProfileArchiveContent::CompressedInMemory { data } => { + // Use regular client with retry middleware for compressed data + let mut request = REQUEST_CLIENT + .put(upload_data.upload_url.clone()) + .header("Content-Type", "application/x-tar") + .header("Content-Length", archive_size) + .header("Content-MD5", archive_hash); + + if let Some(encoding) = profile_archive.content.encoding() { + request = request.header("Content-Encoding", encoding); + } + + request.body(data.clone()).send().await? + } + ProfileArchiveContent::UncompressedOnDisk { path } => { + // Use streaming client without retry middleware for file streams + let file = File::open(path) + .await + .context(format!("Failed to open file at path: {}", path.display()))?; + let stream = tokio_util::io::ReaderStream::new(file); + let body = reqwest::Body::wrap_stream(stream); + + STREAMING_CLIENT + .put(upload_data.upload_url.clone()) + .header("Content-Type", "application/x-tar") + .header("Content-Length", archive_size) + .header("Content-MD5", archive_hash) + .body(body) + .send() + .await? + } + }; if !response.status().is_success() { let status = response.status(); @@ -124,7 +189,7 @@ pub async fn upload( run_data: &RunData, executor_name: ExecutorName, ) -> Result { - let (archive_buffer, archive_hash) = get_profile_archive_buffer(run_data).await?; + let profile_archive = create_profile_archive(run_data, executor_name.clone()).await?; debug!( "Run Environment provider detected: {:?}", @@ -132,7 +197,7 @@ pub async fn upload( ); let upload_metadata = - provider.get_upload_metadata(config, system_info, &archive_hash, executor_name)?; + provider.get_upload_metadata(config, system_info, &profile_archive, executor_name)?; debug!("Upload metadata: {upload_metadata:#?}"); info!( "Linked repository: {}\n", @@ -153,8 +218,11 @@ pub async fn upload( debug!("runId: {}", upload_data.run_id); info!("Uploading performance data..."); - debug!("Uploading {} bytes...", archive_buffer.len()); - upload_archive_buffer(&upload_data, archive_buffer, &archive_hash).await?; + debug!( + "Uploading {} bytes...", + profile_archive.content.size().await? + ); + upload_profile_archive(&upload_data, profile_archive).await?; info!("Performance data uploaded"); Ok(UploadResult { diff --git a/src/run/uploader/upload_metadata.rs b/src/run/uploader/upload_metadata.rs index 1c7eb0c5..56ba4a7b 100644 --- a/src/run/uploader/upload_metadata.rs +++ b/src/run/uploader/upload_metadata.rs @@ -23,16 +23,17 @@ mod tests { Sender, }, runner::ExecutorName, - uploader::{Runner, UploadMetadata}, + uploader::{LATEST_UPLOAD_METADATA_VERSION, Runner, UploadMetadata}, }; #[test] fn test_get_metadata_hash() { let upload_metadata = UploadMetadata { repository_provider: RepositoryProvider::GitHub, - version: Some(6), + version: Some(LATEST_UPLOAD_METADATA_VERSION), tokenless: true, profile_md5: "jp/k05RKuqP3ERQuIIvx4Q==".into(), + profile_encoding: Some("gzip".into()), runner: Runner { name: "codspeed-runner".into(), version: "2.1.0".into(), @@ -76,7 +77,7 @@ mod tests { hash, // Caution: when changing this value, we need to ensure that // the related backend snapshot remains the same - @"f827f6a834c26d39900c0a9e2dddfaaf22956494c8db911fc06fef72878b0c70" + @"7275243b4457a8fa70215c084210bea7518a3994b863e4437fa33c5ae2bd219e" ); assert_json_snapshot!(upload_metadata); }