From d3268e0a3e165abdf09d1e56e8d1d3855ef5b18c Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 19:20:18 -0800 Subject: [PATCH 01/10] feat(workers): add worker management API and UI components, including worker list and detail views, transcript handling, and integration with live context --- Cargo.lock | 1 + Cargo.toml | 3 + interface/src/api/client.ts | 55 ++ interface/src/hooks/useLiveContext.tsx | 118 +++- interface/src/router.tsx | 10 +- interface/src/routes/AgentWorkers.tsx | 539 ++++++++++++++++++ .../20260223000001_worker_transcript.sql | 6 + src/agent/channel.rs | 21 +- src/agent/worker.rs | 24 + src/api.rs | 1 + src/api/server.rs | 4 +- src/api/state.rs | 12 + src/api/workers.rs | 175 ++++++ src/conversation.rs | 6 +- src/conversation/history.rs | 160 +++++- src/conversation/worker_transcript.rs | 148 +++++ src/hooks/spacebot.rs | 4 +- src/lib.rs | 3 + 18 files changed, 1271 insertions(+), 19 deletions(-) create mode 100644 interface/src/routes/AgentWorkers.tsx create mode 100644 migrations/20260223000001_worker_transcript.sql create mode 100644 src/api/workers.rs create mode 100644 src/conversation/worker_transcript.rs diff --git a/Cargo.lock b/Cargo.lock index 8b5bf8604..945796092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8259,6 +8259,7 @@ dependencies = [ "dirs", "emojis", "fastembed", + "flate2", "futures", "hex", "ignore", diff --git a/Cargo.toml b/Cargo.toml index d2f24f0c3..fd73aa72b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,9 @@ fastembed = "4" base64 = "0.22" hex = "0.4" +# Compression +flate2 = "1" + # Logging and tracing tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index c52180177..a7c0c55dc 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -53,6 +53,7 @@ export interface WorkerStartedEvent { channel_id: string | null; worker_id: string; task: string; + worker_type?: string; } export interface WorkerStatusEvent { @@ -69,6 +70,7 @@ export interface WorkerCompletedEvent { channel_id: string | null; worker_id: string; result: string; + success?: boolean; } export interface BranchStartedEvent { @@ -94,6 +96,7 @@ export interface ToolStartedEvent { process_type: ProcessType; process_id: string; tool_name: string; + args: string; } export interface ToolCompletedEvent { @@ -103,6 +106,7 @@ export interface ToolCompletedEvent { process_type: ProcessType; process_id: string; tool_name: string; + result: string; } export type ApiEvent = @@ -193,6 +197,48 @@ export interface StatusBlockSnapshot { /** channel_id -> StatusBlockSnapshot */ export type ChannelStatusResponse = Record; +// --- Workers API types --- + +export type ActionContent = + | { type: "text"; text: string } + | { type: "tool_call"; id: string; name: string; args: string }; + +export type TranscriptStep = + | { type: "action"; content: ActionContent[] } + | { type: "tool_result"; call_id: string; name: string; text: string }; + +export interface WorkerRunInfo { + id: string; + task: string; + status: string; + worker_type: string; + channel_id: string | null; + channel_name: string | null; + started_at: string; + completed_at: string | null; + has_transcript: boolean; + live_status: string | null; + tool_calls: number; +} + +export interface WorkerDetailResponse { + id: string; + task: string; + result: string | null; + status: string; + worker_type: string; + channel_id: string | null; + channel_name: string | null; + started_at: string; + completed_at: string | null; + transcript: TranscriptStep[] | null; +} + +export interface WorkerListResponse { + workers: WorkerRunInfo[]; + total: number; +} + export interface AgentInfo { id: string; display_name?: string; @@ -1075,6 +1121,15 @@ export const api = { return fetchJson(`/channels/messages?${params}`); }, channelStatus: () => fetchJson("/channels/status"), + workersList: (agentId: string, params: { limit?: number; offset?: number; status?: string } = {}) => { + const search = new URLSearchParams({ agent_id: agentId }); + if (params.limit) search.set("limit", String(params.limit)); + if (params.offset) search.set("offset", String(params.offset)); + if (params.status) search.set("status", params.status); + return fetchJson(`/agents/workers?${search}`); + }, + workerDetail: (agentId: string, workerId: string) => + fetchJson(`/agents/workers/detail?agent_id=${encodeURIComponent(agentId)}&worker_id=${encodeURIComponent(workerId)}`), agentMemories: (agentId: string, params: MemoriesListParams = {}) => { const search = new URLSearchParams({ agent_id: agentId }); if (params.limit) search.set("limit", String(params.limit)); diff --git a/interface/src/hooks/useLiveContext.tsx b/interface/src/hooks/useLiveContext.tsx index dd987dc90..46a2cf8a2 100644 --- a/interface/src/hooks/useLiveContext.tsx +++ b/interface/src/hooks/useLiveContext.tsx @@ -1,8 +1,8 @@ import { createContext, useContext, useCallback, useRef, useState, useMemo, type ReactNode } from "react"; import { useQuery, useQueryClient } from "@tanstack/react-query"; -import { api, type AgentMessageEvent, type ChannelInfo } from "@/api/client"; +import { api, type AgentMessageEvent, type ChannelInfo, type ToolStartedEvent, type ToolCompletedEvent, type WorkerStatusEvent, type TranscriptStep } from "@/api/client"; import { useEventSource, type ConnectionState } from "@/hooks/useEventSource"; -import { useChannelLiveState, type ChannelLiveState } from "@/hooks/useChannelLiveState"; +import { useChannelLiveState, type ChannelLiveState, type ActiveWorker } from "@/hooks/useChannelLiveState"; interface LiveContextValue { liveStates: Record; @@ -12,6 +12,12 @@ interface LiveContextValue { loadOlderMessages: (channelId: string) => void; /** Set of edge IDs ("from->to") with recent message activity */ activeLinks: Set; + /** Flat map of all active workers across all channels, keyed by worker_id. */ + activeWorkers: Record; + /** Monotonically increasing counter, bumped on every worker lifecycle SSE event. */ + workerEventVersion: number; + /** Live transcript steps for running workers, keyed by worker_id. Built from SSE tool events. */ + liveTranscripts: Record; } const LiveContext = createContext({ @@ -21,6 +27,9 @@ const LiveContext = createContext({ hasData: false, loadOlderMessages: () => {}, activeLinks: new Set(), + activeWorkers: {}, + workerEventVersion: 0, + liveTranscripts: {}, }); export function useLiveContext() { @@ -42,6 +51,27 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const channels = channelsData?.channels ?? []; const { liveStates, handlers: channelHandlers, syncStatusSnapshot, loadOlderMessages } = useChannelLiveState(channels); + // Flat active workers map + event version counter for the workers tab. + // This is a separate piece of state from channel liveStates so the workers + // tab can react to SSE events without scanning all channels. + const [workerEventVersion, setWorkerEventVersion] = useState(0); + const bumpWorkerVersion = useCallback(() => setWorkerEventVersion((v) => v + 1), []); + + // Live transcript accumulator: builds TranscriptStep[] from SSE tool events + // for running workers. Cleared when worker completes. + const [liveTranscripts, setLiveTranscripts] = useState>({}); + + // Derive flat active workers from channel live states + const activeWorkers = useMemo(() => { + const map: Record = {}; + for (const [channelId, state] of Object.entries(liveStates)) { + for (const [workerId, worker] of Object.entries(state.workers)) { + map[workerId] = { ...worker, channelId }; + } + } + return map; + }, [liveStates]); + // Track recently active link edges const [activeLinks, setActiveLinks] = useState>(new Set()); const timersRef = useRef>>(new Map()); @@ -85,14 +115,94 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { [markEdgeActive], ); + // Wrap channel worker handlers to also bump the worker event version + // and accumulate live transcript steps from SSE events. + const wrappedWorkerStarted = useCallback((data: unknown) => { + channelHandlers.worker_started(data); + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedWorkerStatus = useCallback((data: unknown) => { + channelHandlers.worker_status(data); + const event = data as WorkerStatusEvent; + // Push status text as an action step in the live transcript + if (event.status && event.status !== "starting" && event.status !== "running") { + setLiveTranscripts((prev) => { + const steps = prev[event.worker_id] ?? []; + const step: TranscriptStep = { + type: "action", + content: [{ type: "text", text: event.status }], + }; + return { ...prev, [event.worker_id]: [...steps, step] }; + }); + } + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedWorkerCompleted = useCallback((data: unknown) => { + channelHandlers.worker_completed(data); + // Clear the live transcript on completion (persisted transcript takes over) + const event = data as { worker_id: string }; + setLiveTranscripts((prev) => { + if (!prev[event.worker_id]) return prev; + const { [event.worker_id]: _, ...rest } = prev; + return rest; + }); + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedToolStarted = useCallback((data: unknown) => { + channelHandlers.tool_started(data); + const event = data as ToolStartedEvent; + if (event.process_type === "worker") { + setLiveTranscripts((prev) => { + const steps = prev[event.process_id] ?? []; + const step: TranscriptStep = { + type: "action", + content: [{ + type: "tool_call", + id: `${event.tool_name}:${steps.length}`, + name: event.tool_name, + args: event.args || "", + }], + }; + return { ...prev, [event.process_id]: [...steps, step] }; + }); + bumpWorkerVersion(); + } + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedToolCompleted = useCallback((data: unknown) => { + channelHandlers.tool_completed(data); + const event = data as ToolCompletedEvent; + if (event.process_type === "worker") { + setLiveTranscripts((prev) => { + const steps = prev[event.process_id] ?? []; + const step: TranscriptStep = { + type: "tool_result", + call_id: `${event.tool_name}:${steps.length}`, + name: event.tool_name, + text: event.result || "", + }; + return { ...prev, [event.process_id]: [...steps, step] }; + }); + bumpWorkerVersion(); + } + }, [channelHandlers, bumpWorkerVersion]); + // Merge channel handlers with agent message handlers const handlers = useMemo( () => ({ ...channelHandlers, + worker_started: wrappedWorkerStarted, + worker_status: wrappedWorkerStatus, + worker_completed: wrappedWorkerCompleted, + tool_started: wrappedToolStarted, + tool_completed: wrappedToolCompleted, agent_message_sent: handleAgentMessage, agent_message_received: handleAgentMessage, }), - [channelHandlers, handleAgentMessage], + [channelHandlers, wrappedWorkerStarted, wrappedWorkerStatus, wrappedWorkerCompleted, wrappedToolStarted, wrappedToolCompleted, handleAgentMessage], ); const onReconnect = useCallback(() => { @@ -111,7 +221,7 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const hasData = channels.length > 0 || channelsData !== undefined; return ( - + {children} ); diff --git a/interface/src/router.tsx b/interface/src/router.tsx index 1c6950f05..c48ab7593 100644 --- a/interface/src/router.tsx +++ b/interface/src/router.tsx @@ -20,6 +20,7 @@ import {AgentConfig} from "@/routes/AgentConfig"; import {AgentCron} from "@/routes/AgentCron"; import {AgentIngest} from "@/routes/AgentIngest"; import {AgentSkills} from "@/routes/AgentSkills"; +import {AgentWorkers} from "@/routes/AgentWorkers"; import {AgentChat} from "@/routes/AgentChat"; import {Settings} from "@/routes/Settings"; import {useLiveContext} from "@/hooks/useLiveContext"; @@ -187,15 +188,16 @@ const agentIngestRoute = createRoute({ const agentWorkersRoute = createRoute({ getParentRoute: () => rootRoute, path: "/agents/$agentId/workers", + validateSearch: (search: Record): {worker?: string} => ({ + worker: typeof search.worker === "string" ? search.worker : undefined, + }), component: function AgentWorkersPage() { const {agentId} = agentWorkersRoute.useParams(); return (
-
-

- Workers control interface coming soon -

+
+
); diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx new file mode 100644 index 000000000..94e72a180 --- /dev/null +++ b/interface/src/routes/AgentWorkers.tsx @@ -0,0 +1,539 @@ +import {useState, useMemo, useEffect, useCallback, useRef} from "react"; +import {useQuery, useQueryClient} from "@tanstack/react-query"; +import {useNavigate, useSearch} from "@tanstack/react-router"; +import { + api, + type WorkerRunInfo, + type WorkerDetailResponse, + type TranscriptStep, + type ActionContent, +} from "@/api/client"; +import {Badge} from "@/ui/Badge"; +import {formatTimeAgo, formatDuration} from "@/lib/format"; +import {LiveDuration} from "@/components/LiveDuration"; +import {useLiveContext} from "@/hooks/useLiveContext"; +import {cx} from "@/ui/utils"; + +const STATUS_FILTERS = ["all", "running", "done", "failed"] as const; +type StatusFilter = (typeof STATUS_FILTERS)[number]; + +function statusBadgeVariant(status: string) { + switch (status) { + case "running": + return "amber" as const; + case "done": + return "green" as const; + case "failed": + return "red" as const; + default: + return "default" as const; + } +} + +function workerTypeBadgeVariant(workerType: string) { + return workerType === "opencode" ? ("accent" as const) : ("outline" as const); +} + +function durationBetween(start: string, end: string | null): string { + if (!end) return ""; + const seconds = Math.floor( + (new Date(end).getTime() - new Date(start).getTime()) / 1000, + ); + return formatDuration(seconds); +} + +export function AgentWorkers({agentId}: {agentId: string}) { + const [statusFilter, setStatusFilter] = useState("all"); + const [search, setSearch] = useState(""); + const queryClient = useQueryClient(); + const navigate = useNavigate(); + const routeSearch = useSearch({strict: false}) as {worker?: string}; + const selectedWorkerId = routeSearch.worker ?? null; + const {activeWorkers, workerEventVersion, liveTranscripts} = useLiveContext(); + + // Invalidate worker queries when SSE events fire + const prevVersion = useRef(workerEventVersion); + useEffect(() => { + if (workerEventVersion !== prevVersion.current) { + prevVersion.current = workerEventVersion; + queryClient.invalidateQueries({queryKey: ["workers", agentId]}); + if (selectedWorkerId) { + queryClient.invalidateQueries({ + queryKey: ["worker-detail", agentId, selectedWorkerId], + }); + } + } + }, [workerEventVersion, agentId, selectedWorkerId, queryClient]); + + // List query + const {data: listData} = useQuery({ + queryKey: ["workers", agentId, statusFilter], + queryFn: () => + api.workersList(agentId, { + limit: 200, + status: statusFilter === "all" ? undefined : statusFilter, + }), + refetchInterval: 10_000, + }); + + // Detail query (only when a worker is selected) + const {data: detailData} = useQuery({ + queryKey: ["worker-detail", agentId, selectedWorkerId], + queryFn: () => + selectedWorkerId + ? api.workerDetail(agentId, selectedWorkerId) + : Promise.resolve(null), + enabled: !!selectedWorkerId, + }); + + const workers = listData?.workers ?? []; + const total = listData?.total ?? 0; + + // Merge live SSE state onto the API-returned list. + // Workers that exist in SSE state but haven't hit the DB yet + // are synthesized and prepended so they appear instantly. + const mergedWorkers: WorkerRunInfo[] = useMemo(() => { + const dbIds = new Set(workers.map((w) => w.id)); + + // Overlay live state onto existing DB rows + const merged = workers.map((worker) => { + const live = activeWorkers[worker.id]; + if (!live) return worker; + return { + ...worker, + status: "running", + live_status: live.status, + tool_calls: live.toolCalls, + }; + }); + + // Synthesize entries for workers only known via SSE (not in DB yet) + const synthetic: WorkerRunInfo[] = Object.values(activeWorkers) + .filter((w) => !dbIds.has(w.id)) + .map((live) => ({ + id: live.id, + task: live.task, + status: "running", + worker_type: "builtin", + channel_id: live.channelId ?? null, + channel_name: null, + started_at: new Date(live.startedAt).toISOString(), + completed_at: null, + has_transcript: false, + live_status: live.status, + tool_calls: live.toolCalls, + })); + + return [...synthetic, ...merged]; + }, [workers, activeWorkers]); + + // Client-side task text search filter + const filteredWorkers = useMemo(() => { + if (!search.trim()) return mergedWorkers; + const term = search.toLowerCase(); + return mergedWorkers.filter((w) => w.task.toLowerCase().includes(term)); + }, [mergedWorkers, search]); + + // Merge live state onto the detail response too + const mergedDetail: WorkerDetailResponse | null = useMemo(() => { + if (!detailData) return null; + const live = selectedWorkerId ? activeWorkers[selectedWorkerId] : null; + if (!live) return detailData; + return { + ...detailData, + status: "running", + }; + }, [detailData, activeWorkers, selectedWorkerId]); + + const selectWorker = useCallback( + (workerId: string | null) => { + navigate({ + to: `/agents/${agentId}/workers`, + search: workerId ? {worker: workerId} : {}, + replace: true, + } as any); + }, + [navigate, agentId], + ); + + return ( +
+ {/* Left column: worker list */} +
+ {/* Toolbar */} +
+ setSearch(e.target.value)} + className="h-7 flex-1 rounded-md border border-app-line/50 bg-app-input px-2.5 text-xs text-ink placeholder:text-ink-faint focus:border-accent/50 focus:outline-none" + /> + {total} +
+ + {/* Status filter pills */} +
+ {STATUS_FILTERS.map((filter) => ( + + ))} +
+ + {/* Worker list */} +
+ {filteredWorkers.length === 0 ? ( +
+

No workers found

+
+ ) : ( + filteredWorkers.map((worker) => ( + selectWorker(worker.id)} + /> + )) + )} +
+
+ + {/* Right column: detail view */} +
+ {selectedWorkerId && mergedDetail ? ( + + ) : ( +
+

+ Select a worker to view details +

+
+ )} +
+
+ ); +} + +interface LiveWorker { + id: string; + task: string; + status: string; + startedAt: number; + toolCalls: number; + currentTool: string | null; +} + +function WorkerCard({ + worker, + liveWorker, + selected, + onClick, +}: { + worker: WorkerRunInfo; + liveWorker?: LiveWorker; + selected: boolean; + onClick: () => void; +}) { + const isRunning = worker.status === "running" || !!liveWorker; + const displayStatus = liveWorker?.status ?? worker.live_status; + const toolCalls = liveWorker?.toolCalls ?? worker.tool_calls; + const currentTool = liveWorker?.currentTool; + + return ( + + ); +} + +function WorkerDetail({ + detail, + liveWorker, + liveTranscript, +}: { + detail: WorkerDetailResponse; + liveWorker?: LiveWorker; + liveTranscript?: TranscriptStep[]; +}) { + const isRunning = detail.status === "running" || !!liveWorker; + const duration = durationBetween(detail.started_at, detail.completed_at); + const displayStatus = liveWorker?.status; + const currentTool = liveWorker?.currentTool; + const toolCalls = liveWorker?.toolCalls ?? 0; + // Use persisted transcript if available, otherwise fall back to live SSE transcript + const transcript = detail.transcript ?? (isRunning ? liveTranscript : null); + const transcriptRef = useRef(null); + + // Auto-scroll to latest transcript step for running workers + useEffect(() => { + if (isRunning && transcriptRef.current) { + transcriptRef.current.scrollTop = transcriptRef.current.scrollHeight; + } + }, [isRunning, transcript?.length]); + + return ( +
+ {/* Header */} +
+
+

{detail.task}

+
+ + {detail.worker_type} + + + {isRunning && ( + + )} + {isRunning ? "running" : detail.status} + +
+
+
+ {detail.channel_name && {detail.channel_name}} + {isRunning ? ( + + Running for{" "} + + + ) : ( + duration && {duration} + )} + {!isRunning && {formatTimeAgo(detail.started_at)}} + {isRunning && toolCalls > 0 && ( + {toolCalls} tool calls + )} +
+ {/* Live status bar for running workers */} + {isRunning && (currentTool || displayStatus) && ( +
+ {currentTool ? ( + + Running {currentTool}... + + ) : displayStatus ? ( + {displayStatus} + ) : null} +
+ )} +
+ + {/* Content */} +
+ {/* Result section */} + {detail.result && ( +
+

+ Result +

+
+ {detail.result} +
+
+ )} + + {/* Transcript section */} + {transcript && transcript.length > 0 ? ( +
+

+ {isRunning ? "Live Transcript" : "Transcript"} +

+
+ {transcript.map((step, index) => ( + + ))} + {isRunning && currentTool && ( +
+ + Running {currentTool}... +
+ )} +
+
+ ) : isRunning ? ( +
+
+

Waiting for first tool call...

+
+ ) : ( +
+ Full transcript not available for this worker +
+ )} +
+
+ ); +} + +function TranscriptStepView({step}: {step: TranscriptStep}) { + if (step.type === "action") { + return ( +
+ {step.content.map((content, index) => ( + + ))} +
+ ); + } + + return ; +} + +function ActionContentView({content}: {content: ActionContent}) { + if (content.type === "text") { + return ( +
+ {content.text} +
+ ); + } + + return ; +} + +function ToolCallView({ + content, +}: { + content: Extract; +}) { + const [expanded, setExpanded] = useState(false); + + return ( +
+ + {expanded && ( +
+					{content.args}
+				
+ )} +
+ ); +} + +function ToolResultView({ + step, +}: { + step: Extract; +}) { + const [expanded, setExpanded] = useState(false); + const isLong = step.text.length > 300; + const displayText = + isLong && !expanded ? step.text.slice(0, 300) + "..." : step.text; + + return ( +
+
+ + {step.name && ( + + {step.name} + + )} +
+
+				{displayText}
+			
+ {isLong && ( + + )} +
+ ); +} diff --git a/migrations/20260223000001_worker_transcript.sql b/migrations/20260223000001_worker_transcript.sql new file mode 100644 index 000000000..774b733d6 --- /dev/null +++ b/migrations/20260223000001_worker_transcript.sql @@ -0,0 +1,6 @@ +-- Add transcript storage and worker metadata to worker_runs. +ALTER TABLE worker_runs ADD COLUMN worker_type TEXT NOT NULL DEFAULT 'builtin'; +ALTER TABLE worker_runs ADD COLUMN agent_id TEXT; +ALTER TABLE worker_runs ADD COLUMN transcript BLOB; + +CREATE INDEX idx_worker_runs_agent ON worker_runs(agent_id, started_at); diff --git a/src/agent/channel.rs b/src/agent/channel.rs index dbfecb478..211f9d2f9 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1576,9 +1576,16 @@ impl Channel { worker_id, channel_id, task, + worker_type, .. } => { - run_logger.log_worker_started(channel_id.as_ref(), *worker_id, task); + run_logger.log_worker_started( + channel_id.as_ref(), + *worker_id, + task, + worker_type, + &self.deps.agent_id, + ); } ProcessEvent::WorkerStatus { worker_id, status, .. @@ -1589,9 +1596,10 @@ impl Channel { worker_id, result, notify, + success, .. } => { - run_logger.log_worker_completed(*worker_id, result); + run_logger.log_worker_completed(*worker_id, result, *success); let mut workers = self.state.active_workers.write().await; workers.remove(worker_id); @@ -2033,6 +2041,7 @@ pub async fn spawn_worker_from_state( worker_id, channel_id: Some(state.channel_id.clone()), task: task.clone(), + worker_type: "builtin".into(), }) .ok(); @@ -2132,6 +2141,7 @@ pub async fn spawn_opencode_worker_from_state( worker_id, channel_id: Some(state.channel_id.clone()), task: opencode_task, + worker_type: "opencode".into(), }) .ok(); @@ -2166,11 +2176,11 @@ where .with_label_values(&[&*agent_id]) .inc(); - let (result_text, notify) = match future.await { - Ok(text) => (text, true), + let (result_text, notify, success) = match future.await { + Ok(text) => (text, true, true), Err(error) => { tracing::error!(worker_id = %worker_id, %error, "worker failed"); - (format!("Worker failed: {error}"), true) + (format!("Worker failed: {error}"), true, false) } }; #[cfg(feature = "metrics")] @@ -2192,6 +2202,7 @@ where channel_id, result: result_text, notify, + success, }); }) } diff --git a/src/agent/worker.rs b/src/agent/worker.rs index 14f725e7c..a2ed80a82 100644 --- a/src/agent/worker.rs +++ b/src/agent/worker.rs @@ -287,6 +287,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("cancelled"); self.write_failure_log(&history, &format!("cancelled: {reason}")); + self.persist_transcript(&history); tracing::info!(worker_id = %self.id, %reason, "worker cancelled"); return Ok(format!("Worker cancelled: {reason}")); } @@ -296,6 +297,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("failed"); self.write_failure_log(&history, &format!("context overflow after {MAX_OVERFLOW_RETRIES} compaction attempts: {error}")); + self.persist_transcript(&history); tracing::error!(worker_id = %self.id, %error, "worker context overflow unrecoverable"); return Err(crate::error::AgentError::Other(error.into()).into()); } @@ -317,6 +319,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("failed"); self.write_failure_log(&history, &error.to_string()); + self.persist_transcript(&history); tracing::error!(worker_id = %self.id, %error, "worker LLM call failed"); return Err(crate::error::AgentError::Other(error.into()).into()); } @@ -393,6 +396,9 @@ impl Worker { self.write_success_log(&history); } + // Persist transcript blob (fire-and-forget) + self.persist_transcript(&history); + tracing::info!(worker_id = %self.id, "worker completed"); Ok(result) } @@ -470,6 +476,24 @@ impl Worker { ); } + /// Persist the compressed transcript blob to worker_runs. Fire-and-forget. + fn persist_transcript(&self, history: &[rig::message::Message]) { + let transcript_blob = crate::conversation::worker_transcript::serialize_transcript(history); + let pool = self.deps.sqlite_pool.clone(); + let worker_id = self.id.to_string(); + + tokio::spawn(async move { + if let Err(error) = sqlx::query("UPDATE worker_runs SET transcript = ? WHERE id = ?") + .bind(&transcript_blob) + .bind(&worker_id) + .execute(&pool) + .await + { + tracing::warn!(%error, worker_id, "failed to persist worker transcript"); + } + }); + } + /// Check if worker is in a terminal state. pub fn is_done(&self) -> bool { matches!(self.state, WorkerState::Done | WorkerState::Failed) diff --git a/src/api.rs b/src/api.rs index e8a13c91a..3c4d38170 100644 --- a/src/api.rs +++ b/src/api.rs @@ -23,6 +23,7 @@ mod skills; mod state; mod system; mod webchat; +mod workers; pub use server::start_http_server; pub use state::{AgentInfo, ApiEvent, ApiState}; diff --git a/src/api/server.rs b/src/api/server.rs index 42ed6bb06..1b11e8f46 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -3,7 +3,7 @@ use super::state::ApiState; use super::{ agents, bindings, channels, config, cortex, cron, ingest, links, mcp, memories, messaging, - models, providers, settings, skills, system, webchat, + models, providers, settings, skills, system, webchat, workers, }; use axum::Json; @@ -88,6 +88,8 @@ pub async fn start_http_server( ) .route("/channels/messages", get(channels::channel_messages)) .route("/channels/status", get(channels::channel_status)) + .route("/agents/workers", get(workers::list_workers)) + .route("/agents/workers/detail", get(workers::worker_detail)) .route("/agents/memories", get(memories::list_memories)) .route("/agents/memories/search", get(memories::search_memories)) .route("/agents/memories/graph", get(memories::memory_graph)) diff --git a/src/api/state.rs b/src/api/state.rs index 7f7599c33..253cf4d5c 100644 --- a/src/api/state.rs +++ b/src/api/state.rs @@ -135,6 +135,7 @@ pub enum ApiEvent { channel_id: Option, worker_id: String, task: String, + worker_type: String, }, /// A worker's status changed. WorkerStatusUpdate { @@ -149,6 +150,7 @@ pub enum ApiEvent { channel_id: Option, worker_id: String, result: String, + success: bool, }, /// A branch was started. BranchStarted { @@ -171,6 +173,7 @@ pub enum ApiEvent { process_type: String, process_id: String, tool_name: String, + args: String, }, /// A tool call completed on a process. ToolCompleted { @@ -179,6 +182,7 @@ pub enum ApiEvent { process_type: String, process_id: String, tool_name: String, + result: String, }, /// Configuration was reloaded (skills, identity, etc.). ConfigReloaded, @@ -286,6 +290,7 @@ impl ApiState { worker_id, channel_id, task, + worker_type, .. } => { api_tx @@ -294,6 +299,7 @@ impl ApiState { channel_id: channel_id.as_deref().map(|s| s.to_string()), worker_id: worker_id.to_string(), task: task.clone(), + worker_type: worker_type.clone(), }) .ok(); } @@ -331,6 +337,7 @@ impl ApiState { worker_id, channel_id, result, + success, .. } => { api_tx @@ -339,6 +346,7 @@ impl ApiState { channel_id: channel_id.as_deref().map(|s| s.to_string()), worker_id: worker_id.to_string(), result: result.clone(), + success: *success, }) .ok(); } @@ -361,6 +369,7 @@ impl ApiState { process_id, channel_id, tool_name, + args, .. } => { let (process_type, id_str) = process_id_info(process_id); @@ -371,6 +380,7 @@ impl ApiState { process_type, process_id: id_str, tool_name: tool_name.clone(), + args: args.clone(), }) .ok(); } @@ -378,6 +388,7 @@ impl ApiState { process_id, channel_id, tool_name, + result, .. } => { let (process_type, id_str) = process_id_info(process_id); @@ -388,6 +399,7 @@ impl ApiState { process_type, process_id: id_str, tool_name: tool_name.clone(), + result: result.clone(), }) .ok(); } diff --git a/src/api/workers.rs b/src/api/workers.rs new file mode 100644 index 000000000..b445c84b3 --- /dev/null +++ b/src/api/workers.rs @@ -0,0 +1,175 @@ +//! Workers API endpoints: list and detail views for worker runs. + +use super::state::ApiState; + +use crate::conversation::history::ProcessRunLogger; +use crate::conversation::worker_transcript; + +use axum::Json; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Deserialize)] +pub(super) struct WorkerListQuery { + agent_id: String, + #[serde(default = "default_limit")] + limit: i64, + #[serde(default)] + offset: i64, + status: Option, +} + +fn default_limit() -> i64 { + 50 +} + +#[derive(Serialize)] +pub(super) struct WorkerListResponse { + workers: Vec, + total: i64, +} + +#[derive(Serialize)] +pub(super) struct WorkerListItem { + id: String, + task: String, + status: String, + worker_type: String, + channel_id: Option, + channel_name: Option, + started_at: String, + completed_at: Option, + has_transcript: bool, + /// Live status text from StatusBlock (running workers only). + live_status: Option, + /// Tool call count from StatusBlock (running workers only). + tool_calls: usize, +} + +#[derive(Deserialize)] +pub(super) struct WorkerDetailQuery { + agent_id: String, + worker_id: String, +} + +#[derive(Serialize)] +pub(super) struct WorkerDetailResponse { + id: String, + task: String, + result: Option, + status: String, + worker_type: String, + channel_id: Option, + channel_name: Option, + started_at: String, + completed_at: Option, + transcript: Option>, +} + +/// List worker runs for an agent, with live status merged from StatusBlocks. +pub(super) async fn list_workers( + State(state): State>, + Query(query): Query, +) -> Result, StatusCode> { + let pools = state.agent_pools.load(); + let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let logger = ProcessRunLogger::new(pool.clone()); + + let limit = query.limit.min(200); + let (rows, total) = logger + .list_worker_runs( + &query.agent_id, + limit, + query.offset, + query.status.as_deref(), + ) + .await + .map_err(|error| { + tracing::warn!(%error, "failed to list worker runs"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // Build a live status lookup from all channel StatusBlocks + let live_statuses = { + let blocks = state.channel_status_blocks.read().await; + let mut map = std::collections::HashMap::new(); + for (_channel_id, status_block) in blocks.iter() { + let block = status_block.read().await; + for worker in &block.active_workers { + map.insert( + worker.id.to_string(), + (worker.status.clone(), worker.tool_calls), + ); + } + } + map + }; + + let workers = rows + .into_iter() + .map(|row| { + let (live_status, tool_calls) = live_statuses + .get(&row.id) + .map(|(status, calls)| (Some(status.clone()), *calls)) + .unwrap_or((None, 0)); + + WorkerListItem { + id: row.id, + task: row.task, + status: row.status, + worker_type: row.worker_type, + channel_id: row.channel_id, + channel_name: row.channel_name, + started_at: row.started_at, + completed_at: row.completed_at, + has_transcript: row.has_transcript, + live_status, + tool_calls, + } + }) + .collect(); + + Ok(Json(WorkerListResponse { workers, total })) +} + +/// Get full detail for a single worker run, including decompressed transcript. +pub(super) async fn worker_detail( + State(state): State>, + Query(query): Query, +) -> Result, StatusCode> { + let pools = state.agent_pools.load(); + let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let logger = ProcessRunLogger::new(pool.clone()); + + let detail = logger + .get_worker_detail(&query.worker_id) + .await + .map_err(|error| { + tracing::warn!(%error, worker_id = %query.worker_id, "failed to load worker detail"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + let transcript = detail.transcript_blob.as_deref().and_then(|blob| { + worker_transcript::deserialize_transcript(blob) + .map_err(|error| { + tracing::warn!(%error, worker_id = %query.worker_id, "failed to decompress transcript"); + }) + .ok() + }); + + Ok(Json(WorkerDetailResponse { + id: detail.id, + task: detail.task, + result: detail.result, + status: detail.status, + worker_type: detail.worker_type, + channel_id: detail.channel_id, + channel_name: detail.channel_name, + started_at: detail.started_at, + completed_at: detail.completed_at, + transcript, + })) +} diff --git a/src/conversation.rs b/src/conversation.rs index 0097a7d42..bbfc5fac9 100644 --- a/src/conversation.rs +++ b/src/conversation.rs @@ -3,6 +3,10 @@ pub mod channels; pub mod context; pub mod history; +pub mod worker_transcript; pub use channels::ChannelStore; -pub use history::{ConversationLogger, ProcessRunLogger, TimelineItem}; +pub use history::{ + ConversationLogger, ProcessRunLogger, TimelineItem, WorkerDetailRow, WorkerRunRow, +}; +pub use worker_transcript::{ActionContent, TranscriptStep}; diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 4c62659d0..e2cd0709c 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -280,19 +280,26 @@ impl ProcessRunLogger { channel_id: Option<&ChannelId>, worker_id: WorkerId, task: &str, + worker_type: &str, + agent_id: &crate::AgentId, ) { let pool = self.pool.clone(); let id = worker_id.to_string(); let channel_id = channel_id.map(|c| c.to_string()); let task = task.to_string(); + let worker_type = worker_type.to_string(); + let agent_id = agent_id.to_string(); tokio::spawn(async move { if let Err(error) = sqlx::query( - "INSERT OR IGNORE INTO worker_runs (id, channel_id, task) VALUES (?, ?, ?)", + "INSERT OR IGNORE INTO worker_runs (id, channel_id, task, worker_type, agent_id) \ + VALUES (?, ?, ?, ?, ?)", ) .bind(&id) .bind(&channel_id) .bind(&task) + .bind(&worker_type) + .bind(&agent_id) .execute(&pool) .await { @@ -320,16 +327,18 @@ impl ProcessRunLogger { } /// Record a worker completing with its result. Fire-and-forget. - pub fn log_worker_completed(&self, worker_id: WorkerId, result: &str) { + pub fn log_worker_completed(&self, worker_id: WorkerId, result: &str, success: bool) { let pool = self.pool.clone(); let id = worker_id.to_string(); let result = result.to_string(); + let status = if success { "done" } else { "failed" }; tokio::spawn(async move { if let Err(error) = sqlx::query( - "UPDATE worker_runs SET result = ?, status = 'done', completed_at = CURRENT_TIMESTAMP WHERE id = ?" + "UPDATE worker_runs SET result = ?, status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?" ) .bind(&result) + .bind(status) .bind(&id) .execute(&pool) .await @@ -438,4 +447,149 @@ impl ProcessRunLogger { items.reverse(); Ok(items) } + + /// List worker runs for an agent, ordered by most recent first. + /// Does NOT include the transcript blob — that's fetched separately via `get_worker_detail`. + pub async fn list_worker_runs( + &self, + agent_id: &str, + limit: i64, + offset: i64, + status_filter: Option<&str>, + ) -> crate::error::Result<(Vec, i64)> { + let (where_clause, has_status_filter) = if status_filter.is_some() { + ("WHERE w.agent_id = ?1 AND w.status = ?4", true) + } else { + ("WHERE w.agent_id = ?1", false) + }; + + let count_query = format!("SELECT COUNT(*) as total FROM worker_runs w {where_clause}"); + let list_query = format!( + "SELECT w.id, w.task, w.status, w.worker_type, w.channel_id, w.started_at, \ + w.completed_at, w.transcript IS NOT NULL as has_transcript, \ + c.display_name as channel_name \ + FROM worker_runs w \ + LEFT JOIN channels c ON w.channel_id = c.id \ + {where_clause} \ + ORDER BY w.started_at DESC \ + LIMIT ?2 OFFSET ?3" + ); + + let mut count_q = sqlx::query(&count_query).bind(agent_id); + let mut list_q = sqlx::query(&list_query) + .bind(agent_id) + .bind(limit) + .bind(offset); + + if has_status_filter { + let filter = status_filter.unwrap_or(""); + count_q = count_q.bind(filter); + list_q = list_q.bind(filter); + } + + let total: i64 = count_q + .fetch_one(&self.pool) + .await + .map(|row| row.try_get("total").unwrap_or(0)) + .map_err(|e| anyhow::anyhow!(e))?; + + let rows = list_q + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + let items = rows + .into_iter() + .map(|row| WorkerRunRow { + id: row.try_get("id").unwrap_or_default(), + task: row.try_get("task").unwrap_or_default(), + status: row.try_get("status").unwrap_or_default(), + worker_type: row + .try_get("worker_type") + .unwrap_or_else(|_| "builtin".into()), + channel_id: row.try_get("channel_id").ok(), + channel_name: row.try_get("channel_name").ok(), + started_at: row + .try_get::, _>("started_at") + .map(|t| t.to_rfc3339()) + .unwrap_or_default(), + completed_at: row + .try_get::, _>("completed_at") + .ok() + .map(|t| t.to_rfc3339()), + has_transcript: row.try_get::("has_transcript").unwrap_or(false), + }) + .collect(); + + Ok((items, total)) + } + + /// Get full detail for a single worker run, including the compressed transcript blob. + pub async fn get_worker_detail( + &self, + worker_id: &str, + ) -> crate::error::Result> { + let row = sqlx::query( + "SELECT w.id, w.task, w.result, w.status, w.worker_type, w.channel_id, \ + w.started_at, w.completed_at, w.transcript, \ + c.display_name as channel_name \ + FROM worker_runs w \ + LEFT JOIN channels c ON w.channel_id = c.id \ + WHERE w.id = ?", + ) + .bind(worker_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(row.map(|row| WorkerDetailRow { + id: row.try_get("id").unwrap_or_default(), + task: row.try_get("task").unwrap_or_default(), + result: row.try_get("result").ok(), + status: row.try_get("status").unwrap_or_default(), + worker_type: row + .try_get("worker_type") + .unwrap_or_else(|_| "builtin".into()), + channel_id: row.try_get("channel_id").ok(), + channel_name: row.try_get("channel_name").ok(), + started_at: row + .try_get::, _>("started_at") + .map(|t| t.to_rfc3339()) + .unwrap_or_default(), + completed_at: row + .try_get::, _>("completed_at") + .ok() + .map(|t| t.to_rfc3339()), + transcript_blob: row.try_get("transcript").ok(), + })) + } +} + +/// A worker run row without the transcript blob (for list queries). +#[derive(Debug, Clone, Serialize)] +pub struct WorkerRunRow { + pub id: String, + pub task: String, + pub status: String, + pub worker_type: String, + pub channel_id: Option, + pub channel_name: Option, + pub started_at: String, + pub completed_at: Option, + pub has_transcript: bool, +} + +/// A worker run row with full detail including the transcript blob. +#[derive(Debug, Clone)] +pub struct WorkerDetailRow { + pub id: String, + pub task: String, + pub result: Option, + pub status: String, + pub worker_type: String, + pub channel_id: Option, + pub channel_name: Option, + pub started_at: String, + pub completed_at: Option, + pub transcript_blob: Option>, } diff --git a/src/conversation/worker_transcript.rs b/src/conversation/worker_transcript.rs new file mode 100644 index 000000000..ecf5abc36 --- /dev/null +++ b/src/conversation/worker_transcript.rs @@ -0,0 +1,148 @@ +//! Worker transcript serialization and compression. +//! +//! Converts a Rig `Vec` history into a flat `Vec`, +//! then serializes to gzipped JSON for compact storage on the `worker_runs` row. + +use crate::tools::{MAX_TOOL_OUTPUT_BYTES, truncate_output}; + +use flate2::Compression; +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use serde::{Deserialize, Serialize}; +use std::io::{Read, Write}; + +/// Maximum byte length for tool call arguments in transcripts. +const MAX_TOOL_ARGS_BYTES: usize = 2_000; + +/// A single step in a worker transcript. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TranscriptStep { + /// Agent reasoning and/or tool calls. + Action { content: Vec }, + /// Tool execution result. + ToolResult { + call_id: String, + name: String, + text: String, + }, +} + +/// Content within an action step. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ActionContent { + Text { + text: String, + }, + ToolCall { + id: String, + name: String, + args: String, + }, +} + +/// Convert a Rig message history to transcript steps, serialize as JSON, and gzip compress. +pub fn serialize_transcript(history: &[rig::message::Message]) -> Vec { + let steps = convert_history(history); + let json = serde_json::to_vec(&steps).unwrap_or_default(); + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&json).ok(); + encoder.finish().unwrap_or_default() +} + +/// Decompress and deserialize a gzipped transcript blob. +pub fn deserialize_transcript(blob: &[u8]) -> anyhow::Result> { + let mut decoder = GzDecoder::new(blob); + let mut json = Vec::new(); + decoder.read_to_end(&mut json)?; + let steps: Vec = serde_json::from_slice(&json)?; + Ok(steps) +} + +/// Convert Rig `Vec` to `Vec`. +fn convert_history(history: &[rig::message::Message]) -> Vec { + let mut steps = Vec::new(); + + for message in history { + match message { + rig::message::Message::Assistant { content, .. } => { + let mut parts = Vec::new(); + for item in content.iter() { + match item { + rig::message::AssistantContent::Text(text) => { + if !text.text.is_empty() { + parts.push(ActionContent::Text { + text: text.text.clone(), + }); + } + } + rig::message::AssistantContent::ToolCall(tool_call) => { + let args_str = tool_call.function.arguments.to_string(); + let args = if args_str.len() > MAX_TOOL_ARGS_BYTES { + truncate_output(&args_str, MAX_TOOL_ARGS_BYTES) + } else { + args_str + }; + parts.push(ActionContent::ToolCall { + id: tool_call.id.clone(), + name: tool_call.function.name.clone(), + args, + }); + } + _ => {} + } + } + if !parts.is_empty() { + steps.push(TranscriptStep::Action { content: parts }); + } + } + rig::message::Message::User { content } => { + for item in content.iter() { + match item { + rig::message::UserContent::ToolResult(tool_result) => { + let call_id = tool_result + .call_id + .clone() + .unwrap_or_else(|| tool_result.id.clone()); + + let text = tool_result + .content + .iter() + .filter_map(|c| { + if let rig::message::ToolResultContent::Text(t) = c { + Some(t.text.as_str()) + } else { + None + } + }) + .collect::>() + .join("\n"); + + let truncated = truncate_output(&text, MAX_TOOL_OUTPUT_BYTES); + + steps.push(TranscriptStep::ToolResult { + call_id, + name: String::new(), + text: truncated, + }); + } + rig::message::UserContent::Text(text) => { + if !text.text.is_empty() { + steps.push(TranscriptStep::Action { + content: vec![ActionContent::Text { + text: text.text.clone(), + }], + }); + } + } + _ => {} + } + } + } + } + } + + steps +} diff --git a/src/hooks/spacebot.rs b/src/hooks/spacebot.rs index 6f867f9c4..e124277c9 100644 --- a/src/hooks/spacebot.rs +++ b/src/hooks/spacebot.rs @@ -192,12 +192,14 @@ where }; } - // Send event without blocking + // Send event without blocking. Truncate args to keep broadcast payloads bounded. + let capped_args = crate::tools::truncate_output(args, 2_000); let event = ProcessEvent::ToolStarted { agent_id: self.agent_id.clone(), process_id: self.process_id.clone(), channel_id: self.channel_id.clone(), tool_name: tool_name.to_string(), + args: capped_args, }; let _ = self.event_tx.send(event); diff --git a/src/lib.rs b/src/lib.rs index 8649cebd1..49d9e0e21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,6 +115,7 @@ pub enum ProcessEvent { worker_id: WorkerId, channel_id: Option, task: String, + worker_type: String, }, WorkerStatus { agent_id: AgentId, @@ -128,12 +129,14 @@ pub enum ProcessEvent { channel_id: Option, result: String, notify: bool, + success: bool, }, ToolStarted { agent_id: AgentId, process_id: ProcessId, channel_id: Option, tool_name: String, + args: String, }, ToolCompleted { agent_id: AgentId, From 8fba3aba52a3115b1ffdfc871511e7e76aebd709 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 20:10:57 -0800 Subject: [PATCH 02/10] feat(agent-workers): enhance worker detail view with animated transcript and improved error handling for missing data --- interface/src/routes/AgentWorkers.tsx | 75 +++++++++++++++++++++------ src/agent/channel.rs | 47 +++++++++++++---- 2 files changed, 98 insertions(+), 24 deletions(-) diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index 94e72a180..bdca022d3 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -1,6 +1,7 @@ import {useState, useMemo, useEffect, useCallback, useRef} from "react"; import {useQuery, useQueryClient} from "@tanstack/react-query"; import {useNavigate, useSearch} from "@tanstack/react-router"; +import {AnimatePresence, motion} from "framer-motion"; import { api, type WorkerRunInfo, @@ -76,12 +77,14 @@ export function AgentWorkers({agentId}: {agentId: string}) { refetchInterval: 10_000, }); - // Detail query (only when a worker is selected) + // Detail query (only when a worker is selected). + // Returns null instead of throwing on 404 — the worker may not be in the DB + // yet while it's still visible via SSE state. const {data: detailData} = useQuery({ queryKey: ["worker-detail", agentId, selectedWorkerId], queryFn: () => selectedWorkerId - ? api.workerDetail(agentId, selectedWorkerId) + ? api.workerDetail(agentId, selectedWorkerId).catch(() => null) : Promise.resolve(null), enabled: !!selectedWorkerId, }); @@ -134,14 +137,31 @@ export function AgentWorkers({agentId}: {agentId: string}) { return mergedWorkers.filter((w) => w.task.toLowerCase().includes(term)); }, [mergedWorkers, search]); - // Merge live state onto the detail response too + // Build detail view: prefer DB data, fall back to synthesized live state. + // Running workers that haven't hit the DB yet still get a full detail view + // from SSE state + live transcript. const mergedDetail: WorkerDetailResponse | null = useMemo(() => { - if (!detailData) return null; const live = selectedWorkerId ? activeWorkers[selectedWorkerId] : null; - if (!live) return detailData; + + if (detailData) { + // DB data exists — overlay live status if worker is still running + if (!live) return detailData; + return { ...detailData, status: "running" }; + } + + // No DB data yet — synthesize from SSE state + if (!live) return null; return { - ...detailData, + id: live.id, + task: live.task, + result: null, status: "running", + worker_type: "builtin", + channel_id: live.channelId ?? null, + channel_name: null, + started_at: new Date(live.startedAt).toISOString(), + completed_at: null, + transcript: null, }; }, [detailData, activeWorkers, selectedWorkerId]); @@ -421,15 +441,40 @@ function WorkerDetail({ {isRunning ? "Live Transcript" : "Transcript"}
- {transcript.map((step, index) => ( - - ))} - {isRunning && currentTool && ( -
- - Running {currentTool}... -
- )} + + {transcript.map((step, index) => ( + + + + ))} + {isRunning && currentTool && ( + + + Running {currentTool}... + + )} +
) : isRunning ? ( diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 211f9d2f9..c8a3fa320 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1434,15 +1434,44 @@ impl Channel { } else if replied { tracing::debug!(channel_id = %self.id, "channel turn replied via tool (fallback suppressed)"); } else if is_retrigger { - // On retrigger turns, suppress fallback text. The LLM should - // use the reply tool explicitly if it has something to say, or - // the skip tool if not. Raw text output from retriggers is - // almost always internal acknowledgment, not a real response. - tracing::debug!( - channel_id = %self.id, - response_len = response.len(), - "retrigger turn fallback suppressed (LLM did not use reply/skip tool)" - ); + // On retrigger turns the LLM should use the reply tool, but + // some models return the result as raw text instead. Send it + // as a fallback so the user still gets the worker/branch output. + let text = response.trim(); + if !text.is_empty() { + tracing::info!( + channel_id = %self.id, + response_len = text.len(), + "retrigger produced text without reply tool, sending as fallback" + ); + let extracted = extract_reply_from_tool_syntax(text); + let source = self + .conversation_id + .as_deref() + .and_then(|conversation_id| conversation_id.split(':').next()) + .unwrap_or("unknown"); + let final_text = crate::tools::reply::normalize_discord_mention_tokens( + extracted.as_deref().unwrap_or(text), + source, + ); + if !final_text.is_empty() { + self.state + .conversation_logger + .log_bot_message(&self.state.channel_id, &final_text); + if let Err(error) = self + .response_tx + .send(OutboundResponse::Text(final_text)) + .await + { + tracing::error!(%error, channel_id = %self.id, "failed to send retrigger fallback reply"); + } + } + } else { + tracing::debug!( + channel_id = %self.id, + "retrigger turn produced no text and no reply tool call" + ); + } } else { // If the LLM returned text without using the reply tool, send it // directly. Some models respond with text instead of tool calls. From 19c7cdbcdb1ea726a1055e9ee2f08bb40c608a68 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 20:37:33 -0800 Subject: [PATCH 03/10] feat(workers): add tool_calls tracking to worker runs and enhance UI components for displaying tool call counts --- interface/src/api/client.ts | 1 + interface/src/routes/AgentWorkers.tsx | 115 +++++++++--------- .../20260224000001_worker_tool_calls.sql | 1 + src/agent/worker.rs | 32 +++-- src/api/workers.rs | 17 ++- src/conversation/history.rs | 8 +- src/conversation/worker_transcript.rs | 3 +- 7 files changed, 106 insertions(+), 71 deletions(-) create mode 100644 migrations/20260224000001_worker_tool_calls.sql diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index a7c0c55dc..e02451af4 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -232,6 +232,7 @@ export interface WorkerDetailResponse { started_at: string; completed_at: string | null; transcript: TranscriptStep[] | null; + tool_calls: number; } export interface WorkerListResponse { diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index bdca022d3..0ba1544df 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -1,7 +1,8 @@ import {useState, useMemo, useEffect, useCallback, useRef} from "react"; import {useQuery, useQueryClient} from "@tanstack/react-query"; import {useNavigate, useSearch} from "@tanstack/react-router"; -import {AnimatePresence, motion} from "framer-motion"; +import {motion} from "framer-motion"; +import {Markdown} from "@/components/Markdown"; import { api, type WorkerRunInfo, @@ -23,7 +24,7 @@ function statusBadgeVariant(status: string) { case "running": return "amber" as const; case "done": - return "green" as const; + return "outline" as const; case "failed": return "red" as const; default: @@ -162,6 +163,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { started_at: new Date(live.startedAt).toISOString(), completed_at: null, transcript: null, + tool_calls: live.toolCalls, }; }, [detailData, activeWorkers, selectedWorkerId]); @@ -280,11 +282,11 @@ function WorkerCard({ onClick={onClick} className={cx( "flex w-full flex-col gap-1 border-b border-app-line/30 px-4 py-3 text-left transition-colors", - selected ? "bg-app-selected" : "hover:bg-app-hover", + selected ? "bg-app-selected/50" : "", )} >
-

+

{worker.task}

)}
- {isRunning && currentTool && ( -

- {currentTool} -

- )} - {isRunning && !currentTool && displayStatus && ( -

- {displayStatus} -

- )} ); } @@ -348,9 +340,23 @@ function WorkerDetail({ const duration = durationBetween(detail.started_at, detail.completed_at); const displayStatus = liveWorker?.status; const currentTool = liveWorker?.currentTool; - const toolCalls = liveWorker?.toolCalls ?? 0; - // Use persisted transcript if available, otherwise fall back to live SSE transcript - const transcript = detail.transcript ?? (isRunning ? liveTranscript : null); + const toolCalls = liveWorker?.toolCalls ?? detail.tool_calls ?? 0; + // Use persisted transcript if available, otherwise fall back to live SSE transcript. + // Strip the final action step if it duplicates the result text shown above. + const rawTranscript = detail.transcript ?? (isRunning ? liveTranscript : null); + const transcript = useMemo(() => { + if (!rawTranscript || !detail.result) return rawTranscript; + const last = rawTranscript[rawTranscript.length - 1]; + if ( + last?.type === "action" && + last.content.length === 1 && + last.content[0].type === "text" && + last.content[0].text.trim() === detail.result.trim() + ) { + return rawTranscript.slice(0, -1); + } + return rawTranscript; + }, [rawTranscript, detail.result]); const transcriptRef = useRef(null); // Auto-scroll to latest transcript step for running workers @@ -365,7 +371,7 @@ function WorkerDetail({ {/* Header */}
-

{detail.task}

+
{duration} )} {!isRunning && {formatTimeAgo(detail.started_at)}} - {isRunning && toolCalls > 0 && ( + {toolCalls > 0 && ( {toolCalls} tool calls )}
@@ -428,8 +434,8 @@ function WorkerDetail({

Result

-
- {detail.result} +
+ {detail.result}
)} @@ -441,40 +447,22 @@ function WorkerDetail({ {isRunning ? "Live Transcript" : "Transcript"}
- - {transcript.map((step, index) => ( - - - - ))} - {isRunning && currentTool && ( - - - Running {currentTool}... - - )} - + {transcript.map((step, index) => ( + + + + ))} + {isRunning && currentTool && ( +
+ + Running {currentTool}... +
+ )}
) : isRunning ? ( @@ -492,6 +480,19 @@ function WorkerDetail({ ); } +function TaskText({text}: {text: string}) { + const [expanded, setExpanded] = useState(false); + + return ( + + ); +} + function TranscriptStepView({step}: {step: TranscriptStep}) { if (step.type === "action") { return ( @@ -509,8 +510,8 @@ function TranscriptStepView({step}: {step: TranscriptStep}) { function ActionContentView({content}: {content: ActionContent}) { if (content.type === "text") { return ( -
- {content.text} +
+ {content.text}
); } diff --git a/migrations/20260224000001_worker_tool_calls.sql b/migrations/20260224000001_worker_tool_calls.sql new file mode 100644 index 000000000..311ab28aa --- /dev/null +++ b/migrations/20260224000001_worker_tool_calls.sql @@ -0,0 +1 @@ +ALTER TABLE worker_runs ADD COLUMN tool_calls INTEGER NOT NULL DEFAULT 0; diff --git a/src/agent/worker.rs b/src/agent/worker.rs index a2ed80a82..177b7be7d 100644 --- a/src/agent/worker.rs +++ b/src/agent/worker.rs @@ -482,12 +482,28 @@ impl Worker { let pool = self.deps.sqlite_pool.clone(); let worker_id = self.id.to_string(); + // Count tool calls from the Rig history (each ToolCall in an Assistant message) + let tool_calls: i64 = history + .iter() + .filter_map(|message| match message { + rig::message::Message::Assistant { content, .. } => Some( + content + .iter() + .filter(|c| matches!(c, rig::message::AssistantContent::ToolCall(_))) + .count() as i64, + ), + _ => None, + }) + .sum(); + tokio::spawn(async move { - if let Err(error) = sqlx::query("UPDATE worker_runs SET transcript = ? WHERE id = ?") - .bind(&transcript_blob) - .bind(&worker_id) - .execute(&pool) - .await + if let Err(error) = + sqlx::query("UPDATE worker_runs SET transcript = ?, tool_calls = ? WHERE id = ?") + .bind(&transcript_blob) + .bind(tool_calls) + .bind(&worker_id) + .execute(&pool) + .await { tracing::warn!(%error, worker_id, "failed to persist worker transcript"); } @@ -696,7 +712,8 @@ fn build_worker_recap(messages: &[rig::message::Message]) -> String { rig::message::Message::Assistant { content, .. } => { for item in content.iter() { if let rig::message::AssistantContent::ToolCall(tc) = item { - let args = tc.function.arguments.to_string(); + let args = + crate::tools::truncate_output(&tc.function.arguments.to_string(), 200); recap.push_str(&format!("- Called `{}` ({args})\n", tc.function.name)); } if let rig::message::AssistantContent::Text(t) = item @@ -711,7 +728,8 @@ fn build_worker_recap(messages: &[rig::message::Message]) -> String { if let rig::message::UserContent::ToolResult(tr) = item { for c in tr.content.iter() { if let rig::message::ToolResultContent::Text(t) = c { - recap.push_str(&format!(" Result: {}\n", t.text)); + let truncated = crate::tools::truncate_output(&t.text, 200); + recap.push_str(&format!(" Result: {truncated}\n")); } } } diff --git a/src/api/workers.rs b/src/api/workers.rs index b445c84b3..b3ae1d0fc 100644 --- a/src/api/workers.rs +++ b/src/api/workers.rs @@ -44,8 +44,8 @@ pub(super) struct WorkerListItem { has_transcript: bool, /// Live status text from StatusBlock (running workers only). live_status: Option, - /// Tool call count from StatusBlock (running workers only). - tool_calls: usize, + /// Total tool calls. From DB for completed workers, from StatusBlock for running. + tool_calls: i64, } #[derive(Deserialize)] @@ -66,6 +66,7 @@ pub(super) struct WorkerDetailResponse { started_at: String, completed_at: Option, transcript: Option>, + tool_calls: i64, } /// List worker runs for an agent, with live status merged from StatusBlocks. @@ -110,11 +111,18 @@ pub(super) async fn list_workers( let workers = rows .into_iter() .map(|row| { - let (live_status, tool_calls) = live_statuses + let (live_status, live_tool_calls) = live_statuses .get(&row.id) - .map(|(status, calls)| (Some(status.clone()), *calls)) + .map(|(status, calls)| (Some(status.clone()), *calls as i64)) .unwrap_or((None, 0)); + // Use live tool call count for running workers, DB count for completed + let tool_calls = if row.status == "running" && live_tool_calls > 0 { + live_tool_calls + } else { + row.tool_calls + }; + WorkerListItem { id: row.id, task: row.task, @@ -171,5 +179,6 @@ pub(super) async fn worker_detail( started_at: detail.started_at, completed_at: detail.completed_at, transcript, + tool_calls: detail.tool_calls, })) } diff --git a/src/conversation/history.rs b/src/conversation/history.rs index e2cd0709c..0ae31408a 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -467,7 +467,7 @@ impl ProcessRunLogger { let list_query = format!( "SELECT w.id, w.task, w.status, w.worker_type, w.channel_id, w.started_at, \ w.completed_at, w.transcript IS NOT NULL as has_transcript, \ - c.display_name as channel_name \ + w.tool_calls, c.display_name as channel_name \ FROM worker_runs w \ LEFT JOIN channels c ON w.channel_id = c.id \ {where_clause} \ @@ -518,6 +518,7 @@ impl ProcessRunLogger { .ok() .map(|t| t.to_rfc3339()), has_transcript: row.try_get::("has_transcript").unwrap_or(false), + tool_calls: row.try_get::("tool_calls").unwrap_or(0), }) .collect(); @@ -531,7 +532,7 @@ impl ProcessRunLogger { ) -> crate::error::Result> { let row = sqlx::query( "SELECT w.id, w.task, w.result, w.status, w.worker_type, w.channel_id, \ - w.started_at, w.completed_at, w.transcript, \ + w.started_at, w.completed_at, w.transcript, w.tool_calls, \ c.display_name as channel_name \ FROM worker_runs w \ LEFT JOIN channels c ON w.channel_id = c.id \ @@ -561,6 +562,7 @@ impl ProcessRunLogger { .ok() .map(|t| t.to_rfc3339()), transcript_blob: row.try_get("transcript").ok(), + tool_calls: row.try_get::("tool_calls").unwrap_or(0), })) } } @@ -577,6 +579,7 @@ pub struct WorkerRunRow { pub started_at: String, pub completed_at: Option, pub has_transcript: bool, + pub tool_calls: i64, } /// A worker run row with full detail including the transcript blob. @@ -592,4 +595,5 @@ pub struct WorkerDetailRow { pub started_at: String, pub completed_at: Option, pub transcript_blob: Option>, + pub tool_calls: i64, } diff --git a/src/conversation/worker_transcript.rs b/src/conversation/worker_transcript.rs index ecf5abc36..e58da56c9 100644 --- a/src/conversation/worker_transcript.rs +++ b/src/conversation/worker_transcript.rs @@ -129,7 +129,8 @@ fn convert_history(history: &[rig::message::Message]) -> Vec { }); } rig::message::UserContent::Text(text) => { - if !text.text.is_empty() { + // Skip compaction markers and system-injected messages + if !text.text.is_empty() && !text.text.starts_with("[System:") { steps.push(TranscriptStep::Action { content: vec![ActionContent::Text { text: text.text.clone(), From 6191c1c57d06db499f637bf71b4054855f36c416 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 20:51:26 -0800 Subject: [PATCH 04/10] feat(channel): enhance worker run item links with agentId and improve worker completion message format --- interface/src/routes/AgentWorkers.tsx | 1 + interface/src/routes/ChannelDetail.tsx | 47 ++++++++++++-------------- src/agent/channel.rs | 2 +- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index 0ba1544df..4148d8684 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -292,6 +292,7 @@ function WorkerCard({ {isRunning && ( diff --git a/interface/src/routes/ChannelDetail.tsx b/interface/src/routes/ChannelDetail.tsx index e5ba17ed3..dbc3874e6 100644 --- a/interface/src/routes/ChannelDetail.tsx +++ b/interface/src/routes/ChannelDetail.tsx @@ -70,14 +70,19 @@ function LiveBranchRunItem({ item, live, channelId }: { item: TimelineBranchRun; ); } -function LiveWorkerRunItem({ item, live, channelId }: { item: TimelineWorkerRun; live: ActiveWorker; channelId: string }) { +function LiveWorkerRunItem({ item, live, channelId, agentId }: { item: TimelineWorkerRun; live: ActiveWorker; channelId: string; agentId: string }) { return (
{formatTimestamp(new Date(item.started_at).getTime())}
-
+
Worker @@ -93,7 +98,7 @@ function LiveWorkerRunItem({ item, live, channelId }: { item: TimelineWorkerRun; {live.toolCalls} tool calls )}
-
+
); @@ -137,48 +142,37 @@ function BranchRunItem({ item }: { item: TimelineBranchRun }) { ); } -function WorkerRunItem({ item }: { item: TimelineWorkerRun }) { - const [expanded, setExpanded] = useState(false); - +function WorkerRunItem({ item, agentId }: { item: TimelineWorkerRun; agentId: string }) { return (
{formatTimestamp(new Date(item.started_at).getTime())}
- - {expanded && item.result && ( -
-
- {item.result} -
-
- )} +
); } -function TimelineEntry({ item, liveWorkers, liveBranches, channelId }: { +function TimelineEntry({ item, liveWorkers, liveBranches, channelId, agentId }: { item: TimelineItem; liveWorkers: Record; liveBranches: Record; channelId: string; + agentId: string; }) { switch (item.type) { case "message": @@ -210,8 +204,8 @@ function TimelineEntry({ item, liveWorkers, liveBranches, channelId }: { } case "worker_run": { const live = liveWorkers[item.id]; - if (live) return ; - return ; + if (live) return ; + return ; } } } @@ -339,6 +333,7 @@ export function ChannelDetail({ agentId, channelId, channel, liveState, onLoadMo liveWorkers={workers} liveBranches={branches} channelId={channelId} + agentId={agentId} /> )) )} diff --git a/src/agent/channel.rs b/src/agent/channel.rs index c8a3fa320..5c2261594 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1639,7 +1639,7 @@ impl Channel { if *notify { let mut history = self.state.history.write().await; - let worker_message = format!("[Worker completed]: {result}"); + let worker_message = format!("[Worker {worker_id} completed]: {result}"); history.push(rig::message::Message::from(worker_message)); should_retrigger = true; } From da5f93cb784cfdd45dbc7db3bdd7d88b074eb2cd Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 20:57:03 -0800 Subject: [PATCH 05/10] feat(worker-inspect): add tool for inspecting worker execution transcripts and recent runs --- .../en/tools/worker_inspect_description.md.j2 | 1 + src/agent/channel.rs | 2 + src/agent/ingestion.rs | 2 + src/api/agents.rs | 3 + src/main.rs | 3 + src/prompts/text.rs | 3 + src/tools.rs | 10 + src/tools/worker_inspect.rs | 221 ++++++++++++++++++ 8 files changed, 245 insertions(+) create mode 100644 prompts/en/tools/worker_inspect_description.md.j2 create mode 100644 src/tools/worker_inspect.rs diff --git a/prompts/en/tools/worker_inspect_description.md.j2 b/prompts/en/tools/worker_inspect_description.md.j2 new file mode 100644 index 000000000..8ab06504e --- /dev/null +++ b/prompts/en/tools/worker_inspect_description.md.j2 @@ -0,0 +1 @@ +Inspect a worker's full execution transcript to see exactly what it did — every tool call, result, and reasoning step. Use without a worker_id to list recent workers. Use with a worker_id to retrieve the full transcript. Useful for verifying worker output, debugging incomplete results, or understanding what sources/steps a worker used. \ No newline at end of file diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 5c2261594..7cf4db43f 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1894,6 +1894,8 @@ async fn spawn_branch( state.deps.memory_search.clone(), state.conversation_logger.clone(), state.channel_store.clone(), + crate::conversation::ProcessRunLogger::new(state.deps.sqlite_pool.clone()), + &state.deps.agent_id, ); let branch_max_turns = **state.deps.runtime_config.branch_max_turns.load(); diff --git a/src/agent/ingestion.rs b/src/agent/ingestion.rs index 506baa45c..cdfa77844 100644 --- a/src/agent/ingestion.rs +++ b/src/agent/ingestion.rs @@ -475,6 +475,8 @@ async fn process_chunk( deps.memory_search.clone(), conversation_logger, channel_store, + crate::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()), + &deps.agent_id, ); let agent = AgentBuilder::new(model) diff --git a/src/api/agents.rs b/src/api/agents.rs index 6e2a48fbf..2b54210f1 100644 --- a/src/api/agents.rs +++ b/src/api/agents.rs @@ -702,10 +702,13 @@ pub(super) async fn create_agent( let conversation_logger = crate::conversation::history::ConversationLogger::new(db.sqlite.clone()); let channel_store = crate::conversation::ChannelStore::new(db.sqlite.clone()); + let run_logger = crate::conversation::ProcessRunLogger::new(db.sqlite.clone()); let cortex_tool_server = crate::tools::create_cortex_chat_tool_server( memory_search.clone(), conversation_logger, channel_store, + run_logger, + &deps.agent_id, browser_config, agent_config.screenshot_dir(), brave_search_key, diff --git a/src/main.rs b/src/main.rs index f0c6f733a..e22a7c929 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1759,10 +1759,13 @@ async fn initialize_agents( let conversation_logger = spacebot::conversation::history::ConversationLogger::new(agent.db.sqlite.clone()); let channel_store = spacebot::conversation::ChannelStore::new(agent.db.sqlite.clone()); + let run_logger = spacebot::conversation::ProcessRunLogger::new(agent.db.sqlite.clone()); let tool_server = spacebot::tools::create_cortex_chat_tool_server( agent.deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + &agent.deps.agent_id, browser_config, agent.config.screenshot_dir(), brave_search_key, diff --git a/src/prompts/text.rs b/src/prompts/text.rs index 0ee33462d..f615c6bcc 100644 --- a/src/prompts/text.rs +++ b/src/prompts/text.rs @@ -159,6 +159,9 @@ fn lookup(lang: &str, key: &str) -> &'static str { ("en", "tools/channel_recall") => { include_str!("../../prompts/en/tools/channel_recall_description.md.j2") } + ("en", "tools/worker_inspect") => { + include_str!("../../prompts/en/tools/worker_inspect_description.md.j2") + } ("en", "tools/send_file") => { include_str!("../../prompts/en/tools/send_file_description.md.j2") } diff --git a/src/tools.rs b/src/tools.rs index 69770851a..e9e112492 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -45,6 +45,7 @@ pub mod shell; pub mod skip; pub mod spawn_worker; pub mod web_search; +pub mod worker_inspect; pub use branch_tool::{BranchArgs, BranchError, BranchOutput, BranchTool}; pub use browser::{ @@ -88,6 +89,9 @@ pub use shell::{ShellArgs, ShellError, ShellOutput, ShellResult, ShellTool}; pub use skip::{SkipArgs, SkipError, SkipFlag, SkipOutput, SkipTool, new_skip_flag}; pub use spawn_worker::{SpawnWorkerArgs, SpawnWorkerError, SpawnWorkerOutput, SpawnWorkerTool}; pub use web_search::{SearchResult, WebSearchArgs, WebSearchError, WebSearchOutput, WebSearchTool}; +pub use worker_inspect::{ + WorkerInspectArgs, WorkerInspectError, WorkerInspectOutput, WorkerInspectTool, +}; use crate::agent::channel::ChannelState; use crate::config::{BrowserConfig, RuntimeConfig}; @@ -333,12 +337,15 @@ pub fn create_branch_tool_server( memory_search: Arc, conversation_logger: crate::conversation::history::ConversationLogger, channel_store: crate::conversation::ChannelStore, + run_logger: crate::conversation::history::ProcessRunLogger, + agent_id: &str, ) -> ToolServerHandle { ToolServer::new() .tool(MemorySaveTool::new(memory_search.clone())) .tool(MemoryRecallTool::new(memory_search.clone())) .tool(MemoryDeleteTool::new(memory_search)) .tool(ChannelRecallTool::new(conversation_logger, channel_store)) + .tool(WorkerInspectTool::new(run_logger, agent_id.to_string())) .run() } @@ -408,6 +415,8 @@ pub fn create_cortex_chat_tool_server( memory_search: Arc, conversation_logger: crate::conversation::history::ConversationLogger, channel_store: crate::conversation::ChannelStore, + run_logger: crate::conversation::history::ProcessRunLogger, + agent_id: &str, browser_config: BrowserConfig, screenshot_dir: PathBuf, brave_search_key: Option, @@ -419,6 +428,7 @@ pub fn create_cortex_chat_tool_server( .tool(MemoryRecallTool::new(memory_search.clone())) .tool(MemoryDeleteTool::new(memory_search)) .tool(ChannelRecallTool::new(conversation_logger, channel_store)) + .tool(WorkerInspectTool::new(run_logger, agent_id.to_string())) .tool(ShellTool::new(instance_dir.clone(), workspace.clone())) .tool(FileTool::new(workspace.clone())) .tool(ExecTool::new(instance_dir, workspace)); diff --git a/src/tools/worker_inspect.rs b/src/tools/worker_inspect.rs new file mode 100644 index 000000000..8c2a40015 --- /dev/null +++ b/src/tools/worker_inspect.rs @@ -0,0 +1,221 @@ +//! Worker transcript inspection tool for branches. +//! +//! Allows a branch to retrieve the full transcript of a completed worker run, +//! or list recent worker runs to find the right one. + +use crate::conversation::history::ProcessRunLogger; +use crate::conversation::worker_transcript; + +use rig::completion::ToolDefinition; +use rig::tool::Tool; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Tool for inspecting worker run transcripts. +#[derive(Debug, Clone)] +pub struct WorkerInspectTool { + run_logger: ProcessRunLogger, + agent_id: String, +} + +impl WorkerInspectTool { + pub fn new(run_logger: ProcessRunLogger, agent_id: String) -> Self { + Self { + run_logger, + agent_id, + } + } +} + +#[derive(Debug, thiserror::Error)] +#[error("Worker inspect failed: {0}")] +pub struct WorkerInspectError(String); + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct WorkerInspectArgs { + /// The worker ID to inspect. Omit to list recent worker runs. + #[serde(default)] + pub worker_id: Option, + /// Maximum number of worker runs to list (default 10, max 50). Only used when listing. + #[serde(default = "default_list_limit")] + pub limit: i64, +} + +fn default_list_limit() -> i64 { + 10 +} + +#[derive(Debug, Serialize)] +pub struct WorkerInspectOutput { + pub action: String, + pub summary: String, +} + +impl Tool for WorkerInspectTool { + const NAME: &'static str = "worker_inspect"; + + type Error = WorkerInspectError; + type Args = WorkerInspectArgs; + type Output = WorkerInspectOutput; + + async fn definition(&self, _prompt: String) -> ToolDefinition { + ToolDefinition { + name: Self::NAME.to_string(), + description: crate::prompts::text::get("tools/worker_inspect").to_string(), + parameters: serde_json::json!({ + "type": "object", + "properties": { + "worker_id": { + "type": "string", + "description": "UUID of the worker run to inspect. Omit to list recent workers." + }, + "limit": { + "type": "integer", + "minimum": 1, + "maximum": 50, + "default": 10, + "description": "Number of recent workers to list (1-50). Only used when worker_id is omitted." + } + } + }), + } + } + + async fn call(&self, args: Self::Args) -> Result { + let Some(worker_id) = args.worker_id else { + return self.list_workers(args.limit).await; + }; + + let detail = self + .run_logger + .get_worker_detail(&worker_id) + .await + .map_err(|e| WorkerInspectError(format!("Failed to query worker: {e}")))? + .ok_or_else(|| WorkerInspectError(format!("No worker found with ID {worker_id}")))?; + + let mut summary = format!( + "## Worker {}\n\n**Task:** {}\n**Status:** {}\n**Started:** {}\n", + detail.id, detail.task, detail.status, detail.started_at, + ); + + if let Some(completed_at) = &detail.completed_at { + summary.push_str(&format!("**Completed:** {completed_at}\n")); + } + + if let Some(result) = &detail.result { + summary.push_str(&format!("\n### Result\n\n{result}\n")); + } + + if let Some(blob) = &detail.transcript_blob { + match worker_transcript::deserialize_transcript(blob) { + Ok(steps) => { + summary.push_str(&format!("\n### Transcript ({} steps)\n\n", steps.len())); + for step in &steps { + match step { + worker_transcript::TranscriptStep::Action { content } => { + for item in content { + match item { + worker_transcript::ActionContent::Text { text } => { + summary.push_str(&format!("**Agent:** {text}\n\n")); + } + worker_transcript::ActionContent::ToolCall { + name, + args, + .. + } => { + summary.push_str(&format!( + "**Tool call:** `{name}`\n```\n{args}\n```\n\n" + )); + } + } + } + } + worker_transcript::TranscriptStep::ToolResult { + name, text, .. + } => { + let label = if name.is_empty() { "tool" } else { name }; + let display = if text.len() > 500 { + format!( + "{}...\n[truncated, {} bytes total]", + &text[..500], + text.len() + ) + } else { + text.clone() + }; + summary.push_str(&format!( + "**Result ({label}):**\n```\n{display}\n```\n\n" + )); + } + } + } + } + Err(error) => { + summary.push_str(&format!( + "\n*Transcript could not be decompressed: {error}*\n" + )); + } + } + } else { + summary.push_str("\n*No transcript available for this worker.*\n"); + } + + Ok(WorkerInspectOutput { + action: "inspect".to_string(), + summary, + }) + } +} + +impl WorkerInspectTool { + async fn list_workers(&self, limit: i64) -> Result { + let limit = limit.clamp(1, 50); + let (rows, total) = self + .run_logger + .list_worker_runs(&self.agent_id, limit, 0, None) + .await + .map_err(|e| WorkerInspectError(format!("Failed to list workers: {e}")))?; + + if rows.is_empty() { + return Ok(WorkerInspectOutput { + action: "list".to_string(), + summary: "No worker runs found.".to_string(), + }); + } + + let mut summary = format!("## Recent Workers ({} of {total})\n\n", rows.len()); + + for row in &rows { + let status_marker = match row.status.as_str() { + "running" => "[running]", + "done" => "[done]", + "failed" => "[failed]", + _ => "[-]", + }; + summary.push_str(&format!( + "{status_marker} `{}` — {} ({})\n", + row.id, row.task, row.status, + )); + if let Some(channel) = &row.channel_name { + summary.push_str(&format!(" Channel: {channel}\n")); + } + summary.push_str(&format!( + " Started: {} | {} tool calls{}\n\n", + row.started_at, + row.tool_calls, + if row.has_transcript { + " | transcript available" + } else { + "" + }, + )); + } + + summary.push_str("Use `worker_inspect` with a `worker_id` to view the full transcript."); + + Ok(WorkerInspectOutput { + action: "list".to_string(), + summary, + }) + } +} From 7d97d818654690ccdefa59d4a7d80be1861ebc0b Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 21:05:17 -0800 Subject: [PATCH 06/10] feat(worker-cancellation): add CancelWorkerButton component to allow users to cancel running workers and update documentation to include cancellation instructions --- interface/src/routes/AgentWorkers.tsx | 29 +++++++++++++++++++++++++++ prompts/en/channel.md.j2 | 6 +++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index 4148d8684..2498cebce 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -374,6 +374,12 @@ function WorkerDetail({
+ {isRunning && detail.channel_id && ( + + )} ; } +function CancelWorkerButton({ + channelId, + workerId, +}: { + channelId: string; + workerId: string; +}) { + const [cancelling, setCancelling] = useState(false); + + return ( + + ); +} + function ActionContentView({content}: {content: ActionContent}) { if (content.type === "text") { return ( diff --git a/prompts/en/channel.md.j2 b/prompts/en/channel.md.j2 index 1528a8859..e4aa0a0c3 100644 --- a/prompts/en/channel.md.j2 +++ b/prompts/en/channel.md.j2 @@ -44,7 +44,9 @@ You are able to write code or do work extremely fast inside a worker, never say You have three paths for getting things done. Choosing the right one matters. -**Branch** — for thinking and memory. Branch when you need to recall, save, or forget something from long-term memory, reason through a complex decision, figure out what instructions to give a worker, or retrieve transcript context from another channel. Branches have your full conversation context and access to the memory system (recall, save, and delete) plus cross-channel transcript recall (`channel_recall`). They return a conclusion. You never see the working. Branch often — it's cheap and keeps you responsive. +**Branch** — for thinking and memory. Branch when you need to recall, save, or forget something from long-term memory, reason through a complex decision, figure out what instructions to give a worker, or retrieve transcript context from another channel. Branches have your full conversation context and access to the memory system (recall, save, and delete), cross-channel transcript recall (`channel_recall`), and worker transcript inspection (`worker_inspect`). They return a conclusion. You never see the working. Branch often — it's cheap and keeps you responsive. + +Use `worker_inspect` in a branch when you need to verify what a worker actually did — what tools it called, what results it got, what sources it checked. Useful when a worker returns a thin or unexpected result, or when the user asks "what did you actually do?" **Worker** — for doing. Workers have task tools (see Worker Capabilities section below). They do NOT have your conversation context or access to memories — they only know what you tell them in the task description, so be specific. Two flavors: @@ -59,6 +61,8 @@ The key distinction: branches think, workers do, you talk. Never use a worker fo When an interactive worker is active and the user's message is directed at that work, route the message to the worker instead of spawning a new one. +**Cancel** — for stopping work. Use `cancel` when a worker is stuck, taking too long, working on the wrong thing, or the user asks you to stop it. You can cancel workers and branches by their ID (visible in the status block and in spawn confirmations). Don't let a runaway worker burn tokens — if something looks wrong, cancel it and start fresh. + ## When To Stay Silent You have a `skip` tool. Use it. Not every message needs a response from you. From 70477ce9dbdb46c37e5dc783700986a44c829a62 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 21:25:01 -0800 Subject: [PATCH 07/10] feat(worker-status): normalize worker status handling and improve UI badge rendering; update cancellation logging in channel --- interface/src/routes/AgentWorkers.tsx | 21 ++++++++++++++------- src/agent/channel.rs | 6 +++++- src/conversation/history.rs | 23 ++++++++--------------- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index 2498cebce..56f128500 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -19,16 +19,23 @@ import {cx} from "@/ui/utils"; const STATUS_FILTERS = ["all", "running", "done", "failed"] as const; type StatusFilter = (typeof STATUS_FILTERS)[number]; +const KNOWN_STATUSES = new Set(["running", "done", "failed"]); + +function normalizeStatus(status: string): string { + if (KNOWN_STATUSES.has(status)) return status; + // Legacy rows where set_status text overwrote the state enum. + // If it has a completed_at it finished, otherwise it was interrupted. + return "failed"; +} + function statusBadgeVariant(status: string) { switch (status) { case "running": return "amber" as const; - case "done": - return "outline" as const; case "failed": return "red" as const; default: - return "default" as const; + return "outline" as const; } } @@ -297,7 +304,7 @@ function WorkerCard({ {isRunning && ( )} - {isRunning ? "running" : worker.status} + {isRunning ? "running" : normalizeStatus(worker.status)}
@@ -388,14 +395,14 @@ function WorkerDetail({ {isRunning && ( )} - {isRunning ? "running" : detail.status} + {isRunning ? "running" : normalizeStatus(detail.status)}
@@ -530,7 +537,7 @@ function CancelWorkerButton({ setCancelling(true); api.cancelProcess(channelId, "worker", workerId).catch(console.warn); }} - className="rounded-md border border-red-500/30 px-2 py-0.5 text-tiny font-medium text-red-400 transition-colors hover:bg-red-500/15 disabled:opacity-50" + className="rounded-md border border-app-line px-2 py-0.5 text-tiny font-medium text-ink-dull transition-colors hover:border-ink-faint hover:text-ink disabled:opacity-50" > {cancelling ? "Cancelling..." : "Cancel"} diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 7cf4db43f..2e27ee139 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -74,9 +74,13 @@ impl ChannelState { if let Some(handle) = handle { handle.abort(); + // Mark the DB row as cancelled since the abort prevents WorkerComplete from firing + self.process_run_logger + .log_worker_completed(worker_id, "Worker cancelled", false); Ok(()) } else if removed { - // Worker was in active_workers but had no handle (shouldn't happen, but handle gracefully) + self.process_run_logger + .log_worker_completed(worker_id, "Worker cancelled", false); Ok(()) } else { Err(format!("Worker {worker_id} not found")) diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 0ae31408a..0079f87e5 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -309,21 +309,14 @@ impl ProcessRunLogger { } /// Update a worker's status. Fire-and-forget. - pub fn log_worker_status(&self, worker_id: WorkerId, status: &str) { - let pool = self.pool.clone(); - let id = worker_id.to_string(); - let status = status.to_string(); - - tokio::spawn(async move { - if let Err(error) = sqlx::query("UPDATE worker_runs SET status = ? WHERE id = ?") - .bind(&status) - .bind(&id) - .execute(&pool) - .await - { - tracing::warn!(%error, worker_id = %id, "failed to persist worker status"); - } - }); + /// Worker status text updates are transient — they're available via the + /// in-memory StatusBlock for live workers and don't need to be persisted. + /// The `status` column is reserved for the state enum (running/done/failed). + pub fn log_worker_status(&self, _worker_id: WorkerId, _status: &str) { + // Intentionally a no-op. Status text was previously written to the + // `status` column, overwriting the state enum with free-text like + // "Searching for weather in Germany" which broke badge rendering + // and status filtering. } /// Record a worker completing with its result. Fire-and-forget. From 3b90448ecb4b6c946cc61a6014c4ce04ae1bd0a6 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Mon, 23 Feb 2026 21:38:42 -0800 Subject: [PATCH 08/10] feat(webchat): implement SSE message handling for assistant responses and improve message rendering logic --- interface/src/components/WebChatPanel.tsx | 55 +++++++++++++++++++---- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/interface/src/components/WebChatPanel.tsx b/interface/src/components/WebChatPanel.tsx index 747e0de5a..3f91e3d73 100644 --- a/interface/src/components/WebChatPanel.tsx +++ b/interface/src/components/WebChatPanel.tsx @@ -174,14 +174,51 @@ export function WebChatPanel({agentId}: WebChatPanelProps) { useWebChat(agentId); const {liveStates} = useLiveContext(); const [input, setInput] = useState(""); + const [sseMessages, setSseMessages] = useState<{id: string; role: "assistant"; content: string}[]>([]); const messagesEndRef = useRef(null); const sessionId = getPortalChatSessionId(agentId); const activeWorkers = Object.values(liveStates[sessionId]?.workers ?? {}); const hasActiveWorkers = activeWorkers.length > 0; + // Pick up assistant messages from the global SSE stream that arrived + // after the webchat request SSE closed (e.g. worker completion retriggers). + const timeline = liveStates[sessionId]?.timeline; + const seenIdsRef = useRef(new Set()); + useEffect(() => { + if (!timeline) return; + // Seed seen IDs from webchat messages so we don't duplicate + for (const m of messages) seenIdsRef.current.add(m.id); + + const newMessages: {id: string; role: "assistant"; content: string}[] = []; + for (const item of timeline) { + if ( + item.type === "message" && + item.role === "assistant" && + !seenIdsRef.current.has(item.id) + ) { + seenIdsRef.current.add(item.id); + newMessages.push({ + id: item.id, + role: "assistant", + content: item.content, + }); + } + } + if (newMessages.length > 0) { + setSseMessages((prev) => [...prev, ...newMessages]); + } + }, [timeline, messages]); + + // Clear SSE messages when a new webchat send starts (they'll be in history on next load) + useEffect(() => { + if (isStreaming) setSseMessages([]); + }, [isStreaming]); + + const allMessages = [...messages, ...sseMessages]; + useEffect(() => { messagesEndRef.current?.scrollIntoView({behavior: "smooth"}); - }, [messages.length, isStreaming, toolActivity.length, activeWorkers.length]); + }, [allMessages.length, isStreaming, toolActivity.length, activeWorkers.length]); const handleSubmit = () => { const trimmed = input.trim(); @@ -201,7 +238,7 @@ export function WebChatPanel({agentId}: WebChatPanelProps) {
)} - {messages.length === 0 && !isStreaming && ( + {allMessages.length === 0 && !isStreaming && (

Start a conversation with {agentId} @@ -209,7 +246,7 @@ export function WebChatPanel({agentId}: WebChatPanelProps) {

)} - {messages.map((message) => ( + {allMessages.map((message) => (
{message.role === "user" ? (
@@ -225,18 +262,18 @@ export function WebChatPanel({agentId}: WebChatPanelProps) {
))} - {/* Streaming state */} - {isStreaming && - messages[messages.length - 1]?.role !== "assistant" && ( + {/* Streaming state */} + {isStreaming && + allMessages[allMessages.length - 1]?.role !== "assistant" && (
{toolActivity.length === 0 && }
)} - {/* Inline tool activity during streaming assistant message */} - {isStreaming && - messages[messages.length - 1]?.role === "assistant" && + {/* Inline tool activity during streaming assistant message */} + {isStreaming && + allMessages[allMessages.length - 1]?.role === "assistant" && toolActivity.length > 0 && ( )} From 896acecf081386ef22b93cc7cd005a7d1b68e4e0 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Tue, 24 Feb 2026 02:35:39 -0800 Subject: [PATCH 09/10] Add ProcessRunLogger to branch tool server initialization in context dump tests - Introduced ProcessRunLogger to both dump_branch_context and dump_all_contexts functions. - Updated branch tool server creation to include the new logger for improved process tracking. --- tests/context_dump.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/context_dump.rs b/tests/context_dump.rs index 30780f261..a455ae62f 100644 --- a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -274,10 +274,13 @@ async fn dump_branch_context() { let conversation_logger = spacebot::conversation::ConversationLogger::new(deps.sqlite_pool.clone()); let channel_store = spacebot::conversation::ChannelStore::new(deps.sqlite_pool.clone()); + let run_logger = spacebot::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()); let branch_tool_server = spacebot::tools::create_branch_tool_server( deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + "test-agent", ); let tool_defs = branch_tool_server @@ -459,10 +462,13 @@ async fn dump_all_contexts() { let branch_prompt = prompt_engine .render_branch_prompt(&instance_dir, &workspace_dir) .expect("failed to render branch prompt"); + let run_logger = spacebot::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()); let branch_tool_server = spacebot::tools::create_branch_tool_server( deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + "test-agent", ); let branch_tool_defs = branch_tool_server.get_tool_defs(None).await.unwrap(); let branch_tools_text = format_tool_defs(&branch_tool_defs); From 714620953ba2cef55f81cfe9c2f8b409acfc6753 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Tue, 24 Feb 2026 07:36:16 -0800 Subject: [PATCH 10/10] Refactor WebChatPanel and LiveContextProvider for improved message handling and deduplication - Updated WebChatPanel to use useMemo for deduplicating messages from SSE and API. - Enhanced LiveContextProvider to include agentId in activeWorkers and optimized the retrieval of active workers based on agentId. - Adjusted AgentWorkers to utilize scopedActiveWorkers for better performance and clarity. - Modified worker history management in the Rust backend to improve context handling and transcript persistence. - Improved API endpoints for listing and retrieving worker details to ensure proper agent context is maintained. --- interface/src/components/WebChatPanel.tsx | 8 +++- interface/src/hooks/useLiveContext.tsx | 45 ++++++++++++++------ interface/src/routes/AgentWorkers.tsx | 27 +++++++----- interface/src/routes/ChannelDetail.tsx | 18 ++++---- src/agent/worker.rs | 51 ++++++++++++++++------- src/api/workers.rs | 12 ++---- src/conversation/history.rs | 20 ++++++--- src/hooks/spacebot.rs | 6 +-- src/tools/worker_inspect.rs | 15 +++++-- 9 files changed, 136 insertions(+), 66 deletions(-) diff --git a/interface/src/components/WebChatPanel.tsx b/interface/src/components/WebChatPanel.tsx index 3f91e3d73..1e322c9e9 100644 --- a/interface/src/components/WebChatPanel.tsx +++ b/interface/src/components/WebChatPanel.tsx @@ -1,4 +1,4 @@ -import {useEffect, useRef, useState} from "react"; +import {useEffect, useMemo, useRef, useState} from "react"; import { useWebChat, getPortalChatSessionId, @@ -214,7 +214,11 @@ export function WebChatPanel({agentId}: WebChatPanelProps) { if (isStreaming) setSseMessages([]); }, [isStreaming]); - const allMessages = [...messages, ...sseMessages]; + const allMessages = useMemo(() => { + const messageIds = new Set(messages.map((message) => message.id)); + const dedupedSse = sseMessages.filter((message) => !messageIds.has(message.id)); + return [...messages, ...dedupedSse]; + }, [messages, sseMessages]); useEffect(() => { messagesEndRef.current?.scrollIntoView({behavior: "smooth"}); diff --git a/interface/src/hooks/useLiveContext.tsx b/interface/src/hooks/useLiveContext.tsx index 46a2cf8a2..e64c7a765 100644 --- a/interface/src/hooks/useLiveContext.tsx +++ b/interface/src/hooks/useLiveContext.tsx @@ -13,7 +13,7 @@ interface LiveContextValue { /** Set of edge IDs ("from->to") with recent message activity */ activeLinks: Set; /** Flat map of all active workers across all channels, keyed by worker_id. */ - activeWorkers: Record; + activeWorkers: Record; /** Monotonically increasing counter, bumped on every worker lifecycle SSE event. */ workerEventVersion: number; /** Live transcript steps for running workers, keyed by worker_id. Built from SSE tool events. */ @@ -62,15 +62,20 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const [liveTranscripts, setLiveTranscripts] = useState>({}); // Derive flat active workers from channel live states + const pendingToolCallIdsRef = useRef>>({}); + const activeWorkers = useMemo(() => { - const map: Record = {}; + const channelAgentIds = new Map(channels.map((channel) => [channel.id, channel.agent_id])); + const map: Record = {}; for (const [channelId, state] of Object.entries(liveStates)) { + const channelAgentId = channelAgentIds.get(channelId); + if (!channelAgentId) continue; for (const [workerId, worker] of Object.entries(state.workers)) { - map[workerId] = { ...worker, channelId }; + map[workerId] = { ...worker, channelId, agentId: channelAgentId }; } } return map; - }, [liveStates]); + }, [liveStates, channels]); // Track recently active link edges const [activeLinks, setActiveLinks] = useState>(new Set()); @@ -119,6 +124,9 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { // and accumulate live transcript steps from SSE events. const wrappedWorkerStarted = useCallback((data: unknown) => { channelHandlers.worker_started(data); + const event = data as { worker_id: string }; + setLiveTranscripts((prev) => ({ ...prev, [event.worker_id]: [] })); + delete pendingToolCallIdsRef.current[event.worker_id]; bumpWorkerVersion(); }, [channelHandlers, bumpWorkerVersion]); @@ -141,13 +149,8 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const wrappedWorkerCompleted = useCallback((data: unknown) => { channelHandlers.worker_completed(data); - // Clear the live transcript on completion (persisted transcript takes over) const event = data as { worker_id: string }; - setLiveTranscripts((prev) => { - if (!prev[event.worker_id]) return prev; - const { [event.worker_id]: _, ...rest } = prev; - return rest; - }); + delete pendingToolCallIdsRef.current[event.worker_id]; bumpWorkerVersion(); }, [channelHandlers, bumpWorkerVersion]); @@ -155,13 +158,18 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { channelHandlers.tool_started(data); const event = data as ToolStartedEvent; if (event.process_type === "worker") { + const callId = crypto.randomUUID(); + const pendingByTool = pendingToolCallIdsRef.current[event.process_id] ?? {}; + const queue = pendingByTool[event.tool_name] ?? []; + pendingByTool[event.tool_name] = [...queue, callId]; + pendingToolCallIdsRef.current[event.process_id] = pendingByTool; setLiveTranscripts((prev) => { const steps = prev[event.process_id] ?? []; const step: TranscriptStep = { type: "action", content: [{ type: "tool_call", - id: `${event.tool_name}:${steps.length}`, + id: callId, name: event.tool_name, args: event.args || "", }], @@ -176,11 +184,24 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { channelHandlers.tool_completed(data); const event = data as ToolCompletedEvent; if (event.process_type === "worker") { + const pendingByTool = pendingToolCallIdsRef.current[event.process_id]; + const queue = pendingByTool?.[event.tool_name] ?? []; + const [callId, ...rest] = queue; + if (pendingByTool) { + if (rest.length > 0) { + pendingByTool[event.tool_name] = rest; + } else { + delete pendingByTool[event.tool_name]; + } + if (Object.keys(pendingByTool).length === 0) { + delete pendingToolCallIdsRef.current[event.process_id]; + } + } setLiveTranscripts((prev) => { const steps = prev[event.process_id] ?? []; const step: TranscriptStep = { type: "tool_result", - call_id: `${event.tool_name}:${steps.length}`, + call_id: callId ?? `${event.process_id}:${event.tool_name}:${steps.length}`, name: event.tool_name, text: event.result || "", }; diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx index 56f128500..f52718f90 100644 --- a/interface/src/routes/AgentWorkers.tsx +++ b/interface/src/routes/AgentWorkers.tsx @@ -99,6 +99,12 @@ export function AgentWorkers({agentId}: {agentId: string}) { const workers = listData?.workers ?? []; const total = listData?.total ?? 0; + const scopedActiveWorkers = useMemo(() => { + const entries = Object.entries(activeWorkers).filter( + ([, worker]) => worker.agentId === agentId, + ); + return Object.fromEntries(entries); + }, [activeWorkers, agentId]); // Merge live SSE state onto the API-returned list. // Workers that exist in SSE state but haven't hit the DB yet @@ -108,7 +114,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { // Overlay live state onto existing DB rows const merged = workers.map((worker) => { - const live = activeWorkers[worker.id]; + const live = scopedActiveWorkers[worker.id]; if (!live) return worker; return { ...worker, @@ -119,7 +125,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { }); // Synthesize entries for workers only known via SSE (not in DB yet) - const synthetic: WorkerRunInfo[] = Object.values(activeWorkers) + const synthetic: WorkerRunInfo[] = Object.values(scopedActiveWorkers) .filter((w) => !dbIds.has(w.id)) .map((live) => ({ id: live.id, @@ -136,7 +142,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { })); return [...synthetic, ...merged]; - }, [workers, activeWorkers]); + }, [workers, scopedActiveWorkers]); // Client-side task text search filter const filteredWorkers = useMemo(() => { @@ -149,7 +155,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { // Running workers that haven't hit the DB yet still get a full detail view // from SSE state + live transcript. const mergedDetail: WorkerDetailResponse | null = useMemo(() => { - const live = selectedWorkerId ? activeWorkers[selectedWorkerId] : null; + const live = selectedWorkerId ? scopedActiveWorkers[selectedWorkerId] : null; if (detailData) { // DB data exists — overlay live status if worker is still running @@ -172,7 +178,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { transcript: null, tool_calls: live.toolCalls, }; - }, [detailData, activeWorkers, selectedWorkerId]); + }, [detailData, scopedActiveWorkers, selectedWorkerId]); const selectWorker = useCallback( (workerId: string | null) => { @@ -230,7 +236,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { selectWorker(worker.id)} /> @@ -244,7 +250,7 @@ export function AgentWorkers({agentId}: {agentId: string}) { {selectedWorkerId && mergedDetail ? ( ) : ( @@ -280,9 +286,7 @@ function WorkerCard({ onClick: () => void; }) { const isRunning = worker.status === "running" || !!liveWorker; - const displayStatus = liveWorker?.status ?? worker.live_status; const toolCalls = liveWorker?.toolCalls ?? worker.tool_calls; - const currentTool = liveWorker?.currentTool; return (