Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 187 additions & 43 deletions packages/desktop/src-tauri/src/commands/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ use std::io::Read;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::{SystemTime, UNIX_EPOCH};
use tauri::AppHandle;
use tauri::Emitter;
use tauri::State;
use tauri_plugin_shell::process::CommandEvent;
use tauri_plugin_shell::ShellExt;
use uuid::Uuid;

Expand Down Expand Up @@ -708,6 +711,18 @@ pub fn orchestrator_instance_dispose(
Ok(result.disposed)
}

/// Snapshot the last ≤20 non-empty lines from the orchestrator output ring buffer.
///
/// Returns a fixed placeholder when nothing has been captured yet, so error
/// messages always carry a readable marker instead of an empty section.
///
/// Takes `&Mutex<String>` rather than `&Arc<Mutex<String>>`: callers holding an
/// `Arc` still work unchanged via deref coercion, and the narrower borrow avoids
/// the `&Arc<T>` anti-pattern.
///
/// A poisoned mutex is recovered with `into_inner` — the buffer is append-only
/// diagnostic state, so possibly-stale contents are still worth reporting.
fn snapshot_proc_output(buf: &Mutex<String>) -> String {
    // Maximum number of trailing lines included in the snapshot.
    const MAX_LINES: usize = 20;

    let locked = buf.lock().unwrap_or_else(|e| e.into_inner());
    let trimmed = locked.trim();
    if trimmed.is_empty() {
        return "<no orchestrator output captured>".to_string();
    }
    // Drop blank lines, then keep only the trailing window of MAX_LINES.
    let lines: Vec<&str> = trimmed.lines().filter(|l| !l.trim().is_empty()).collect();
    let start = lines.len().saturating_sub(MAX_LINES);
    lines[start..].join("\n")
}

