Skip to content

Emit lifecycle events for workflow and pipeline stages#17

Merged
dpsoft merged 2 commits intomainfrom
feat/deamon-improvements-2
Mar 4, 2026
Merged

Emit lifecycle events for workflow and pipeline stages#17
dpsoft merged 2 commits intomainfrom
feat/deamon-improvements-2

Conversation

@dpsoft
Copy link
Contributor

@dpsoft dpsoft commented Mar 4, 2026

This pull request introduces significant improvements to telemetry and metrics collection, error handling, and event persistence. The main highlights are the addition of a host metrics collector, a ring buffer for telemetry samples, and expanded support for stage-scoped events and error codes. These changes enhance observability, error reporting, and future extensibility of the system.

Telemetry and Metrics Collection Enhancements:

  • Added a new host_metrics module (src/observe/host_metrics.rs) that collects daemon process metrics from the host OS, including RSS memory, CPU percentage, and IO bytes, with Linux-specific implementations and tests. [1] [2]
  • Introduced a TelemetryRingBuffer in src/observe/telemetry.rs to store telemetry samples with automatic sequence assignment, eviction of oldest samples on capacity overflow, and support for querying by sequence and stage name. Includes comprehensive unit tests. [1] [2] [3]

Error Handling Improvements:

  • Added a new InvalidParams variant to the ApiErrorCode enum and corresponding constructor and tests, allowing for more precise error reporting when invalid parameters are encountered. [1] [2] [3]

Event Persistence and Stage Scoping:

  • Extended the RunEvent struct in src/persistence.rs to include stage_name and group_id fields, supporting stage-scoped event tracking for improved orchestration and analytics.
  • Updated legacy event type mapping to recognize stage-related events, enabling better compatibility and future migration to v2 orchestration.

These changes collectively improve system observability, error granularity, and event tracking, laying the groundwork for more robust telemetry and orchestration features.

- Introduced `stage_event_started`, `stage_event_succeeded`, `stage_event_failed`, and `stage_event_skipped` lifecycle events.
- Updated `Scheduler` and `Pipeline` to handle per-stage event emissions with group tracking for telemetry.
- Enhanced `ObservableWorkflow` to support telemetry event channels.
- Adjusted APIs (`run_file`, `run_pipeline`, `run_agent`, `run_workflow`) for optional stage event emitters.
- Expanded tests to cover event emission scenarios.
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds stage/workflow lifecycle event emission and introduces a telemetry ring buffer + host metrics sampling to improve run observability and enable new daemon endpoints for stages and telemetry.

Changes:

  • Emit stage lifecycle RunEvents across agent, pipeline, and workflow execution paths and persist them in the daemon.
  • Add /v1/runs/{id}/stages and /v1/runs/{id}/telemetry endpoints plus a telemetry ring buffer and host metrics collector.
  • Extend RunEvent with stage_name / group_id and add INVALID_PARAMS API error code.

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
tests/oci_integration.rs Updates run_file call signature for new stage event sender parameter.
src/workflow/scheduler.rs Emits stage lifecycle events during workflow step execution (incl. parallel groups).
src/workflow/mod.rs Threads optional stage event sender into workflow observation/run path.
src/spec.rs Adds telemetry_buffer_size field to observe spec.
src/runtime.rs Propagates optional stage event sender through runtime run functions and emits stage events for agent/pipeline/workflow.
src/pipeline.rs Adds run_with_stage_tx and emits stage lifecycle events during pipeline execution.
src/persistence.rs Extends RunEvent with stage scoping fields; adds stage event builders + legacy type mappings + tests.
src/observe/telemetry.rs Introduces TelemetryRingBuffer and telemetry sample types; optionally pushes guest telemetry into the ring buffer.
src/observe/mod.rs Exposes new host_metrics module.
src/observe/host_metrics.rs Adds Linux host process metrics collector with basic tests.
src/error.rs Adds InvalidParams API error code + tests.
src/daemon.rs Adds in-memory telemetry buffers, stage/telemetry endpoints, host metrics sampling task, and stage event collection channel.
src/bin/voidbox.rs Updates CLI run command for new run_file signature.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

