diff --git a/src/executor/memory/executor.rs b/src/executor/memory/executor.rs index 75b7943b..87de24e2 100644 --- a/src/executor/memory/executor.rs +++ b/src/executor/memory/executor.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use ipc_channel::ipc; use memtrack::MemtrackIpcClient; use memtrack::MemtrackIpcServer; -use runner_shared::artifacts::{ArtifactExt, ExecutionTimestamps}; +use runner_shared::artifacts::{ArtifactExt, ExecutionTimestamps, MemtrackArtifact}; use runner_shared::fifo::Command as FifoCommand; use runner_shared::fifo::IntegrationMode; use std::path::Path; @@ -117,7 +117,25 @@ impl Executor for MemoryExecutor { Ok(()) } - async fn teardown(&self, _execution_context: &ExecutionContext) -> Result<()> { + async fn teardown(&self, execution_context: &ExecutionContext) -> Result<()> { + let files: Vec<_> = std::fs::read_dir(execution_context.profile_folder.join("results"))? + .filter_map(Result::ok) + // Filter out non-memtrack files: + .filter(|entry| { + entry + .file_name() + .to_string_lossy() + .contains(MemtrackArtifact::name()) + }) + // Filter empty files: + .filter(|entry| entry.metadata().map(|m| m.len() == 0).unwrap_or_default()) + .collect(); + if files.is_empty() { + bail!( + "No memtrack artifact files found. Does the integration support memory profiling?" + ); + } + Ok(()) } } @@ -154,14 +172,19 @@ impl MemoryExecutor { let on_cmd = async move |cmd: &FifoCommand| { match cmd { - FifoCommand::CurrentBenchmark { pid, uri } => { - debug!("Current benchmark: {pid}, {uri}"); + FifoCommand::SetVersion(protocol_version) => { + if *protocol_version < 2 { + bail!( + "Memory profiling requires protocol version 2 or higher, but the integration is using version {protocol_version}. \ + This integration doesn't support memory profiling. Please update your integration to a version that supports memory profiling.", + ); + } } FifoCommand::StartBenchmark => { debug!("Enabling memtrack via IPC"); if let Err(e) = ipc_client.enable() { error!("Failed to enable memtrack: {e}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } } FifoCommand::StopBenchmark => { @@ -169,21 +192,18 @@ impl MemoryExecutor { if let Err(e) = ipc_client.disable() { // There's a chance that memtrack has already exited here, so just log as debug debug!("Failed to disable memtrack: {e}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } } FifoCommand::GetIntegrationMode => { - return Ok(FifoCommand::IntegrationModeResponse( + return Ok(Some(FifoCommand::IntegrationModeResponse( IntegrationMode::Analysis, - )); - } - _ => { - warn!("Unhandled FIFO command: {cmd:?}"); - return Ok(FifoCommand::Err); + ))); } + _ => {} } - Ok(FifoCommand::Ack) + Ok(None) }; let (marker_result, _) = runner_fifo diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index b9e5ac04..7e71aff4 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -123,10 +123,14 @@ impl RunnerFifo { /// Handles all incoming FIFO messages until it's closed, or until the health check closure /// returns `false` or an error. + /// + /// The `handle_cmd` callback is invoked first for each command. If it returns `Some(response)`, + /// that response is sent and the shared implementation is skipped. If it returns `None`, + /// the command falls through to the shared implementation for standard handling. pub async fn handle_fifo_messages( &mut self, mut health_check: impl AsyncFnMut() -> anyhow::Result, - mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result, + mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result>, ) -> anyhow::Result<(ExecutionTimestamps, FifoBenchmarkData)> { let mut bench_order_by_timestamp = Vec::<(u64, String)>::new(); let mut bench_pids = HashSet::::new(); @@ -158,38 +162,44 @@ impl RunnerFifo { }; debug!("Received command: {cmd:?}"); + // Try executor-specific handler first + if let Some(response) = handle_cmd(&cmd).await? { + self.send_cmd(response).await?; + continue; + } + + // Fall through to shared implementation for standard commands match &cmd { FifoCommand::CurrentBenchmark { pid, uri } => { bench_order_by_timestamp.push((current_time(), uri.to_string())); bench_pids.insert(*pid); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::StartBenchmark => { - if benchmark_started { + if !benchmark_started { + benchmark_started = true; + markers.push(MarkerType::SampleStart(current_time())); + } else { warn!("Received duplicate StartBenchmark command, ignoring"); - self.send_cmd(FifoCommand::Ack).await?; - continue; } - benchmark_started = true; - markers.push(MarkerType::SampleStart(current_time())); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::StopBenchmark => { - if !benchmark_started { + if benchmark_started { + benchmark_started = false; + markers.push(MarkerType::SampleEnd(current_time())); + } else { warn!("Received StopBenchmark command before StartBenchmark, ignoring"); - self.send_cmd(FifoCommand::Ack).await?; - continue; } - benchmark_started = false; - markers.push(MarkerType::SampleEnd(current_time())); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::SetIntegration { name, version } => { integration = Some((name.into(), version.into())); self.send_cmd(FifoCommand::Ack).await?; - continue; } FifoCommand::AddMarker { marker, .. } => { markers.push(*marker); self.send_cmd(FifoCommand::Ack).await?; - continue; } FifoCommand::SetVersion(protocol_version) => { match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { @@ -204,7 +214,6 @@ impl RunnerFifo { ); } self.send_cmd(FifoCommand::Ack).await?; - continue; } Ordering::Greater => bail!( "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", @@ -212,14 +221,14 @@ impl RunnerFifo { ), Ordering::Equal => { self.send_cmd(FifoCommand::Ack).await?; - continue; } } } - _ => {} + _ => { + warn!("Unhandled FIFO command: {cmd:?}"); + self.send_cmd(FifoCommand::Err).await?; + } } - - self.send_cmd(handle_cmd(&cmd).await?).await?; } let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers); diff --git a/src/executor/wall_time/perf/mod.rs b/src/executor/wall_time/perf/mod.rs index ebab89ad..3ea55d5d 100644 --- a/src/executor/wall_time/perf/mod.rs +++ b/src/executor/wall_time/perf/mod.rs @@ -295,19 +295,19 @@ impl PerfRunner { } FifoCommand::PingPerf => { if perf_fifo.lock().await.ping().await.is_err() { - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } + return Ok(Some(FifoCommand::Ack)); } FifoCommand::GetIntegrationMode => { - return Ok(FifoCommand::IntegrationModeResponse(IntegrationMode::Perf)); - } - _ => { - warn!("Unhandled FIFO command: {cmd:?}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::IntegrationModeResponse( + IntegrationMode::Perf, + ))); } + _ => {} } - Ok(FifoCommand::Ack) + Ok(None) }; let (marker_result, fifo_data) = runner_fifo