#[tauri::command]
pub fn orchestrator_start_detached(
app: AppHandle,
Expand Down Expand Up @@ -796,50 +811,99 @@ pub fn orchestrator_start_detached(

// Start a dedicated host stack for this workspace.
// We pass explicit tokens and a free port so the UI can connect deterministically.
{
let mut args: Vec<String> = vec![
"start".to_string(),
"--workspace".to_string(),
workspace_path.clone(),
"--approval".to_string(),
"auto".to_string(),
"--no-opencode-auth".to_string(),
"--opencode-router".to_string(),
"true".to_string(),
"--detach".to_string(),
"--openwork-host".to_string(),
"0.0.0.0".to_string(),
"--openwork-port".to_string(),
port.to_string(),
"--openwork-token".to_string(),
token.clone(),
"--openwork-host-token".to_string(),
host_token.clone(),
"--run-id".to_string(),
sandbox_run_id.clone(),
];
let mut args: Vec<String> = vec![
"start".to_string(),
"--workspace".to_string(),
workspace_path.clone(),
"--approval".to_string(),
"auto".to_string(),
"--no-opencode-auth".to_string(),
"--opencode-router".to_string(),
"true".to_string(),
"--detach".to_string(),
"--openwork-host".to_string(),
"0.0.0.0".to_string(),
"--openwork-port".to_string(),
port.to_string(),
"--openwork-token".to_string(),
token.clone(),
"--openwork-host-token".to_string(),
host_token.clone(),
"--run-id".to_string(),
sandbox_run_id.clone(),
];

if wants_docker_sandbox {
args.push("--sandbox".to_string());
args.push("docker".to_string());
}
if wants_docker_sandbox {
args.push("--sandbox".to_string());
args.push("docker".to_string());
}

// Convert to &str for the shell command builder.
let mut str_args: Vec<&str> = Vec::with_capacity(args.len());
for arg in &args {
str_args.push(arg.as_str());
}
let str_args: Vec<&str> = args.iter().map(String::as_str).collect();

command
.args(str_args)
.spawn()
.map_err(|e| format!("Failed to start openwork orchestrator: {e}"))?;
eprintln!(
"[sandbox-create][at={}][runId={}][stage=spawn] launched openwork sidecar for detached sandbox host",
now_ms(),
sandbox_run_id
);
}
let (mut rx, child) = command
.args(str_args)
.spawn()
.map_err(|e| format!("Failed to start openwork orchestrator: {e}"))?;

eprintln!(
"[sandbox-create][at={}][runId={}][stage=spawn] launched openwork sidecar port={} bindHost=0.0.0.0 probeHost=127.0.0.1",
now_ms(),
sandbox_run_id,
port,
);

// Drain the sidecar's stdout/stderr into a bounded ring buffer so we can
// include a snippet in the error message if the health probe times out or
// the process exits unexpectedly. The CommandChild is kept alive inside
// the async task to prevent premature process teardown.
let proc_output: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
let proc_output_writer = proc_output.clone();
let proc_exited = Arc::new(AtomicBool::new(false));
let proc_exited_writer = proc_exited.clone();
let run_id_reader = sandbox_run_id.clone();

tauri::async_runtime::spawn(async move {
// Hold the CommandChild alive for the lifetime of this drain task.
let _held = child;
while let Some(event) = rx.recv().await {
match event {
CommandEvent::Stdout(bytes) | CommandEvent::Stderr(bytes) => {
let text = String::from_utf8_lossy(&bytes).trim().to_string();
if !text.is_empty() {
eprintln!(
"[sandbox-create][runId={}][sidecar] {}",
run_id_reader, text
);
let mut buf = proc_output_writer.lock().unwrap_or_else(|e| e.into_inner());
// Rolling ~4 KB cap: drop oldest half when exceeded.
if buf.len() > 4000 {
let keep_from = buf.len() - 2000;
*buf = buf[keep_from..].to_string();
}
buf.push_str(&text);
buf.push('\n');
}
}
CommandEvent::Terminated(payload) => {
eprintln!(
"[sandbox-create][runId={}][stage=proc-exit] sidecar terminated code={:?} signal={:?}",
run_id_reader, payload.code, payload.signal
);
proc_exited_writer.store(true, Ordering::Relaxed);
break;
}
CommandEvent::Error(msg) => {
eprintln!(
"[sandbox-create][runId={}][stage=proc-error] {}",
run_id_reader, msg
);
proc_exited_writer.store(true, Ordering::Relaxed);
break;
}
_ => {}
}
}
});

emit_sandbox_progress(
&app,
Expand All @@ -858,10 +922,43 @@ pub fn orchestrator_start_detached(
let mut last_container_state: Option<String> = None;
let mut last_container_probe_error: Option<String> = None;
let mut last_error: Option<String> = None;
let mut first_health_attempt_logged = false;

while start.elapsed() < Duration::from_millis(health_timeout_ms) {
let elapsed_ms = start.elapsed().as_millis() as u64;

// Fast-fail if the sidecar process exited before the health endpoint became ready.
if proc_exited.load(Ordering::Relaxed) {
let output_snippet = snapshot_proc_output(&proc_output);
let base = last_error
.as_deref()
.unwrap_or("Connection refused")
.to_string();
let message = format!(
"Orchestrator process exited before health endpoint was ready: {base}\nOrchestrator output:\n{output_snippet}"
);
emit_sandbox_progress(
&app,
&sandbox_run_id,
"error",
"Sandbox process exited early.",
json!({
"error": message,
"stage": "proc-exit-early",
"elapsedMs": elapsed_ms,
"openworkUrl": openwork_url,
"orchestratorOutput": output_snippet,
}),
);
eprintln!(
"[sandbox-create][at={}][runId={}][stage=proc-exit-early] elapsedMs={}",
now_ms(),
sandbox_run_id,
elapsed_ms
);
return Err(message);
}

if wants_docker_sandbox {
if last_container_check.elapsed() > Duration::from_millis(1500) {
last_container_check = Instant::now();
Expand Down Expand Up @@ -909,6 +1006,17 @@ pub fn orchestrator_start_detached(
}
}

if !first_health_attempt_logged {
first_health_attempt_logged = true;
eprintln!(
"[sandbox-create][at={}][runId={}][stage=first-health-attempt] url={}/health elapsedMs={}",
now_ms(),
sandbox_run_id,
openwork_url.trim_end_matches('/'),
elapsed_ms
);
}

match ureq::get(&format!("{}/health", openwork_url.trim_end_matches('/'))).call() {
Ok(response) if response.status() >= 200 && response.status() < 300 => {
emit_sandbox_progress(
Expand Down Expand Up @@ -954,27 +1062,32 @@ pub fn orchestrator_start_detached(
}

if start.elapsed() >= Duration::from_millis(health_timeout_ms) {
let message =
let output_snippet = snapshot_proc_output(&proc_output);
let base_message =
last_error.unwrap_or_else(|| "Timed out waiting for OpenWork server".to_string());
let message =
format!("{base_message}\nOrchestrator output (last lines):\n{output_snippet}");
emit_sandbox_progress(
&app,
&sandbox_run_id,
"error",
"Sandbox failed to start.",
json!({
"error": message,
"stage": "timeout",
"elapsedMs": start.elapsed().as_millis() as u64,
"openworkUrl": openwork_url,
"containerState": last_container_state,
"containerProbeError": last_container_probe_error,
"orchestratorOutput": output_snippet,
}),
);
eprintln!(
"[sandbox-create][at={}][runId={}][stage=timeout] health wait timed out after {}ms error={}",
now_ms(),
sandbox_run_id,
start.elapsed().as_millis(),
message
base_message
);
return Err(message);
}
Expand Down Expand Up @@ -1573,4 +1686,35 @@ exit 0

let _ = fs::remove_dir_all(&tmp);
}

#[test]
fn snapshot_proc_output_empty_returns_placeholder() {
    // An untouched buffer must yield the sentinel marker, not an empty string.
    let empty = Arc::new(Mutex::new(String::new()));
    let snap = snapshot_proc_output(&empty);
    assert_eq!(snap, "<no orchestrator output captured>");
}

#[test]
fn snapshot_proc_output_returns_last_20_lines() {
    // 30 lines in → only the trailing 20 (lines 11..=30) come back out.
    let content: String = (1..=30).map(|i| format!("line {i}\n")).collect();
    let buf = Arc::new(Mutex::new(content));
    let snap = snapshot_proc_output(&buf);
    let lines: Vec<&str> = snap.lines().collect();
    assert_eq!(lines.len(), 20);
    assert_eq!(lines.first(), Some(&"line 11"));
    assert_eq!(lines.last(), Some(&"line 30"));
}

#[test]
fn snapshot_proc_output_skips_blank_lines() {
    // Interior blank lines are filtered out of the snapshot entirely.
    let buf = Arc::new(Mutex::new(String::from("hello\n\n\nworld\n")));
    let snap = snapshot_proc_output(&buf);
    let lines: Vec<&str> = snap.lines().collect();
    assert_eq!(lines, ["hello", "world"]);
}
}