fn read_rss_bytes() -> Option<u64> {
let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
let rss_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_rss_bytes() casts the result of libc::sysconf(_SC_PAGESIZE) directly to u64. sysconf can return -1 on error, which would become a huge u64 and corrupt the RSS calculation. Consider checking for <= 0 and returning None (or 0) on error before casting.

Suggested change
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
if page_size <= 0 {
return None;
}
let page_size = page_size as u64;

Copilot uses AI. Check for mistakes.
Comment on lines +103 to +104
#[serde(default)]
pub telemetry_buffer_size: Option<usize>,
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ObserveSpec.telemetry_buffer_size is introduced here but doesn’t appear to be used anywhere (the daemon currently reads VOIDBOX_TELEMETRY_BUFFER_SIZE instead). Either wire this into run/daemon configuration (and document precedence vs env var) or remove it to avoid an inert config knob.

Suggested change
#[serde(default)]
pub telemetry_buffer_size: Option<usize>,

Copilot uses AI. Check for mistakes.
Comment on lines +221 to +224
// Emit StageStarted
self.emit(crate::persistence::stage_event_started(
step_name, None, &gid, 1,
));
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stage lifecycle events are emitted with stage_attempt hard-coded to 1, but this code supports retries (execute_with_retry / step.retry). This makes the emitted attempt count inaccurate, and consumers can’t distinguish a retry success from a first-attempt success. Consider threading the actual attempt number through the retry path and emitting StageStarted/StageSucceeded/StageFailed with the real attempt value (and optionally emitting StageStarted per attempt).

Suggested change
// Emit StageStarted
self.emit(crate::persistence::stage_event_started(
step_name, None, &gid, 1,
));
// Emit StageStarted only for non-retry steps; retrying steps should emit
// per-attempt events within the retry logic to ensure accurate attempt counts.
if step.retry.is_none() {
self.emit(crate::persistence::stage_event_started(
step_name, None, &gid, 1,
));
}

Copilot uses AI. Check for mistakes.
Comment on lines +345 to +352
// Create telemetry ring buffer for this run
let buffer_size = std::env::var("VOIDBOX_TELEMETRY_BUFFER_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5000usize);
let ring_buffer = std::sync::Arc::new(std::sync::Mutex::new(
crate::observe::telemetry::TelemetryRingBuffer::new(buffer_size),
));
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A per-run telemetry buffer is created here, but it’s only fed by the host metrics task below. Guest telemetry ingestion (TelemetryAggregator) is still constructed without a ring buffer, so /v1/runs/{id}/telemetry will never include guest samples. Consider wiring this ring buffer into the guest telemetry path (e.g., creating the aggregator via TelemetryAggregator::with_ring_buffer(...) and updating its current stage name from stage.started events).

Copilot uses AI. Check for mistakes.
let mut bufs = state.telemetry_buffers.lock().await;
bufs.insert(run_id.clone(), ring_buffer.clone());
}

Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

telemetry_buffers grows by inserting a ring buffer per run, but there’s no cleanup when a run reaches a terminal state. This can cause unbounded memory growth over time. Consider removing the entry when the run transitions to terminal (or implementing a TTL/GC sweep).

Suggested change
// Spawn cleanup task to remove telemetry buffer once run reaches a terminal state
{
let cleanup_state = state.clone();
let cleanup_run_id = run_id.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
// Check if the run is still present and active
let is_still_active = {
let runs = cleanup_state.runs.lock().await;
if let Some(run) = runs.get(&cleanup_run_id) {
run.status.is_active()
} else {
// Run has been removed; treat as terminal
false
}
};
if !is_still_active {
let mut bufs = cleanup_state.telemetry_buffers.lock().await;
bufs.remove(&cleanup_run_id);
break;
}
}
});
}

Copilot uses AI. Check for mistakes.
Comment on lines +180 to +183
if let Some(id) = id.strip_suffix("/telemetry") {
if method == "GET" {
return get_telemetry(id, query, state).await;
}
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New /v1/runs/{id}/telemetry endpoint (including from_seq validation and stage_name filtering) isn’t covered by existing integration tests. Adding contract tests would help prevent regressions and validate the new INVALID_PARAMS behavior.

Copilot uses AI. Check for mistakes.
Comment on lines +901 to +905
"stage.started" => {
if let Some(sv) = stages.get_mut(sn) {
sv.status = "running".to_string();
sv.started_at = ev.timestamp.clone();
sv.stage_attempt = sa;
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_stages only updates stages on stage.started/stage.completed/etc if a prior stage.queued event created the entry. If a run lacks queued events (older runs, partial event streams, or future producers), those stages will be silently omitted. Consider “upserting” a StageView for non-queued events as well (and backfilling group_id/box_name when present).

Copilot uses AI. Check for mistakes.
Comment on lines +860 to +864
let gid = ev.group_id.clone().unwrap_or_default();
let sa = ev
.payload
.as_ref()
.and_then(|p| p.get("stage_attempt"))
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group_id defaults to an empty string when missing (unwrap_or_default()), but StageView.group_id is required. This makes the API response ambiguous for events without group_id. Consider using a sensible default like g0 or making group_id optional in the response when it’s truly unknown.

Copilot uses AI. Check for mistakes.
Comment on lines +75 to +79
self.next_seq += 1;
if self.samples.len() >= self.capacity {
self.samples.pop_front();
}
self.samples.push_back(sample);
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TelemetryRingBuffer::push doesn’t handle capacity == 0 as a true “disable buffering” mode: with capacity set to 0 the buffer will still retain the last pushed sample. Consider explicitly short-circuiting push() when capacity == 0 (or clamping capacity to at least 1) to make behavior predictable.

Copilot uses AI. Check for mistakes.
Comment on lines +175 to +179
if let Some(id) = id.strip_suffix("/stages") {
if method == "GET" {
return get_stages(id, state).await;
}
}
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New /v1/runs/{id}/stages endpoint isn’t covered by the existing orchestration contract tests. Consider adding integration tests (e.g. in tests/orchestration_contract.rs) for the happy path (empty stages / some stages) and 404 behavior.

Copilot uses AI. Check for mistakes.
…` initialization

- Eliminated unused `Mutex` import and redundant `Default` implementation in `HostMetricsCollector`.
- Adjusted `HostMetricsCollector` to use `#[derive(Default)]` where applicable.
@dpsoft dpsoft merged commit ee19920 into main Mar 4, 2026
14 checks passed
@dpsoft dpsoft deleted the feat/deamon-improvements-2 branch March 4, 2026 00:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants