diff --git a/packages/desktop/src-tauri/src/commands/orchestrator.rs b/packages/desktop/src-tauri/src/commands/orchestrator.rs index 652fdc025..548edb1a0 100644 --- a/packages/desktop/src-tauri/src/commands/orchestrator.rs +++ b/packages/desktop/src-tauri/src/commands/orchestrator.rs @@ -7,12 +7,15 @@ use std::io::Read; use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{self, RecvTimeoutError}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::time::{SystemTime, UNIX_EPOCH}; use tauri::AppHandle; use tauri::Emitter; use tauri::State; +use tauri_plugin_shell::process::CommandEvent; use tauri_plugin_shell::ShellExt; use uuid::Uuid; @@ -708,6 +711,18 @@ pub fn orchestrator_instance_dispose( Ok(result.disposed) } +/// Snapshot the last ≤20 non-empty lines from the orchestrator output ring buffer. +fn snapshot_proc_output(buf: &Arc<Mutex<String>>) -> String { + let locked = buf.lock().unwrap_or_else(|e| e.into_inner()); + let trimmed = locked.trim(); + if trimmed.is_empty() { + return "".to_string(); + } + let lines: Vec<&str> = trimmed.lines().filter(|l| !l.trim().is_empty()).collect(); + let start = lines.len().saturating_sub(20); + lines[start..].join("\n") +} + #[tauri::command] pub fn orchestrator_start_detached( app: AppHandle, @@ -796,50 +811,99 @@ pub fn orchestrator_start_detached( // Start a dedicated host stack for this workspace. // We pass explicit tokens and a free port so the UI can connect deterministically. 
- { - let mut args: Vec<String> = vec![ - "start".to_string(), - "--workspace".to_string(), - workspace_path.clone(), - "--approval".to_string(), - "auto".to_string(), - "--no-opencode-auth".to_string(), - "--opencode-router".to_string(), - "true".to_string(), - "--detach".to_string(), - "--openwork-host".to_string(), - "0.0.0.0".to_string(), - "--openwork-port".to_string(), - port.to_string(), - "--openwork-token".to_string(), - token.clone(), - "--openwork-host-token".to_string(), - host_token.clone(), - "--run-id".to_string(), - sandbox_run_id.clone(), - ]; + let mut args: Vec<String> = vec![ + "start".to_string(), + "--workspace".to_string(), + workspace_path.clone(), + "--approval".to_string(), + "auto".to_string(), + "--no-opencode-auth".to_string(), + "--opencode-router".to_string(), + "true".to_string(), + "--detach".to_string(), + "--openwork-host".to_string(), + "0.0.0.0".to_string(), + "--openwork-port".to_string(), + port.to_string(), + "--openwork-token".to_string(), + token.clone(), + "--openwork-host-token".to_string(), + host_token.clone(), + "--run-id".to_string(), + sandbox_run_id.clone(), + ]; - if wants_docker_sandbox { - args.push("--sandbox".to_string()); - args.push("docker".to_string()); - } + if wants_docker_sandbox { + args.push("--sandbox".to_string()); + args.push("docker".to_string()); + } - // Convert to &str for the shell command builder. 
- let mut str_args: Vec<&str> = Vec::with_capacity(args.len()); - for arg in &args { - str_args.push(arg.as_str()); - } + let str_args: Vec<&str> = args.iter().map(String::as_str).collect(); - command - .args(str_args) - .spawn() - .map_err(|e| format!("Failed to start openwork orchestrator: {e}"))?; - eprintln!( - "[sandbox-create][at={}][runId={}][stage=spawn] launched openwork sidecar for detached sandbox host", - now_ms(), - sandbox_run_id - ); - } + let (mut rx, child) = command + .args(str_args) + .spawn() + .map_err(|e| format!("Failed to start openwork orchestrator: {e}"))?; + + eprintln!( + "[sandbox-create][at={}][runId={}][stage=spawn] launched openwork sidecar port={} bindHost=0.0.0.0 probeHost=127.0.0.1", + now_ms(), + sandbox_run_id, + port, + ); + + // Drain the sidecar's stdout/stderr into a bounded ring buffer so we can + // include a snippet in the error message if the health probe times out or + // the process exits unexpectedly. The CommandChild is kept alive inside + // the async task to prevent premature process teardown. + let proc_output: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new())); + let proc_output_writer = proc_output.clone(); + let proc_exited = Arc::new(AtomicBool::new(false)); + let proc_exited_writer = proc_exited.clone(); + let run_id_reader = sandbox_run_id.clone(); + + tauri::async_runtime::spawn(async move { + // Hold the CommandChild alive for the lifetime of this drain task. + let _held = child; + while let Some(event) = rx.recv().await { + match event { + CommandEvent::Stdout(bytes) | CommandEvent::Stderr(bytes) => { + let text = String::from_utf8_lossy(&bytes).trim().to_string(); + if !text.is_empty() { + eprintln!( + "[sandbox-create][runId={}][sidecar] {}", + run_id_reader, text + ); + let mut buf = proc_output_writer.lock().unwrap_or_else(|e| e.into_inner()); + // Rolling ~4 KB cap: drop oldest half when exceeded. 
+ if buf.len() > 4000 { + let keep_from = buf.len() - 2000; + *buf = buf[keep_from..].to_string(); + } + buf.push_str(&text); + buf.push('\n'); + } + } + CommandEvent::Terminated(payload) => { + eprintln!( + "[sandbox-create][runId={}][stage=proc-exit] sidecar terminated code={:?} signal={:?}", + run_id_reader, payload.code, payload.signal + ); + proc_exited_writer.store(true, Ordering::Relaxed); + break; + } + CommandEvent::Error(msg) => { + eprintln!( + "[sandbox-create][runId={}][stage=proc-error] {}", + run_id_reader, msg + ); + proc_exited_writer.store(true, Ordering::Relaxed); + break; + } + _ => {} + } + } + }); emit_sandbox_progress( &app, @@ -858,10 +922,43 @@ pub fn orchestrator_start_detached( let mut last_container_state: Option<String> = None; let mut last_container_probe_error: Option<String> = None; let mut last_error: Option<String> = None; + let mut first_health_attempt_logged = false; while start.elapsed() < Duration::from_millis(health_timeout_ms) { let elapsed_ms = start.elapsed().as_millis() as u64; + // Fast-fail if the sidecar process exited before the health endpoint became ready. 
+ if proc_exited.load(Ordering::Relaxed) { + let output_snippet = snapshot_proc_output(&proc_output); + let base = last_error + .as_deref() + .unwrap_or("Connection refused") + .to_string(); + let message = format!( + "Orchestrator process exited before health endpoint was ready: {base}\nOrchestrator output:\n{output_snippet}" + ); + emit_sandbox_progress( + &app, + &sandbox_run_id, + "error", + "Sandbox process exited early.", + json!({ + "error": message, + "stage": "proc-exit-early", + "elapsedMs": elapsed_ms, + "openworkUrl": openwork_url, + "orchestratorOutput": output_snippet, + }), + ); + eprintln!( + "[sandbox-create][at={}][runId={}][stage=proc-exit-early] elapsedMs={}", + now_ms(), + sandbox_run_id, + elapsed_ms + ); + return Err(message); + } + if wants_docker_sandbox { if last_container_check.elapsed() > Duration::from_millis(1500) { last_container_check = Instant::now(); @@ -909,6 +1006,17 @@ pub fn orchestrator_start_detached( } } + if !first_health_attempt_logged { + first_health_attempt_logged = true; + eprintln!( + "[sandbox-create][at={}][runId={}][stage=first-health-attempt] url={}/health elapsedMs={}", + now_ms(), + sandbox_run_id, + openwork_url.trim_end_matches('/'), + elapsed_ms + ); + } + match ureq::get(&format!("{}/health", openwork_url.trim_end_matches('/'))).call() { Ok(response) if response.status() >= 200 && response.status() < 300 => { emit_sandbox_progress( @@ -954,8 +1062,11 @@ pub fn orchestrator_start_detached( } if start.elapsed() >= Duration::from_millis(health_timeout_ms) { - let message = + let output_snippet = snapshot_proc_output(&proc_output); + let base_message = last_error.unwrap_or_else(|| "Timed out waiting for OpenWork server".to_string()); + let message = + format!("{base_message}\nOrchestrator output (last lines):\n{output_snippet}"); emit_sandbox_progress( &app, &sandbox_run_id, @@ -963,10 +1074,12 @@ pub fn orchestrator_start_detached( "Sandbox failed to start.", json!({ "error": message, + "stage": "timeout", 
"elapsedMs": start.elapsed().as_millis() as u64, "openworkUrl": openwork_url, "containerState": last_container_state, "containerProbeError": last_container_probe_error, + "orchestratorOutput": output_snippet, }), ); eprintln!( @@ -974,7 +1087,7 @@ pub fn orchestrator_start_detached( now_ms(), sandbox_run_id, start.elapsed().as_millis(), - message + base_message ); return Err(message); } @@ -1573,4 +1686,35 @@ exit 0 let _ = fs::remove_dir_all(&tmp); } + + #[test] + fn snapshot_proc_output_empty_returns_placeholder() { + let buf = Arc::new(Mutex::new(String::new())); + assert_eq!( + snapshot_proc_output(&buf), + "" + ); + } + + #[test] + fn snapshot_proc_output_returns_last_20_lines() { + let mut content = String::new(); + for i in 1..=30 { + content.push_str(&format!("line {i}\n")); + } + let buf = Arc::new(Mutex::new(content)); + let snap = snapshot_proc_output(&buf); + let lines: Vec<&str> = snap.lines().collect(); + assert_eq!(lines.len(), 20); + assert_eq!(lines[0], "line 11"); + assert_eq!(lines[19], "line 30"); + } + + #[test] + fn snapshot_proc_output_skips_blank_lines() { + let buf = Arc::new(Mutex::new("hello\n\n\nworld\n".to_string())); + let snap = snapshot_proc_output(&buf); + let lines: Vec<&str> = snap.lines().collect(); + assert_eq!(lines, vec!["hello", "world"]); + } }