2 changes: 1 addition & 1 deletion src/run/runner/wall_time/perf/mod.rs
@@ -306,7 +306,7 @@ impl PerfRunner {
}
Err(_) => continue,
};
debug!("Received command: {cmd:?}");
trace!("Received command: {cmd:?}");

match cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
25 changes: 23 additions & 2 deletions src/run/uploader/profile_archive.rs
Expand Up @@ -13,6 +13,7 @@ pub struct ProfileArchive {
pub enum ProfileArchiveContent {
CompressedInMemory { data: Vec<u8> },
UncompressedOnDisk { path: PathBuf },
CompressedOnDisk { path: PathBuf },
}

impl ProfileArchive {
@@ -39,13 +40,30 @@ impl ProfileArchive {
content: ProfileArchiveContent::UncompressedOnDisk { path },
})
}

pub fn new_compressed_on_disk(path: PathBuf) -> Result<Self> {
let metadata = std::fs::metadata(&path)?;
if !metadata.is_file() {
return Err(anyhow!("The provided path is not a file"));
}
let mut file = std::fs::File::open(&path)?;
let mut buffer = Vec::new();
use std::io::Read;
file.read_to_end(&mut buffer)?;
let hash = general_purpose::STANDARD.encode(md5::compute(&buffer).0);
Ok(ProfileArchive {
hash,
content: ProfileArchiveContent::CompressedOnDisk { path },
})
}
}

impl ProfileArchiveContent {
pub async fn size(&self) -> Result<u64> {
match &self {
ProfileArchiveContent::CompressedInMemory { data } => Ok(data.len() as u64),
ProfileArchiveContent::UncompressedOnDisk { path } => {
ProfileArchiveContent::UncompressedOnDisk { path }
| ProfileArchiveContent::CompressedOnDisk { path } => {
let metadata = tokio::fs::metadata(path).await?;
Ok(metadata.len())
}
@@ -55,14 +73,17 @@ impl ProfileArchiveContent {
pub fn encoding(&self) -> Option<String> {
match self {
ProfileArchiveContent::CompressedInMemory { .. } => Some("gzip".to_string()),
ProfileArchiveContent::CompressedOnDisk { .. } => Some("gzip".to_string()),
_ => None,
}
}
}

impl Drop for ProfileArchiveContent {
fn drop(&mut self) {
if let ProfileArchiveContent::UncompressedOnDisk { path } = self {
if let ProfileArchiveContent::UncompressedOnDisk { path }
| ProfileArchiveContent::CompressedOnDisk { path } = self
{
if path.exists() {
let _ = std::fs::remove_file(path);
}
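The hash computed by `new_compressed_on_disk` above is the base64 encoding of the raw 16-byte MD5 digest, which is the format the `Content-MD5` upload header expects. A minimal standalone sketch of that computation, assuming the same `md5` and `base64` crates used in the diff:

```rust
use base64::engine::general_purpose;
use base64::Engine as _;

/// Base64-encode the raw MD5 digest of `bytes`, matching the value
/// later sent as the `Content-MD5` header.
fn content_md5(bytes: &[u8]) -> String {
    general_purpose::STANDARD.encode(md5::compute(bytes).0)
}

fn main() {
    println!("Content-MD5: {}", content_md5(b"example archive bytes"));
}
```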
89 changes: 73 additions & 16 deletions src/run/uploader/upload.rs
@@ -19,10 +19,39 @@ use tokio_tar::Builder;
use super::interfaces::{UploadData, UploadMetadata};
use super::profile_archive::ProfileArchive;

fn bytes_to_mib(bytes: u64) -> u64 {
bytes / (1024 * 1024)
}

/// Maximum uncompressed profile folder size in bytes before compression is required
const MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES: u64 = 1024 * 1024 * 1024 * 5; // 5 GiB

/// Calculate the total size of a directory in bytes
async fn calculate_folder_size(path: &std::path::Path) -> Result<u64> {
let mut total_size = 0u64;
let mut dirs_to_process = vec![path.to_path_buf()];

while let Some(current_dir) = dirs_to_process.pop() {
let mut entries = tokio::fs::read_dir(&current_dir).await?;

while let Some(entry) = entries.next_entry().await? {
let metadata = entry.metadata().await?;
if metadata.is_file() {
total_size += metadata.len();
} else if metadata.is_dir() {
dirs_to_process.push(entry.path());
}
}
}

Ok(total_size)
}

/// Create a profile archive from the profile folder and return its md5 hash encoded in base64
///
/// For Valgrind, we create a gzip-compressed tar archive of the entire profile folder.
/// For WallTime, we create an uncompressed tar archive of the entire profile folder.
/// For WallTime, we check the folder size and create either a compressed or uncompressed tar archive
/// based on the MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES threshold.
async fn create_profile_archive(
run_data: &RunData,
executor_name: ExecutorName,
@@ -41,23 +70,47 @@ async fn create_profile_archive(
ProfileArchive::new_compressed_in_memory(data)
}
ExecutorName::WallTime => {
debug!("Creating uncompressed tar archive for WallTime on disk");
// Check folder size to decide on compression
let folder_size_bytes = calculate_folder_size(&run_data.profile_folder).await?;
let should_compress = folder_size_bytes >= MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES;

let temp_file = tempfile::NamedTempFile::new()?;
let temp_path = temp_file.path().to_path_buf();

// Create a tokio File handle to the temporary file
let file = File::create(&temp_path).await?;
{

// Persist the temporary file to prevent deletion when temp_file goes out of scope
let persistent_path = temp_file.into_temp_path().keep()?;

if should_compress {
debug!(
"Profile folder size ({} MiB) exceeds threshold ({} MiB), creating compressed tar.gz archive on disk",
bytes_to_mib(folder_size_bytes),
bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES)
);
let enc = GzipEncoder::new(file);
let mut tar = Builder::new(enc);
tar.append_dir_all(".", run_data.profile_folder.clone())
.await?;
let mut gzip_encoder = tar.into_inner().await?;
gzip_encoder.shutdown().await?;
gzip_encoder.into_inner().sync_all().await?;

ProfileArchive::new_compressed_on_disk(persistent_path)?
} else {
debug!(
"Profile folder size ({} MiB) is below threshold ({} MiB), creating uncompressed tar archive on disk",
bytes_to_mib(folder_size_bytes),
bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES)
);
let mut tar = Builder::new(file);
tar.append_dir_all(".", run_data.profile_folder.clone())
.await?;
tar.into_inner().await?.sync_all().await?;
}

// Persist the temporary file to prevent deletion when temp_file goes out of scope
let persistent_path = temp_file.into_temp_path().keep()?;

ProfileArchive::new_uncompressed_on_disk(persistent_path)?
}
}
};
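One subtlety in the WallTime branch above: `NamedTempFile` deletes its file on drop, so the archive path must be detached via `into_temp_path().keep()` before it is handed to `ProfileArchive`. A minimal sketch of that persist step, assuming the same `tempfile` crate (the file contents here are illustrative):

```rust
use std::io::Write;
use std::path::PathBuf;

/// Create a temporary file and detach it from the delete-on-drop guard.
fn persist_temp_file() -> anyhow::Result<PathBuf> {
    let mut temp_file = tempfile::NamedTempFile::new()?;
    temp_file.write_all(b"archive bytes")?;
    // `into_temp_path().keep()` disarms automatic deletion, so the file
    // survives after `temp_file` goes out of scope.
    let path = temp_file.into_temp_path().keep()?;
    Ok(path)
}
```

Cleanup then falls to the `Drop` impl on `ProfileArchiveContent`, which removes the on-disk file once the archive is no longer needed.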

@@ -130,36 +183,40 @@ async fn upload_profile_archive(
let archive_hash = profile_archive.hash;

let response = match &profile_archive.content {
ProfileArchiveContent::CompressedInMemory { data } => {
content @ ProfileArchiveContent::CompressedInMemory { data } => {
// Use regular client with retry middleware for compressed data
let mut request = REQUEST_CLIENT
.put(upload_data.upload_url.clone())
.header("Content-Type", "application/x-tar")
.header("Content-Length", archive_size)
.header("Content-MD5", archive_hash);

if let Some(encoding) = profile_archive.content.encoding() {
if let Some(encoding) = content.encoding() {
request = request.header("Content-Encoding", encoding);
}

request.body(data.clone()).send().await?
}
ProfileArchiveContent::UncompressedOnDisk { path } => {
content @ ProfileArchiveContent::UncompressedOnDisk { path }
| content @ ProfileArchiveContent::CompressedOnDisk { path } => {
// Use streaming client without retry middleware for file streams
let file = File::open(path)
.await
.context(format!("Failed to open file at path: {}", path.display()))?;
let stream = tokio_util::io::ReaderStream::new(file);
let body = reqwest::Body::wrap_stream(stream);

STREAMING_CLIENT
let mut request = STREAMING_CLIENT
.put(upload_data.upload_url.clone())
.header("Content-Type", "application/x-tar")
.header("Content-Length", archive_size)
.header("Content-MD5", archive_hash)
.body(body)
.send()
.await?
.header("Content-MD5", archive_hash);

if let Some(encoding) = content.encoding() {
request = request.header("Content-Encoding", encoding);
}

request.body(body).send().await?
}
};

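The on-disk variants are uploaded without buffering the whole archive in memory: the file is wrapped in a byte stream and handed to reqwest as the request body. A minimal sketch of that pattern, assuming a plain `reqwest::Client` with the `stream` feature enabled (the retry middleware split and the extra headers above are omitted):

```rust
use std::path::Path;
use tokio::fs::File;

/// PUT a file as a streamed request body instead of reading it fully into memory.
async fn stream_put(
    client: &reqwest::Client,
    url: &str,
    path: &Path,
) -> anyhow::Result<reqwest::Response> {
    let file = File::open(path).await?;
    // ReaderStream yields the file contents as chunks of `Bytes`,
    // which reqwest forwards as the body without loading the whole file.
    let stream = tokio_util::io::ReaderStream::new(file);
    let body = reqwest::Body::wrap_stream(stream);
    Ok(client.put(url).body(body).send().await?)
}
```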