Emit lifecycle events for workflow and pipeline stages #17
- Introduced `stage_event_started`, `stage_event_succeeded`, `stage_event_failed`, and `stage_event_skipped` lifecycle events.
- Updated `Scheduler` and `Pipeline` to handle per-stage event emissions with group tracking for telemetry.
- Enhanced `ObservableWorkflow` to support telemetry event channels.
- Adjusted APIs (`run_file`, `run_pipeline`, `run_agent`, `run_workflow`) for optional stage event emitters.
- Expanded tests to cover event emission scenarios.
Pull request overview
Adds stage/workflow lifecycle event emission and introduces a telemetry ring buffer + host metrics sampling to improve run observability and enable new daemon endpoints for stages and telemetry.
Changes:
- Emit stage lifecycle `RunEvent`s across agent, pipeline, and workflow execution paths and persist them in the daemon.
- Add `/v1/runs/{id}/stages` and `/v1/runs/{id}/telemetry` endpoints plus a telemetry ring buffer and host metrics collector.
- Extend `RunEvent` with `stage_name`/`group_id` and add an `INVALID_PARAMS` API error code.
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| tests/oci_integration.rs | Updates run_file call signature for new stage event sender parameter. |
| src/workflow/scheduler.rs | Emits stage lifecycle events during workflow step execution (incl. parallel groups). |
| src/workflow/mod.rs | Threads optional stage event sender into workflow observation/run path. |
| src/spec.rs | Adds telemetry_buffer_size field to observe spec. |
| src/runtime.rs | Propagates optional stage event sender through runtime run functions and emits stage events for agent/pipeline/workflow. |
| src/pipeline.rs | Adds run_with_stage_tx and emits stage lifecycle events during pipeline execution. |
| src/persistence.rs | Extends RunEvent with stage scoping fields; adds stage event builders + legacy type mappings + tests. |
| src/observe/telemetry.rs | Introduces TelemetryRingBuffer and telemetry sample types; optionally pushes guest telemetry into the ring buffer. |
| src/observe/mod.rs | Exposes new host_metrics module. |
| src/observe/host_metrics.rs | Adds Linux host process metrics collector with basic tests. |
| src/error.rs | Adds InvalidParams API error code + tests. |
| src/daemon.rs | Adds in-memory telemetry buffers, stage/telemetry endpoints, host metrics sampling task, and stage event collection channel. |
| src/bin/voidbox.rs | Updates CLI run command for new run_file signature. |
```rust
fn read_rss_bytes() -> Option<u64> {
    let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
    let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
```
`read_rss_bytes()` casts the result of `libc::sysconf(_SC_PAGESIZE)` directly to `u64`. `sysconf` can return -1 on error, which would become a huge `u64` and corrupt the RSS calculation. Consider checking for `<= 0` and returning `None` (or 0) on error before casting.
```diff
-let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
+let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
+if page_size <= 0 {
+    return None;
+}
+let page_size = page_size as u64;
```
```rust
#[serde(default)]
pub telemetry_buffer_size: Option<usize>,
```
`ObserveSpec.telemetry_buffer_size` is introduced here but doesn't appear to be used anywhere (the daemon currently reads `VOIDBOX_TELEMETRY_BUFFER_SIZE` instead). Either wire this into run/daemon configuration (and document precedence vs the env var) or remove it to avoid an inert config knob.
```diff
-#[serde(default)]
-pub telemetry_buffer_size: Option<usize>,
```
```rust
// Emit StageStarted
self.emit(crate::persistence::stage_event_started(
    step_name, None, &gid, 1,
));
```
Stage lifecycle events are emitted with `stage_attempt` hard-coded to 1, but this code supports retries (`execute_with_retry` / `step.retry`). This makes the emitted attempt count inaccurate, and consumers can't distinguish a retry success from a first-attempt success. Consider threading the actual attempt number through the retry path and emitting `StageStarted`/`StageSucceeded`/`StageFailed` with the real attempt value (and optionally emitting `StageStarted` per attempt).
```diff
-// Emit StageStarted
-self.emit(crate::persistence::stage_event_started(
-    step_name, None, &gid, 1,
-));
+// Emit StageStarted only for non-retry steps; retrying steps should emit
+// per-attempt events within the retry logic to ensure accurate attempt counts.
+if step.retry.is_none() {
+    self.emit(crate::persistence::stage_event_started(
+        step_name, None, &gid, 1,
+    ));
+}
```
```rust
// Create telemetry ring buffer for this run
let buffer_size = std::env::var("VOIDBOX_TELEMETRY_BUFFER_SIZE")
    .ok()
    .and_then(|v| v.parse().ok())
    .unwrap_or(5000usize);
let ring_buffer = std::sync::Arc::new(std::sync::Mutex::new(
    crate::observe::telemetry::TelemetryRingBuffer::new(buffer_size),
));
```
A per-run telemetry buffer is created here, but it's only fed by the host metrics task below. Guest telemetry ingestion (`TelemetryAggregator`) is still constructed without a ring buffer, so `/v1/runs/{id}/telemetry` will never include guest samples. Consider wiring this ring buffer into the guest telemetry path (e.g., creating the aggregator via a `TelemetryAggregator::with_ring_buffer(...)` constructor and updating its current stage name from `stage.started` events).
```rust
    let mut bufs = state.telemetry_buffers.lock().await;
    bufs.insert(run_id.clone(), ring_buffer.clone());
}
```
`telemetry_buffers` grows by inserting a ring buffer per run, but there's no cleanup when a run reaches a terminal state. This can cause unbounded memory growth over time. Consider removing the entry when the run transitions to terminal (or implementing a TTL/GC sweep).
```rust
// Spawn cleanup task to remove telemetry buffer once run reaches a terminal state
{
    let cleanup_state = state.clone();
    let cleanup_run_id = run_id.clone();
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
            // Check if the run is still present and active
            let is_still_active = {
                let runs = cleanup_state.runs.lock().await;
                if let Some(run) = runs.get(&cleanup_run_id) {
                    run.status.is_active()
                } else {
                    // Run has been removed; treat as terminal
                    false
                }
            };
            if !is_still_active {
                let mut bufs = cleanup_state.telemetry_buffers.lock().await;
                bufs.remove(&cleanup_run_id);
                break;
            }
        }
    });
}
```
```rust
if let Some(id) = id.strip_suffix("/telemetry") {
    if method == "GET" {
        return get_telemetry(id, query, state).await;
    }
```
The new `/v1/runs/{id}/telemetry` endpoint (including `from_seq` validation and `stage_name` filtering) isn't covered by existing integration tests. Adding contract tests would help prevent regressions and validate the new `INVALID_PARAMS` behavior.
```rust
"stage.started" => {
    if let Some(sv) = stages.get_mut(sn) {
        sv.status = "running".to_string();
        sv.started_at = ev.timestamp.clone();
        sv.stage_attempt = sa;
```
`get_stages` only updates stages on `stage.started`/`stage.completed`/etc. if a prior `stage.queued` event created the entry. If a run lacks queued events (older runs, partial event streams, or future producers), those stages will be silently omitted. Consider "upserting" a `StageView` for non-queued events as well (and backfilling `group_id`/`box_name` when present).
```rust
let gid = ev.group_id.clone().unwrap_or_default();
let sa = ev
    .payload
    .as_ref()
    .and_then(|p| p.get("stage_attempt"))
```
`group_id` defaults to an empty string when missing (`unwrap_or_default()`), but `StageView.group_id` is required. This makes the API response ambiguous for events without a `group_id`. Consider using a sensible default like `g0` or making `group_id` optional in the response when it's truly unknown.
```rust
self.next_seq += 1;
if self.samples.len() >= self.capacity {
    self.samples.pop_front();
}
self.samples.push_back(sample);
```
`TelemetryRingBuffer::push` doesn't handle `capacity == 0` as a true "disable buffering" mode: with capacity set to 0 the buffer will still retain the last pushed sample. Consider explicitly short-circuiting `push()` when `capacity == 0` (or clamping capacity to at least 1) to make behavior predictable.
```rust
if let Some(id) = id.strip_suffix("/stages") {
    if method == "GET" {
        return get_stages(id, state).await;
    }
}
```
The new `/v1/runs/{id}/stages` endpoint isn't covered by the existing orchestration contract tests. Consider adding integration tests (e.g. in `tests/orchestration_contract.rs`) for the happy path (empty stages / some stages) and 404 behavior.
- …` initialization
- Eliminated unused `Mutex` import and redundant `Default` implementation in `HostMetricsCollector`.
- Adjusted `HostMetricsCollector` to use `#[derive(Default)]` where applicable.
This pull request introduces significant improvements to telemetry and metrics collection, error handling, and event persistence. The main highlights are the addition of a host metrics collector, a ring buffer for telemetry samples, and expanded support for stage-scoped events and error codes. These changes enhance observability, error reporting, and future extensibility of the system.
Telemetry and Metrics Collection Enhancements:
- New `host_metrics` module (`src/observe/host_metrics.rs`) that collects daemon process metrics from the host OS, including RSS memory, CPU percentage, and IO bytes, with Linux-specific implementations and tests.
- New `TelemetryRingBuffer` in `src/observe/telemetry.rs` to store telemetry samples with automatic sequence assignment, eviction of the oldest samples on capacity overflow, and support for querying by sequence and stage name. Includes comprehensive unit tests.

Error Handling Improvements:
- Added an `InvalidParams` variant to the `ApiErrorCode` enum with a corresponding constructor and tests, allowing for more precise error reporting when invalid parameters are encountered.

Event Persistence and Stage Scoping:
- Extended the `RunEvent` struct in `src/persistence.rs` to include `stage_name` and `group_id` fields, supporting stage-scoped event tracking for improved orchestration and analytics.

These changes collectively improve system observability, error granularity, and event tracking, laying the groundwork for more robust telemetry and orchestration features.