diff --git a/AGENTS.md b/AGENTS.md index 982adfa..b9f052b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -758,6 +758,8 @@ See `apps/api/.env.example`: - `ACP_VIEWER_SEND_BUFFER` - VM Agent: per-viewer send channel buffer size (default: 256) - `ACP_PING_INTERVAL` - VM Agent: WebSocket ping interval for stale connection detection (default: 30s) - `ACP_PONG_TIMEOUT` - VM Agent: WebSocket pong deadline after ping (default: 10s) +- `ACP_PROMPT_TIMEOUT` - VM Agent: max ACP prompt runtime before timeout/force-stop fallback (default: 10m) +- `ACP_PROMPT_CANCEL_GRACE_PERIOD` - VM Agent: grace wait after cancel before force-stop fallback (default: 5s) - `MAX_NODE_EVENTS` - VM Agent: max node-level events retained in memory (default: 500) - `MAX_WORKSPACE_EVENTS` - VM Agent: max workspace-level events retained in memory (default: 500) - `MAX_VM_AGENT_ERROR_BODY_BYTES` - API: max VM agent error request body (default: 32768) diff --git a/CLAUDE.md b/CLAUDE.md index 982adfa..b9f052b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -758,6 +758,8 @@ See `apps/api/.env.example`: - `ACP_VIEWER_SEND_BUFFER` - VM Agent: per-viewer send channel buffer size (default: 256) - `ACP_PING_INTERVAL` - VM Agent: WebSocket ping interval for stale connection detection (default: 30s) - `ACP_PONG_TIMEOUT` - VM Agent: WebSocket pong deadline after ping (default: 10s) +- `ACP_PROMPT_TIMEOUT` - VM Agent: max ACP prompt runtime before timeout/force-stop fallback (default: 10m) +- `ACP_PROMPT_CANCEL_GRACE_PERIOD` - VM Agent: grace wait after cancel before force-stop fallback (default: 5s) - `MAX_NODE_EVENTS` - VM Agent: max node-level events retained in memory (default: 500) - `MAX_WORKSPACE_EVENTS` - VM Agent: max workspace-level events retained in memory (default: 500) - `MAX_VM_AGENT_ERROR_BODY_BYTES` - API: max VM agent error request body (default: 32768) diff --git a/apps/web/src/components/ChatSession.tsx b/apps/web/src/components/ChatSession.tsx index ff6b746..0d1b188 100644 --- a/apps/web/src/components/ChatSession.tsx +++ b/apps/web/src/components/ChatSession.tsx @@ -71,7 +71,7 @@ export const ChatSession = React.forwardRef useImperativeHandle(ref, () => ({ focusInput: () => agentPanelRef.current?.focusInput(), })); - const [resolvedWsUrl, setResolvedWsUrl] = useState(null); + const wsUrlCacheRef = useRef<{ url: string; resolvedAt: number } | null>(null); // Resolve transcription API URL once (stable across renders) const transcribeApiUrl = useMemo(() => getTranscribeApiUrl(), []); @@ -106,54 +106,41 @@ export const ChatSession = React.forwardRef [workspaceId, sessionId] ); - // Fetch token and build full WS URL useEffect(() => { - if (!wsHostInfo) { - setResolvedWsUrl(null); - return; + wsUrlCacheRef.current = null; + }, [wsHostInfo, workspaceId, sessionId, worktreePath]); + + const resolveWsUrl = useCallback(async (): Promise => { + if (!wsHostInfo) return null; + + const cached = wsUrlCacheRef.current; + if (cached && Date.now() - cached.resolvedAt < 15_000) { + return cached.url; } - let cancelled = false; + reportError({ + level: 'info', + message: 'Resolving ACP WebSocket URL with fresh terminal token', + source: 'acp-chat', + context: { workspaceId, sessionId }, + }); - const fetchToken = async () => { + try { + const { token } = await getTerminalToken(workspaceId); + const sessionQuery = `&sessionId=${encodeURIComponent(sessionId)}`; + const worktreeQuery = worktreePath ? 
`&worktree=${encodeURIComponent(worktreePath)}` : ''; + const url = `${wsHostInfo}/agent/ws?token=${encodeURIComponent(token)}${sessionQuery}${worktreeQuery}`; + wsUrlCacheRef.current = { url, resolvedAt: Date.now() }; + return url; + } catch (err) { reportError({ - level: 'info', - message: 'Fetching terminal token', + level: 'error', + message: `Terminal token fetch failed: ${err instanceof Error ? err.message : String(err)}`, source: 'acp-chat', context: { workspaceId, sessionId }, }); - - try { - const { token } = await getTerminalToken(workspaceId); - if (cancelled) return; - - reportError({ - level: 'info', - message: 'Terminal token fetched', - source: 'acp-chat', - context: { workspaceId, sessionId, tokenLength: token.length }, - }); - - const sessionQuery = `&sessionId=${encodeURIComponent(sessionId)}`; - const worktreeQuery = worktreePath ? `&worktree=${encodeURIComponent(worktreePath)}` : ''; - setResolvedWsUrl( - `${wsHostInfo}/agent/ws?token=${encodeURIComponent(token)}${sessionQuery}${worktreeQuery}` - ); - } catch (err) { - if (cancelled) return; - reportError({ - level: 'error', - message: `Terminal token fetch failed: ${err instanceof Error ? err.message : String(err)}`, - source: 'acp-chat', - context: { workspaceId, sessionId }, - }); - } - }; - - void fetchToken(); - return () => { - cancelled = true; - }; + throw err; + } }, [wsHostInfo, workspaceId, sessionId, worktreePath]); // Each chat session gets its own message store. @@ -163,7 +150,8 @@ export const ChatSession = React.forwardRef // Own ACP session hook — separate WebSocket per chat tab const acpSession = useAcpSession({ - wsUrl: resolvedWsUrl, + wsUrl: null, + resolveWsUrl, onAcpMessage: acpMessages.processMessage, onLifecycleEvent: handleLifecycleEvent, onPrepareForReplay: acpMessages.prepareForReplay, diff --git a/apps/web/src/pages/Workspace.tsx b/apps/web/src/pages/Workspace.tsx index df87bbb..3024e77 100644 --- a/apps/web/src/pages/Workspace.tsx +++ b/apps/web/src/pages/Workspace.tsx @@ -183,6 +183,7 @@ export function Workspace() { const [paletteFileIndex, setPaletteFileIndex] = useState([]); const [paletteFileIndexLoading, setPaletteFileIndexLoading] = useState(false); const paletteFileIndexLoaded = useRef(false); + const terminalWsUrlCacheRef = useRef<{ url: string; resolvedAt: number } | null>(null); const tabOrder = useTabOrder(id); @@ -254,12 +255,48 @@ export function Workspace() { return () => clearInterval(interval); }, [id, workspace?.status, loadWorkspaceState]); + const buildTerminalWsUrl = useCallback( + (token: string): string | null => { + if (!workspace?.url) return null; + try { + const url = new URL(workspace.url); + const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsPath = featureFlags.multiTerminal ? 
'/terminal/ws/multi' : '/terminal/ws'; + return `${wsProtocol}//${url.host}${wsPath}?token=${encodeURIComponent(token)}`; + } catch { + return null; + } + }, + [workspace?.url, featureFlags.multiTerminal] + ); + + useEffect(() => { + terminalWsUrlCacheRef.current = null; + }, [workspace?.url, id, featureFlags.multiTerminal]); + + const resolveTerminalWsUrl = useCallback(async (): Promise => { + if (!id) return null; + + const cached = terminalWsUrlCacheRef.current; + if (cached && Date.now() - cached.resolvedAt < 15_000) { + return cached.url; + } + + const { token } = await getTerminalToken(id); + const resolvedUrl = buildTerminalWsUrl(token); + if (!resolvedUrl) { + throw new Error('Invalid workspace URL'); + } + terminalWsUrlCacheRef.current = { url: resolvedUrl, resolvedAt: Date.now() }; + return resolvedUrl; + }, [id, buildTerminalWsUrl]); + // Derive WebSocket URL from the terminal token. // Only update wsUrl on the INITIAL token fetch or when the workspace URL // changes — NOT on proactive token refreshes. Changing wsUrl tears down // the WebSocket and triggers a full reconnect, which re-triggers replay - // and can cause jumbled messages (Bug 5). The refreshed token is still - // available via `terminalToken` for the next reconnection attempt. + // and can cause jumbled messages (Bug 5). Reconnect paths resolve a fresh + // URL/token via resolveTerminalWsUrl without resetting the live socket. const wsUrlSetRef = useRef(false); useEffect(() => { if (!workspace?.url || !terminalToken || !isRunning) { @@ -272,18 +309,18 @@ export function Workspace() { // cause a reconnect while the connection is still alive. if (wsUrlSetRef.current) return; - try { - const url = new URL(workspace.url); - const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:'; - const wsPath = featureFlags.multiTerminal ? 
'/terminal/ws/multi' : '/terminal/ws'; - setWsUrl(`${wsProtocol}//${url.host}${wsPath}?token=${encodeURIComponent(terminalToken)}`); - setTerminalError(null); - wsUrlSetRef.current = true; - } catch { + const nextUrl = buildTerminalWsUrl(terminalToken); + if (!nextUrl) { setWsUrl(null); setTerminalError('Invalid workspace URL'); + return; } - }, [workspace?.url, terminalToken, isRunning, featureFlags.multiTerminal]); + + setWsUrl(nextUrl); + terminalWsUrlCacheRef.current = { url: nextUrl, resolvedAt: Date.now() }; + setTerminalError(null); + wsUrlSetRef.current = true; + }, [workspace?.url, terminalToken, isRunning, buildTerminalWsUrl]); // Fetch workspace events directly from the VM Agent (not proxied through control plane) useEffect(() => { @@ -1174,6 +1211,7 @@ export function Workspace() { { + terminalWsUrlCacheRef.current = null; void refreshTerminalToken(); }} disabled={terminalLoading} diff --git a/apps/web/tests/unit/pages/workspace.test.tsx b/apps/web/tests/unit/pages/workspace.test.tsx index 5b50555..de9eea2 100644 --- a/apps/web/tests/unit/pages/workspace.test.tsx +++ b/apps/web/tests/unit/pages/workspace.test.tsx @@ -484,7 +484,7 @@ describe('Workspace page', () => { }); }); - it('includes sessionId in ACP websocket URL when a sessionId is selected', async () => { + it('provides session-aware ACP URL resolver when a sessionId is selected', async () => { mocks.listAgentSessions.mockResolvedValue([ { id: 'sess-1', @@ -503,12 +503,26 @@ describe('Workspace page', () => { }); await waitFor(() => { - const wsUrls = mocks.useAcpSession.mock.calls - .map(([options]) => options?.wsUrl) - .filter((value): value is string => typeof value === 'string'); - expect( - wsUrls.some((url) => url.includes('sessionId=sess-1') && !url.includes('takeover=')) - ).toBe(true); + const resolvers = mocks.useAcpSession.mock.calls + .map(([options]) => options?.resolveWsUrl) + .filter((value): value is (() => Promise) => typeof value === 'function'); + expect(resolvers.length).toBeGreaterThan(0); + }); + + const resolver = mocks.useAcpSession.mock.calls + .map(([options]) => options?.resolveWsUrl) + .find((value): value is () => Promise => typeof value === 'function'); + + expect(resolver).toBeDefined(); + const resolvedUrl = await resolver!(); + expect(resolvedUrl).toBeTruthy(); + expect(resolvedUrl!).toContain('sessionId=sess-1'); + expect(resolvedUrl!).not.toContain('takeover='); + expect(mocks.getTerminalToken).toHaveBeenCalledWith('ws-123'); + await waitFor(() => { + // ChatSession uses resolver mode, so wsUrl is intentionally null. 
+ const wsUrlValues = mocks.useAcpSession.mock.calls.map(([options]) => options?.wsUrl); + expect(wsUrlValues.some((value) => value === null)).toBe(true); }); }); diff --git a/packages/acp-client/src/hooks/useAcpSession.test.ts b/packages/acp-client/src/hooks/useAcpSession.test.ts index 8c3030c..fffbbb5 100644 --- a/packages/acp-client/src/hooks/useAcpSession.test.ts +++ b/packages/acp-client/src/hooks/useAcpSession.test.ts @@ -361,6 +361,41 @@ describe('useAcpSession prompt state restoration after replay', () => { }); }); +describe('useAcpSession replay prompt-done race handling', () => { + it('ends replay in ready when session_prompt_done arrives during replay', async () => { + const { result } = renderHook(() => + useAcpSession({ + wsUrl: 'ws://localhost/agent/ws', + }) + ); + + const ws = MockWebSocket.instances[0]!; + act(() => { + ws.emitOpen(); + ws.emitMessage({ + type: 'session_state', + status: 'prompting', + agentType: 'claude-code', + replayCount: 2, + }); + }); + + await waitFor(() => { + expect(result.current.state).toBe('replaying'); + }); + + act(() => { + ws.emitMessage({ type: 'session_prompt_done' }); + ws.emitMessage({ type: 'session_replay_complete' }); + }); + + await waitFor(() => { + expect(result.current.state).toBe('ready'); + expect(result.current.replaying).toBe(false); + }); + }); +}); + describe('useAcpSession agentType reset on reconnect', () => { it('clears agentType to null when WebSocket opens (Phase 1 hang fix)', async () => { const { result } = renderHook(() => useAcpSession({ @@ -506,3 +541,43 @@ describe('useAcpSession manual reconnect', () => { expect(MockWebSocket.instances.length).toBe(instanceCountBefore); }); }); + +describe('useAcpSession resolver-driven URL refresh', () => { + it('uses resolveWsUrl on initial connect and manual reconnect', async () => { + const resolveWsUrl = vi + .fn() + .mockResolvedValueOnce('ws://localhost/agent/ws?token=first') + .mockResolvedValueOnce('ws://localhost/agent/ws?token=second'); + + const { result } = renderHook(() => + useAcpSession({ + wsUrl: null, + resolveWsUrl, + }) + ); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(1); + }); + expect(MockWebSocket.instances[0]?.url).toContain('token=first'); + + act(() => { + MockWebSocket.instances[0]?.emitOpenAndIdle(); + }); + + await waitFor(() => { + expect(result.current.state).toBe('no_session'); + }); + + act(() => { + MockWebSocket.instances[0]?.close(); + result.current.reconnect(); + }); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(2); + }); + expect(MockWebSocket.instances[1]?.url).toContain('token=second'); + expect(resolveWsUrl).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/acp-client/src/hooks/useAcpSession.ts b/packages/acp-client/src/hooks/useAcpSession.ts index e853717..b2cd433 100644 --- a/packages/acp-client/src/hooks/useAcpSession.ts +++ b/packages/acp-client/src/hooks/useAcpSession.ts @@ -50,6 +50,8 @@ function isGatewayErrorMessage(data: unknown): data is GatewayErrorMessage { export interface UseAcpSessionOptions { /** WebSocket URL for the ACP gateway (e.g., wss://host/agent/ws?token=JWT) */ wsUrl: string | null; + /** Optional resolver to fetch/build a fresh WebSocket URL before connect/reconnect. 
*/ + resolveWsUrl?: () => Promise | string | null; /** Called when an ACP message is received from the agent */ onAcpMessage?: (message: AcpMessage) => void; /** Optional callback for lifecycle event logging */ @@ -109,6 +111,7 @@ function safeHost(url: string): string { export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { const { wsUrl, + resolveWsUrl, onAcpMessage, onLifecycleEvent, onPrepareForReplay, @@ -131,11 +134,20 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { const onPrepareForReplayRef = useRef(onPrepareForReplay); onPrepareForReplayRef.current = onPrepareForReplay; + const wsUrlRef = useRef(wsUrl); + wsUrlRef.current = wsUrl; + const resolveWsUrlRef = useRef(resolveWsUrl); + resolveWsUrlRef.current = resolveWsUrl; // Track the server-reported status so we can restore it after replay completes. // Without this, reconnecting during a prompt transitions replaying → ready, // even though the server is still in 'prompting' (deadlocking new prompts). const serverStatusRef = useRef(''); + // Track prompt completion observed during replay so replay_complete does not + // restore a stale prompting snapshot captured before replay started. + const replaySawPromptDoneRef = useRef(false); + // Track most recent URL used for connection attempts. + const connectUrlRef = useRef(wsUrl); // Reconnection state (refs to avoid re-triggering the effect) const reconnectTimerRef = useRef | null>(null); @@ -143,6 +155,7 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { const reconnectAttemptRef = useRef(0); const intentionalCloseRef = useRef(false); const wasConnectedRef = useRef(false); + const attemptReconnectRef = useRef<() => void>(() => {}); // Lifecycle logging helper const logLifecycle = useCallback(( @@ -233,9 +246,11 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { // This avoids the race where useEffect-based clear runs after replay // messages have already been appended (causing jumbled/duplicate text). onPrepareForReplayRef.current?.(); + replaySawPromptDoneRef.current = false; setState('replaying'); setReplaying(true); } else { + replaySawPromptDoneRef.current = false; setState(status === 'prompting' ? 'prompting' : 'ready'); setReplaying(false); } @@ -255,7 +270,9 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { // Handle session_replay_complete — all buffered messages have been delivered const handleSessionReplayComplete = useCallback(() => { const serverStatus = serverStatusRef.current; - logLifecycle('info', 'Session replay complete', { serverStatus }); + const sawPromptDone = replaySawPromptDoneRef.current; + replaySawPromptDoneRef.current = false; + logLifecycle('info', 'Session replay complete', { serverStatus, sawPromptDone }); setReplaying(false); // Restore the server-reported status instead of unconditionally going to // 'ready'. If the agent was 'prompting' when we reconnected, we must stay @@ -264,6 +281,7 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { // server's promptMu (the previous prompt is still blocking). setState((prev) => { if (prev !== 'replaying') return prev; + if (sawPromptDone) return 'ready'; if (serverStatus === 'prompting') return 'prompting'; return 'ready'; }); @@ -275,7 +293,12 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { if (prompting) { setState('prompting'); } else { - setState((prev) => (prev === 'prompting' ? 
'ready' : prev)); + // Mark prompt completion immediately so replay_complete in the same + // event loop tick can observe it before React flushes state updates. + replaySawPromptDoneRef.current = true; + setState((prev) => { + return prev === 'prompting' ? 'ready' : prev; + }); } }, [logLifecycle]); @@ -323,7 +346,7 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { // Only reconnect if we were previously connected if (wasConnectedRef.current) { - attemptReconnect(url); + attemptReconnectRef.current(); } else { logLifecycle('error', 'WebSocket connection failed (never connected)', { host }); setState('error'); @@ -352,8 +375,52 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { return transport; }, [handleAgentStatus, handleAcpMessage, handleSessionState, handleSessionReplayComplete, handleSessionPrompting, logLifecycle]); // eslint-disable-line react-hooks/exhaustive-deps + const resolveConnectUrl = useCallback((fallbackUrl?: string | null) => { + if (resolveWsUrlRef.current) { + return resolveWsUrlRef.current(); + } + return fallbackUrl ?? wsUrlRef.current; + }, []); + + const connectWithResolvedUrl = useCallback((fallbackUrl?: string | null): Promise => { + const handleResolved = (resolved: string | null): boolean => { + if (!resolved) { + setState('error'); + setError('WebSocket URL unavailable'); + return false; + } + connectUrlRef.current = resolved; + connect(resolved); + return true; + }; + + const handleError = (err: unknown): boolean => { + const message = err instanceof Error ? err.message : 'Failed to resolve WebSocket URL'; + logLifecycle('warn', 'Failed to resolve WebSocket URL', { error: message }); + setState('error'); + setError(message); + return false; + }; + + try { + const resolvedOrPromise = resolveConnectUrl(fallbackUrl); + if ( + resolvedOrPromise && + typeof resolvedOrPromise === 'object' && + 'then' in resolvedOrPromise + ) { + return (resolvedOrPromise as Promise) + .then((resolved) => handleResolved(resolved)) + .catch((err) => handleError(err)); + } + return Promise.resolve(handleResolved(resolvedOrPromise as string | null)); + } catch (err) { + return Promise.resolve(handleError(err)); + } + }, [connect, logLifecycle, resolveConnectUrl]); + // Attempt reconnection with exponential backoff - const attemptReconnect = useCallback((url: string) => { + const attemptReconnect = useCallback(() => { const now = Date.now(); // Start the reconnect timer on first attempt @@ -389,13 +456,18 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { reconnectTimerRef.current = setTimeout(() => { reconnectTimerRef.current = null; - connect(url); + void connectWithResolvedUrl(connectUrlRef.current).then((ok) => { + if (!ok) { + attemptReconnectRef.current(); + } + }); }, delay); - }, [reconnectDelayMs, reconnectTimeoutMs, reconnectMaxDelayMs, connect, logLifecycle]); + }, [reconnectDelayMs, reconnectTimeoutMs, reconnectMaxDelayMs, connectWithResolvedUrl, logLifecycle]); + attemptReconnectRef.current = attemptReconnect; // Main connection effect useEffect(() => { - if (!wsUrl) { + if (!wsUrl && !resolveWsUrlRef.current) { setState('disconnected'); return; } @@ -408,9 +480,10 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { setState('connecting'); setError(null); - logLifecycle('info', 'Initiating connection', { host: safeHost(wsUrl) }); - - const transport = connect(wsUrl); + logLifecycle('info', 'Initiating connection', { + host: wsUrl ? 
safeHost(wsUrl) : 'resolver', + }); + void connectWithResolvedUrl(wsUrl); return () => { logLifecycle('info', 'Connection cleanup (intentional close)'); @@ -419,14 +492,16 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { clearTimeout(reconnectTimerRef.current); reconnectTimerRef.current = null; } - transport.close(); + if (transportRef.current) { + transportRef.current.close(); + } transportRef.current = null; }; - }, [wsUrl, connect, logLifecycle]); + }, [wsUrl, connectWithResolvedUrl, logLifecycle]); // Reconnect immediately when tab becomes visible again (mobile background tab fix) useEffect(() => { - if (!wsUrl) return; + if (!wsUrl && !resolveWsUrlRef.current) return; if (typeof document === 'undefined') return; const handleVisibilityChange = () => { @@ -450,18 +525,18 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { intentionalCloseRef.current = false; setState('reconnecting'); - connect(wsUrl); + void connectWithResolvedUrl(connectUrlRef.current); }; document.addEventListener('visibilitychange', handleVisibilityChange); return () => { document.removeEventListener('visibilitychange', handleVisibilityChange); }; - }, [wsUrl, connect, logLifecycle]); + }, [wsUrl, connectWithResolvedUrl, logLifecycle]); // Manual reconnect (exposed to UI for "Reconnect" button) const reconnect = useCallback(() => { - if (!wsUrl) return; + if (!wsUrl && !resolveWsUrlRef.current) return; if (transportRef.current?.connected) return; logLifecycle('info', 'Manual reconnect triggered'); @@ -486,8 +561,8 @@ export function useAcpSession(options: UseAcpSessionOptions): AcpSessionHandle { wasConnectedRef.current = true; // We want to reconnect setError(null); setState('reconnecting'); - connect(wsUrl); - }, [wsUrl, connect, logLifecycle]); + void connectWithResolvedUrl(connectUrlRef.current); + }, [wsUrl, connectWithResolvedUrl, logLifecycle]); // Switch to a different agent const switchAgent = useCallback((newAgentType: string) => { diff --git a/packages/terminal/src/MultiTerminal.test.tsx b/packages/terminal/src/MultiTerminal.test.tsx index 31e770e..9a1a99e 100644 --- a/packages/terminal/src/MultiTerminal.test.tsx +++ b/packages/terminal/src/MultiTerminal.test.tsx @@ -1,5 +1,5 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { render, screen, fireEvent, waitFor } from '@testing-library/react'; +import { act, render, screen, fireEvent, waitFor } from '@testing-library/react'; import { MultiTerminal } from './MultiTerminal'; // Mock xterm.js @@ -62,6 +62,7 @@ class MockWebSocket { static CLOSED = 3; static CONNECTING = 0; static CLOSING = 2; + static instances: MockWebSocket[] = []; static sessionListResponse: Array<{ sessionId: string; name?: string; @@ -80,6 +81,7 @@ class MockWebSocket { constructor(url: string) { this.url = url; + MockWebSocket.instances.push(this); setTimeout(() => { if (this.onopen) { this.onopen(new Event('open')); @@ -153,9 +155,11 @@ describe('MultiTerminal', () => { vi.clearAllMocks(); sessionStorage.clear(); MockWebSocket.sessionListResponse = []; + MockWebSocket.instances = []; }); afterEach(() => { + vi.useRealTimers(); vi.unstubAllEnvs(); }); @@ -361,4 +365,31 @@ describe('MultiTerminal', () => { const connectingMsg = container.querySelector('.terminal-status-message'); expect(connectingMsg || container.querySelector('.terminal-empty-state')).toBeDefined(); }); + + it('uses resolveWsUrl for reconnect attempts', async () => { + const resolveWsUrl = vi + .fn() + 
.mockResolvedValueOnce('ws://localhost:8080/terminal/ws/multi?token=first') + .mockResolvedValueOnce('ws://localhost:8080/terminal/ws/multi?token=second'); + + render(); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(1); + }); + expect(MockWebSocket.instances[0]?.url).toContain('token=first'); + + act(() => { + MockWebSocket.instances[0]?.close(); + }); + + await waitFor( + () => { + expect(MockWebSocket.instances.length).toBe(2); + }, + { timeout: 7000 } + ); + expect(MockWebSocket.instances[1]?.url).toContain('token=second'); + expect(resolveWsUrl).toHaveBeenCalledTimes(2); + }); }); diff --git a/packages/terminal/src/MultiTerminal.tsx b/packages/terminal/src/MultiTerminal.tsx index 4d00b90..60e6da8 100644 --- a/packages/terminal/src/MultiTerminal.tsx +++ b/packages/terminal/src/MultiTerminal.tsx @@ -49,6 +49,7 @@ export const MultiTerminal = React.forwardRef { const { wsUrl, + resolveWsUrl, defaultWorkDir, onActivity, className = '', @@ -86,6 +87,10 @@ export const MultiTerminal = React.forwardRef(null); + const wsUrlRef = useRef(wsUrl); + wsUrlRef.current = wsUrl; + const resolveWsUrlRef = useRef(resolveWsUrl); + resolveWsUrlRef.current = resolveWsUrl; const terminalsRef = useRef>(new Map()); const [wsConnected, setWsConnected] = useState(false); // Guard against duplicate list_sessions requests during rapid reconnect cycles @@ -240,18 +245,51 @@ export const MultiTerminal = React.forwardRef { let disposed = false; let reconnectTimeout: ReturnType; let pingInterval: ReturnType; let currentWs: WebSocket | null = null; - function connect() { + const scheduleReconnect = () => { if (disposed) return; + clearTimeout(reconnectTimeout); + reconnectTimeout = setTimeout(() => { + void connect(); + }, RECONNECT_DELAY_MS); + }; + + const resolveConnectUrl = async (): Promise => { + if (resolveWsUrlRef.current) { + const resolved = await resolveWsUrlRef.current(); + if (resolved) { + return resolved; + } + } + return wsUrlRef.current; + }; + + const connect = async () => { + if (disposed) return; + + let connectUrl: string | null = null; + try { + connectUrl = await resolveConnectUrl(); + } catch { + setWsConnected(false); + scheduleReconnect(); + return; + } + + if (!connectUrl) { + setWsConnected(false); + scheduleReconnect(); + return; + } try { - const ws = new WebSocket(wsUrl); + const ws = new WebSocket(connectUrl); currentWs = ws; ws.onopen = () => { @@ -457,19 +495,15 @@ export const MultiTerminal = React.forwardRef { disposed = true; diff --git a/packages/terminal/src/Terminal.tsx b/packages/terminal/src/Terminal.tsx index e66805d..45e93aa 100644 --- a/packages/terminal/src/Terminal.tsx +++ b/packages/terminal/src/Terminal.tsx @@ -23,6 +23,7 @@ const PING_INTERVAL_MS = 30_000; */ export function Terminal({ wsUrl, + resolveWsUrl, shutdownDeadline, onActivity, className = '', @@ -35,6 +36,7 @@ export function Terminal({ const { socket, state, retryCount, retry } = useWebSocket({ url: wsUrl, + resolveUrl: resolveWsUrl, maxRetries: MAX_RETRIES, }); diff --git a/packages/terminal/src/types.ts b/packages/terminal/src/types.ts index 7a01de4..d20c5f6 100644 --- a/packages/terminal/src/types.ts +++ b/packages/terminal/src/types.ts @@ -9,6 +9,8 @@ export type ConnectionState = 'connecting' | 'connected' | 'reconnecting' | 'fai export interface TerminalProps { /** WebSocket URL for terminal connection */ wsUrl: string; + /** Optional resolver used for fresh URL/token retrieval before connect/reconnect. 
*/ + resolveWsUrl?: () => Promise | string | null; /** Optional shutdown deadline (ISO 8601 timestamp) */ shutdownDeadline?: string | null; /** Callback when user activity is detected */ @@ -44,7 +46,9 @@ export interface ConnectionOverlayProps { /** Options for useWebSocket hook */ export interface UseWebSocketOptions { /** WebSocket URL to connect to */ - url: string; + url: string | null; + /** Optional resolver used for fresh URL/token retrieval before connect/reconnect. */ + resolveUrl?: () => Promise | string | null; /** Maximum number of reconnection attempts (default: 5) */ maxRetries?: number; /** Base delay for exponential backoff in ms (default: 1000) */ diff --git a/packages/terminal/src/types/multi-terminal.ts b/packages/terminal/src/types/multi-terminal.ts index 503398e..e68f77b 100644 --- a/packages/terminal/src/types/multi-terminal.ts +++ b/packages/terminal/src/types/multi-terminal.ts @@ -266,6 +266,8 @@ export interface TabShortcutActions { export interface MultiTerminalProps { /** WebSocket URL for terminal connections */ wsUrl: string; + /** Optional resolver used for fresh URL/token retrieval before connect/reconnect. */ + resolveWsUrl?: () => Promise | string | null; /** Optional shutdown deadline */ shutdownDeadline?: string | null; diff --git a/packages/terminal/src/useWebSocket.test.ts b/packages/terminal/src/useWebSocket.test.ts new file mode 100644 index 0000000..fea0c55 --- /dev/null +++ b/packages/terminal/src/useWebSocket.test.ts @@ -0,0 +1,109 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { act, renderHook, waitFor } from '@testing-library/react'; +import { useWebSocket } from './useWebSocket'; + +class MockWebSocket { + static CONNECTING = 0; + static OPEN = 1; + static CLOSING = 2; + static CLOSED = 3; + static instances: MockWebSocket[] = []; + + readonly url: string; + readyState = MockWebSocket.CONNECTING; + onopen: ((event: Event) => void) | null = null; + onclose: ((event: CloseEvent) => void) | null = null; + onerror: ((event: Event) => void) | null = null; + + constructor(url: string) { + this.url = url; + MockWebSocket.instances.push(this); + } + + send = vi.fn(); + + close(code = 1000): void { + this.readyState = MockWebSocket.CLOSED; + this.onclose?.(new CloseEvent('close', { code })); + } + + emitOpen(): void { + this.readyState = MockWebSocket.OPEN; + this.onopen?.(new Event('open')); + } + + emitClose(code = 1006): void { + this.close(code); + } +} + +beforeEach(() => { + MockWebSocket.instances = []; + vi.stubGlobal('WebSocket', MockWebSocket as unknown as typeof WebSocket); +}); + +afterEach(() => { + vi.unstubAllGlobals(); +}); + +describe('useWebSocket URL resolution', () => { + it('uses resolver URL for initial connect and reconnect attempts', async () => { + const resolveUrl = vi + .fn() + .mockResolvedValueOnce('ws://localhost/terminal/ws?token=first') + .mockResolvedValueOnce('ws://localhost/terminal/ws?token=second'); + + const { result } = renderHook(() => + useWebSocket({ + url: 'ws://localhost/terminal/ws?token=stale', + resolveUrl, + maxRetries: 2, + baseDelay: 10, + maxDelay: 10, + }) + ); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(1); + }); + expect(MockWebSocket.instances[0]?.url).toContain('token=first'); + + act(() => { + MockWebSocket.instances[0]?.emitOpen(); + }); + + await waitFor(() => { + expect(result.current.state).toBe('connected'); + }); + + act(() => { + MockWebSocket.instances[0]?.emitClose(1006); + }); + + await waitFor(() => { + 
expect(result.current.state).toBe('reconnecting'); + }); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(2); + }); + expect(MockWebSocket.instances[1]?.url).toContain('token=second'); + expect(resolveUrl).toHaveBeenCalledTimes(2); + }); + + it('falls back to static URL when resolver returns null', async () => { + const resolveUrl = vi.fn().mockResolvedValue(null); + + renderHook(() => + useWebSocket({ + url: 'ws://localhost/terminal/ws?token=fallback', + resolveUrl, + }) + ); + + await waitFor(() => { + expect(MockWebSocket.instances.length).toBe(1); + }); + expect(MockWebSocket.instances[0]?.url).toContain('token=fallback'); + }); +}); diff --git a/packages/terminal/src/useWebSocket.ts b/packages/terminal/src/useWebSocket.ts index 3422237..1d78832 100644 --- a/packages/terminal/src/useWebSocket.ts +++ b/packages/terminal/src/useWebSocket.ts @@ -8,6 +8,7 @@ import type { ConnectionState, UseWebSocketOptions, UseWebSocketReturn } from '. export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { const { url, + resolveUrl, maxRetries = 5, baseDelay = 1000, maxDelay = 30000, @@ -22,6 +23,11 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { const reconnectTimeoutRef = useRef>(); const socketRef = useRef(null); const mountedRef = useRef(true); + const connectRef = useRef<() => Promise>(async () => {}); + const urlRef = useRef(url); + const resolveUrlRef = useRef(resolveUrl); + urlRef.current = url; + resolveUrlRef.current = resolveUrl; // Update state and notify callback const updateState = useCallback( @@ -41,19 +47,64 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { [baseDelay, maxDelay] ); + const resolveConnectUrl = useCallback(async (): Promise => { + if (resolveUrlRef.current) { + const resolved = await resolveUrlRef.current(); + if (resolved) { + return resolved; + } + } + return urlRef.current; + }, []); + + const scheduleReconnect = useCallback(() => { + if (!mountedRef.current) return; + + if (retriesRef.current < maxRetries) { + updateState('reconnecting'); + const delay = getDelay(retriesRef.current); + retriesRef.current++; + setRetryCount(retriesRef.current); + + reconnectTimeoutRef.current = setTimeout(() => { + if (mountedRef.current) { + void connectRef.current(); + } + }, delay); + } else { + updateState('failed'); + } + }, [maxRetries, getDelay, updateState]); + // Connect to WebSocket - const connect = useCallback(() => { + const connect = useCallback(async () => { if (!mountedRef.current) return; + updateState(retriesRef.current === 0 ? 'connecting' : 'reconnecting'); + // Clean up existing socket if (socketRef.current) { socketRef.current.close(1000); + socketRef.current = null; + setSocket(null); } - updateState(retriesRef.current === 0 ? 
'connecting' : 'reconnecting'); + let connectUrl: string | null = null; + try { + connectUrl = await resolveConnectUrl(); + } catch (error) { + console.error('WebSocket URL resolution error:', error); + scheduleReconnect(); + return; + } + + if (!connectUrl) { + scheduleReconnect(); + return; + } try { - const ws = new WebSocket(url); + const ws = new WebSocket(connectUrl); ws.onopen = () => { if (!mountedRef.current) { @@ -73,21 +124,7 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { return; } - // Attempt reconnection - if (retriesRef.current < maxRetries) { - updateState('reconnecting'); - const delay = getDelay(retriesRef.current); - retriesRef.current++; - setRetryCount(retriesRef.current); - - reconnectTimeoutRef.current = setTimeout(() => { - if (mountedRef.current) { - connect(); - } - }, delay); - } else { - updateState('failed'); - } + scheduleReconnect(); }; ws.onerror = () => { @@ -98,17 +135,18 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { setSocket(ws); } catch (error) { console.error('WebSocket connection error:', error); - updateState('failed'); + scheduleReconnect(); } - }, [url, maxRetries, getDelay, updateState]); + }, [resolveConnectUrl, scheduleReconnect, updateState]); + connectRef.current = connect; // Manual retry function const retry = useCallback(() => { retriesRef.current = 0; setRetryCount(0); clearTimeout(reconnectTimeoutRef.current); - connect(); - }, [connect]); + void connectRef.current(); + }, []); // Disconnect and cleanup const disconnect = useCallback(() => { @@ -123,7 +161,14 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { // Initial connection useEffect(() => { mountedRef.current = true; - connect(); + if (!url && !resolveUrlRef.current) { + updateState('failed'); + return () => { + mountedRef.current = false; + clearTimeout(reconnectTimeoutRef.current); + }; + } + void connectRef.current(); return () => { mountedRef.current = false; @@ -132,7 +177,7 @@ export function useWebSocket(options: UseWebSocketOptions): UseWebSocketReturn { socketRef.current.close(1000); } }; - }, [connect]); + }, [url, updateState]); return { socket, diff --git a/packages/vm-agent/internal/acp/gateway.go b/packages/vm-agent/internal/acp/gateway.go index 0bf9d8d..0fc5775 100644 --- a/packages/vm-agent/internal/acp/gateway.go +++ b/packages/vm-agent/internal/acp/gateway.go @@ -98,6 +98,10 @@ type GatewayConfig struct { PingInterval time.Duration // PongTimeout is the pong deadline after sending a ping. Zero uses DefaultPongTimeout. PongTimeout time.Duration + // PromptTimeout bounds how long a prompt can run before force-stop fallback. + PromptTimeout time.Duration + // PromptCancelGracePeriod waits after cancel before force-stopping unresponsive prompt. + PromptCancelGracePeriod time.Duration } // Gateway is a thin per-WebSocket relay between a browser and a SessionHost. diff --git a/packages/vm-agent/internal/acp/session_host.go b/packages/vm-agent/internal/acp/session_host.go index 0986a4e..df70888 100644 --- a/packages/vm-agent/internal/acp/session_host.go +++ b/packages/vm-agent/internal/acp/session_host.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log" @@ -31,6 +32,14 @@ const ( HostStopped SessionHostStatus = "stopped" // Explicitly stopped ) +const ( + // DefaultPromptTimeout bounds how long a single ACP Prompt call can run. 
+ DefaultPromptTimeout = 10 * time.Minute + // DefaultPromptCancelGracePeriod is how long we wait after cancel before + // force-stopping an unresponsive agent process. + DefaultPromptCancelGracePeriod = 5 * time.Second +) + // DefaultMessageBufferSize is the default maximum number of messages buffered // per session for late-join replay. Override via ACP_MESSAGE_BUFFER_SIZE. const DefaultMessageBufferSize = 5000 @@ -98,13 +107,19 @@ type SessionHost struct { messageBuf []BufferedMessage seqCounter uint64 - // Prompt serialization — only one prompt at a time - promptMu sync.Mutex + // Prompt lifecycle state. + // promptMu guards promptInFlight (serialization gate only). + promptMu sync.Mutex + promptInFlight bool + promptSeq uint64 // promptCancelMu guards promptCancel independently from promptMu so that // CancelPrompt() can read it without waiting for Prompt() to finish. promptCancelMu sync.Mutex // promptCancel cancels the in-flight Prompt() context. Protected by promptCancelMu. promptCancel context.CancelFunc + // activePromptID identifies the in-flight prompt associated with promptCancel. + // Protected by promptCancelMu. + activePromptID uint64 // Stderr collection stderrMu sync.Mutex @@ -195,13 +210,19 @@ func (h *SessionHost) AttachViewer(id string, conn *websocket.Conn) *Viewer { log.Printf("SessionHost[%s]: viewer %s attached (total=%d)", h.config.SessionID, id, h.ViewerCount()) // Send current session state - h.sendToViewer(viewer, h.marshalSessionState(currentStatus, currentAgentType, currentErr)) + h.sendToViewerPriority(viewer, h.marshalSessionState(currentStatus, currentAgentType, currentErr)) // Replay buffered messages h.replayToViewer(viewer) // Signal replay complete - h.sendToViewer(viewer, h.marshalControl(MsgSessionReplayDone, nil)) + h.sendToViewerPriority(viewer, h.marshalControl(MsgSessionReplayDone, nil)) + + // Send a post-replay authoritative state snapshot. + // This closes the race where prompt status changes during replay and the + // initial pre-replay snapshot becomes stale. + finalStatus, finalAgentType, finalErr := h.currentSessionState() + h.sendToViewerPriority(viewer, h.marshalSessionState(finalStatus, finalAgentType, finalErr)) return viewer } @@ -392,15 +413,23 @@ func (h *SessionHost) HandlePrompt(ctx context.Context, reqID json.RawMessage, p return } - // Serialize prompts — only one at a time - h.promptMu.Lock() + promptTimeout := h.promptTimeout() + promptCtx, promptCancel := context.WithTimeout(ctx, promptTimeout) + promptID, ok := h.beginPrompt(promptCancel) + if !ok { + promptCancel() + h.sendJSONRPCErrorToViewer(viewerID, reqID, -32603, "Prompt already in progress") + return + } + defer func() { + h.endPrompt(promptID) + promptCancel() // release context resources + }() - // Create a cancellable context for this prompt. CancelPrompt() can - // invoke promptCancel to abort the blocking Prompt() call. - promptCtx, promptCancel := context.WithCancel(ctx) - h.promptCancelMu.Lock() - h.promptCancel = promptCancel - h.promptCancelMu.Unlock() + // Watchdog: if Prompt() ignores deadline/cancel, force-stop the agent. 
+ promptDone := make(chan struct{}) + go h.watchPromptTimeout(promptID, promptCtx, promptDone, viewerID, reqID, promptTimeout) + defer close(promptDone) // Update status to prompting h.setStatus(HostPrompting, "") @@ -420,25 +449,27 @@ func (h *SessionHost) HandlePrompt(ctx context.Context, reqID json.RawMessage, p Prompt: blocks, }) - // Clean up cancel state and release the prompt lock - h.promptCancelMu.Lock() - h.promptCancel = nil - h.promptCancelMu.Unlock() - promptCancel() // release context resources - h.promptMu.Unlock() + // If the prompt was force-stopped while Prompt() was blocked, ignore late completion. + if !h.isPromptActive(promptID) { + return + } // Update status back to ready h.setStatus(HostReady, "") h.broadcastControl(MsgSessionPromptDone, nil) if err != nil { + errMsg := fmt.Sprintf("Prompt failed: %v", err) + if errors.Is(err, context.DeadlineExceeded) || errors.Is(promptCtx.Err(), context.DeadlineExceeded) { + errMsg = fmt.Sprintf("Prompt timed out after %s", promptTimeout) + } log.Printf("ACP Prompt failed: %v", err) h.reportLifecycle("warn", "ACP Prompt failed", map[string]interface{}{ - "error": err.Error(), + "error": errMsg, "duration": time.Since(promptStart).String(), }) // Broadcast error to all viewers so all tabs see it - errResp := h.marshalJSONRPCError(reqID, -32603, fmt.Sprintf("Prompt failed: %v", err)) + errResp := h.marshalJSONRPCError(reqID, -32603, errMsg) h.broadcastMessage(errResp) return } @@ -467,6 +498,7 @@ func (h *SessionHost) HandlePrompt(ctx context.Context, reqID json.RawMessage, p func (h *SessionHost) CancelPrompt() { h.promptCancelMu.Lock() cancelFn := h.promptCancel + promptID := h.activePromptID h.promptCancelMu.Unlock() if cancelFn == nil { @@ -477,6 +509,18 @@ func (h *SessionHost) CancelPrompt() { log.Printf("CancelPrompt: cancelling in-flight prompt") h.reportLifecycle("info", "Prompt cancel requested", nil) cancelFn() + + grace := h.promptCancelGracePeriod() + if grace <= 0 { + return + } + + go func(id uint64, wait time.Duration) { + timer := time.NewTimer(wait) + defer timer.Stop() + <-timer.C + h.triggerPromptForceStopIfStuck(id, fmt.Sprintf("Prompt cancel grace elapsed after %s", wait)) + }(promptID, grace) } // ForwardToAgent sends a raw message to the agent's stdin. @@ -919,8 +963,8 @@ func (h *SessionHost) persistAcpSessionID(agentType string) { // --- Internal: message broadcasting --- -// broadcastMessage appends a message to the buffer and sends it to all viewers. -func (h *SessionHost) broadcastMessage(data []byte) { +// appendMessage appends a message to the replay buffer. +func (h *SessionHost) appendMessage(data []byte) { // Append to buffer — sequence number assigned under lock to ensure // buffer ordering matches sequence ordering under concurrent writes. h.bufMu.Lock() @@ -936,11 +980,23 @@ func (h *SessionHost) broadcastMessage(data []byte) { h.messageBuf = h.messageBuf[excess:] } h.bufMu.Unlock() +} + +// broadcastMessage appends a message to the buffer and sends it to all viewers. 
+func (h *SessionHost) broadcastMessage(data []byte) { + h.broadcastMessageWithPriority(data, false) +} +func (h *SessionHost) broadcastMessageWithPriority(data []byte, priority bool) { + h.appendMessage(data) // Fan out to all viewers h.viewerMu.RLock() for _, viewer := range h.viewers { - h.sendToViewer(viewer, data) + if priority { + h.sendToViewerPriority(viewer, data) + } else { + h.sendToViewer(viewer, data) + } } h.viewerMu.RUnlock() } @@ -955,13 +1011,13 @@ func (h *SessionHost) broadcastAgentStatus(status AgentStatus, agentType, errMsg Error: errMsg, } data, _ := json.Marshal(msg) - h.broadcastMessage(data) + h.broadcastMessageWithPriority(data, true) } // broadcastControl broadcasts a control message to all viewers and buffers it. func (h *SessionHost) broadcastControl(msgType ControlMessageType, extra map[string]interface{}) { data := h.marshalControl(msgType, extra) - h.broadcastMessage(data) + h.broadcastMessageWithPriority(data, true) } // replayToViewer sends all buffered messages to a newly attached viewer. @@ -988,6 +1044,32 @@ func (h *SessionHost) sendToViewer(viewer *Viewer, data []byte) { } } +// sendToViewerPriority sends a high-priority message. +// If the channel is full, we evict one queued message and retry once so +// control/status updates are not silently dropped under replay backpressure. +func (h *SessionHost) sendToViewerPriority(viewer *Viewer, data []byte) { + select { + case viewer.sendCh <- data: + return + case <-viewer.done: + return + default: + } + + // Make room by dropping one queued item for this viewer. + select { + case <-viewer.sendCh: + default: + } + + select { + case viewer.sendCh <- data: + case <-viewer.done: + default: + log.Printf("SessionHost[%s]: viewer %s priority message dropped (buffer saturated)", h.config.SessionID, viewer.ID) + } +} + // viewerWritePump drains the viewer's send channel and writes to its WebSocket. 
func (h *SessionHost) viewerWritePump(viewer *Viewer) { defer viewer.conn.Close() @@ -1020,7 +1102,7 @@ func (h *SessionHost) sendJSONRPCErrorToViewer(viewerID string, reqID json.RawMe h.viewerMu.RUnlock() if ok { - h.sendToViewer(viewer, data) + h.sendToViewerPriority(viewer, data) } } @@ -1070,6 +1152,112 @@ func (h *SessionHost) marshalJSONRPCError(reqID json.RawMessage, code int, messa // --- Internal: helpers --- +func (h *SessionHost) currentSessionState() (SessionHostStatus, string, string) { + h.mu.RLock() + defer h.mu.RUnlock() + return h.status, h.agentType, h.statusErr +} + +func (h *SessionHost) promptTimeout() time.Duration { + if h.config.PromptTimeout > 0 { + return h.config.PromptTimeout + } + return DefaultPromptTimeout +} + +func (h *SessionHost) promptCancelGracePeriod() time.Duration { + if h.config.PromptCancelGracePeriod > 0 { + return h.config.PromptCancelGracePeriod + } + return DefaultPromptCancelGracePeriod +} + +func (h *SessionHost) beginPrompt(cancel context.CancelFunc) (uint64, bool) { + h.promptMu.Lock() + defer h.promptMu.Unlock() + if h.promptInFlight { + return 0, false + } + h.promptInFlight = true + promptID := atomic.AddUint64(&h.promptSeq, 1) + + h.promptCancelMu.Lock() + h.promptCancel = cancel + h.activePromptID = promptID + h.promptCancelMu.Unlock() + return promptID, true +} + +func (h *SessionHost) endPrompt(promptID uint64) { + h.promptMu.Lock() + h.promptInFlight = false + h.promptMu.Unlock() + + h.promptCancelMu.Lock() + if h.activePromptID == promptID { + h.activePromptID = 0 + h.promptCancel = nil + } + h.promptCancelMu.Unlock() +} + +func (h *SessionHost) isPromptActive(promptID uint64) bool { + h.promptCancelMu.Lock() + defer h.promptCancelMu.Unlock() + return h.activePromptID == promptID +} + +func (h *SessionHost) watchPromptTimeout( + promptID uint64, + promptCtx context.Context, + done <-chan struct{}, + viewerID string, + reqID json.RawMessage, + timeout time.Duration, +) { + select { + case <-done: + return + case <-promptCtx.Done(): + if !errors.Is(promptCtx.Err(), context.DeadlineExceeded) { + return + } + msg := fmt.Sprintf("Prompt timed out after %s", timeout) + h.sendJSONRPCErrorToViewer(viewerID, reqID, -32603, msg) + h.triggerPromptForceStopIfStuck(promptID, msg) + } +} + +func (h *SessionHost) triggerPromptForceStopIfStuck(promptID uint64, reason string) { + h.promptCancelMu.Lock() + if h.activePromptID != promptID { + h.promptCancelMu.Unlock() + return + } + h.activePromptID = 0 + h.promptCancel = nil + h.promptCancelMu.Unlock() + + h.promptMu.Lock() + h.promptInFlight = false + h.promptMu.Unlock() + + h.mu.Lock() + agentType := h.agentType + if h.status == HostPrompting { + h.status = HostError + h.statusErr = reason + } + h.stopCurrentAgentLocked() + h.mu.Unlock() + + h.reportLifecycle("error", "ACP prompt force-stopped", map[string]interface{}{ + "reason": reason, + }) + h.broadcastControl(MsgSessionPromptDone, nil) + h.broadcastAgentStatus(StatusError, agentType, reason) +} + func (h *SessionHost) setStatus(status SessionHostStatus, errMsg string) { h.mu.Lock() h.status = status diff --git a/packages/vm-agent/internal/acp/session_host_test.go b/packages/vm-agent/internal/acp/session_host_test.go index 1bd77cf..17c0267 100644 --- a/packages/vm-agent/internal/acp/session_host_test.go +++ b/packages/vm-agent/internal/acp/session_host_test.go @@ -97,7 +97,7 @@ func TestSessionHost_AttachDetachViewer(t *testing.T) { t.Fatalf("viewer count = %d, want 1", host.ViewerCount()) } - // The attach sends session_state + 
session_replay_complete + // The attach sends initial session_state + session_replay_complete + post-replay session_state // Read session_state _, msg1, err := clientConn.ReadMessage() if err != nil { @@ -127,6 +127,19 @@ func TestSessionHost_AttachDetachViewer(t *testing.T) { t.Fatalf("second message type = %v, want %s", replayDone["type"], MsgSessionReplayDone) } + // Read post-replay authoritative session_state + _, msg3, err := clientConn.ReadMessage() + if err != nil { + t.Fatalf("read post-replay session_state: %v", err) + } + var postStateMsg SessionStateMessage + if err := json.Unmarshal(msg3, &postStateMsg); err != nil { + t.Fatalf("unmarshal post-replay session_state: %v", err) + } + if postStateMsg.Type != MsgSessionState { + t.Fatalf("third message type = %s, want %s", postStateMsg.Type, MsgSessionState) + } + // Detach viewer host.DetachViewer("v1") if host.ViewerCount() != 0 { @@ -166,7 +179,7 @@ func TestSessionHost_BroadcastToMultipleViewers(t *testing.T) { // Drain the initial session_state + replay_complete messages drainAttachMessages := func(client *websocket.Conn) { - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { client.SetReadDeadline(time.Now().Add(2 * time.Second)) _, _, err := client.ReadMessage() if err != nil { @@ -256,7 +269,8 @@ func TestSessionHost_LateJoinReplay(t *testing.T) { host.broadcastMessage(msg) } - // Now attach a viewer — it should get session_state, 3 replayed messages, then replay_complete + // Now attach a viewer — it should get session_state, 3 replayed messages, + // replay_complete, then post-replay session_state. serverConn, clientConn := testWSPair(t) host.AttachViewer("late-v1", serverConn) @@ -296,6 +310,12 @@ func TestSessionHost_LateJoinReplay(t *testing.T) { if done["type"] != string(MsgSessionReplayDone) { t.Fatalf("expected session_replay_complete, got type=%v", done["type"]) } + + // 6. 
post-replay session_state + postState := readAndParse("post-replay session_state") + if postState["type"] != string(MsgSessionState) { + t.Fatalf("expected session_state, got type=%v", postState["type"]) + } } func TestSessionHost_StopDisconnectsViewers(t *testing.T) { @@ -307,7 +327,7 @@ func TestSessionHost_StopDisconnectsViewers(t *testing.T) { host.AttachViewer("v1", serverConn) // Drain attach messages - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { clientConn.SetReadDeadline(time.Now().Add(2 * time.Second)) _, _, _ = clientConn.ReadMessage() } @@ -556,7 +576,7 @@ func TestSessionHost_ViewerDisconnectDoesNotStopAgent(t *testing.T) { host.AttachViewer("v1", serverConn) // Drain attach messages - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { clientConn.SetReadDeadline(time.Now().Add(2 * time.Second)) _, _, _ = clientConn.ReadMessage() } @@ -671,3 +691,103 @@ func TestSessionHost_CancelPrompt_ClearedAfterPromptDone(t *testing.T) { host.CancelPrompt() // No panic, no side effects — just verifying safety } + +func TestSessionHost_SendToViewerPriority_EvictsQueuedMessage(t *testing.T) { + t.Parallel() + + host := newTestSessionHost(t) + defer host.Stop() + + viewer := &Viewer{ + ID: "v1", + sendCh: make(chan []byte, 1), + done: make(chan struct{}), + } + + viewer.sendCh <- []byte(`{"old":true}`) + host.sendToViewerPriority(viewer, []byte(`{"priority":true}`)) + + select { + case msg := <-viewer.sendCh: + if string(msg) != `{"priority":true}` { + t.Fatalf("priority message not delivered, got %s", string(msg)) + } + default: + t.Fatal("expected a priority message in viewer channel") + } +} + +func TestSessionHost_CancelPrompt_ForceStopsAfterGracePeriod(t *testing.T) { + t.Parallel() + + host := NewSessionHost(SessionHostConfig{ + GatewayConfig: GatewayConfig{ + SessionID: "test-session", + WorkspaceID: "test-workspace", + PromptCancelGracePeriod: 10 * time.Millisecond, + }, + MessageBufferSize: 100, + ViewerSendBuffer: 32, + }) + defer host.Stop() + + host.mu.Lock() + host.status = HostPrompting + host.agentType = "claude-code" + host.mu.Unlock() + + host.promptMu.Lock() + host.promptInFlight = true + host.promptMu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + host.promptCancelMu.Lock() + host.promptCancel = cancel + host.activePromptID = 42 + host.promptCancelMu.Unlock() + + host.CancelPrompt() + + select { + case <-ctx.Done(): + case <-time.After(500 * time.Millisecond): + t.Fatal("expected prompt context to be cancelled") + } + + deadline := time.Now().Add(1 * time.Second) + for host.Status() != HostError { + if time.Now().After(deadline) { + t.Fatal("expected host to transition to error after cancel grace elapsed") + } + time.Sleep(10 * time.Millisecond) + } + + host.mu.RLock() + statusErr := host.statusErr + host.mu.RUnlock() + if !strings.Contains(statusErr, "Prompt cancel grace elapsed") { + t.Fatalf("statusErr = %q, expected cancel grace reason", statusErr) + } + + host.promptCancelMu.Lock() + if host.activePromptID != 0 { + t.Fatalf("activePromptID = %d, want 0", host.activePromptID) + } + if host.promptCancel != nil { + t.Fatal("promptCancel should be cleared after force-stop") + } + host.promptCancelMu.Unlock() + + host.promptMu.Lock() + if host.promptInFlight { + t.Fatal("promptInFlight should be false after force-stop") + } + host.promptMu.Unlock() + + host.bufMu.RLock() + buffered := len(host.messageBuf) + host.bufMu.RUnlock() + if buffered < 2 { + t.Fatalf("expected prompt_done + error status messages, buffered=%d", buffered) + } +} diff 
--git a/packages/vm-agent/internal/config/config.go b/packages/vm-agent/internal/config/config.go index c80d0ea..79c9e50 100644 --- a/packages/vm-agent/internal/config/config.go +++ b/packages/vm-agent/internal/config/config.go @@ -96,6 +96,8 @@ type Config struct { ACPViewerSendBuffer int // Per-viewer send channel buffer size ACPPingInterval time.Duration // WebSocket ping interval (default: 30s) ACPPongTimeout time.Duration // WebSocket pong deadline after ping (default: 10s) + ACPPromptTimeout time.Duration // Max prompt runtime before force-stop fallback (default: 10m) + ACPPromptCancelGrace time.Duration // Wait after cancel before force-stop fallback (default: 5s) // Event log settings - configurable per constitution principle XI MaxNodeEvents int // Max node-level events retained in memory (default: 500) @@ -233,6 +235,8 @@ func Load() (*Config, error) { ACPViewerSendBuffer: getEnvInt("ACP_VIEWER_SEND_BUFFER", 256), ACPPingInterval: getEnvDuration("ACP_PING_INTERVAL", 30*time.Second), ACPPongTimeout: getEnvDuration("ACP_PONG_TIMEOUT", 10*time.Second), + ACPPromptTimeout: getEnvDuration("ACP_PROMPT_TIMEOUT", 10*time.Minute), + ACPPromptCancelGrace: getEnvDuration("ACP_PROMPT_CANCEL_GRACE_PERIOD", 5*time.Second), // Event log settings MaxNodeEvents: getEnvInt("MAX_NODE_EVENTS", 500), diff --git a/packages/vm-agent/internal/server/server.go b/packages/vm-agent/internal/server/server.go index be5cf88..1d2519d 100644 --- a/packages/vm-agent/internal/server/server.go +++ b/packages/vm-agent/internal/server/server.go @@ -160,20 +160,22 @@ func New(cfg *config.Config) (*Server, error) { // Build ACP gateway configuration acpGatewayConfig := acp.GatewayConfig{ - InitTimeoutMs: cfg.ACPInitTimeoutMs, - MaxRestartAttempts: cfg.ACPMaxRestartAttempts, - ControlPlaneURL: cfg.ControlPlaneURL, - WorkspaceID: defaultWorkspaceScope(cfg.WorkspaceID, cfg.NodeID), - CallbackToken: cfg.CallbackToken, - ContainerResolver: containerResolver, - ContainerUser: containerUser, - ContainerWorkDir: containerWorkDir, - OnActivity: idleDetector.RecordActivity, - FileExecTimeout: cfg.GitExecTimeout, - FileMaxSize: cfg.GitFileMaxSize, - ErrorReporter: errorReporter, - PingInterval: cfg.ACPPingInterval, - PongTimeout: cfg.ACPPongTimeout, + InitTimeoutMs: cfg.ACPInitTimeoutMs, + MaxRestartAttempts: cfg.ACPMaxRestartAttempts, + ControlPlaneURL: cfg.ControlPlaneURL, + WorkspaceID: defaultWorkspaceScope(cfg.WorkspaceID, cfg.NodeID), + CallbackToken: cfg.CallbackToken, + ContainerResolver: containerResolver, + ContainerUser: containerUser, + ContainerWorkDir: containerWorkDir, + OnActivity: idleDetector.RecordActivity, + FileExecTimeout: cfg.GitExecTimeout, + FileMaxSize: cfg.GitFileMaxSize, + ErrorReporter: errorReporter, + PingInterval: cfg.ACPPingInterval, + PongTimeout: cfg.ACPPongTimeout, + PromptTimeout: cfg.ACPPromptTimeout, + PromptCancelGracePeriod: cfg.ACPPromptCancelGrace, } // Open persistence store for cross-device session state. 
diff --git a/tasks/backlog/2026-02-17-acp-session-stuck-after-reconnect.md b/tasks/backlog/2026-02-17-acp-session-stuck-after-reconnect.md
index ecc09e1..ddd2c78 100644
--- a/tasks/backlog/2026-02-17-acp-session-stuck-after-reconnect.md
+++ b/tasks/backlog/2026-02-17-acp-session-stuck-after-reconnect.md
@@ -78,28 +78,27 @@ If `session_prompt_done` is among the dropped messages, the browser stays in `pr
 
 ### Fix R1: Add prompt timeout + force-kill escape hatch
 
-- [ ] Add configurable `ACP_PROMPT_TIMEOUT` env var (e.g., default 10 minutes)
-- [ ] Use `context.WithTimeout` instead of `context.WithCancel` in `HandlePrompt`
-- [ ] If `CancelPrompt` doesn't unblock within a grace period, force-kill the agent subprocess
-- [ ] Broadcast error to viewers on timeout so browser transitions to `error` state
+- [x] Add configurable `ACP_PROMPT_TIMEOUT` env var (default 10 minutes)
+- [x] Use `context.WithTimeout` instead of `context.WithCancel` in `HandlePrompt`
+- [x] If `CancelPrompt` doesn't unblock within a grace period, force-kill the agent subprocess
+- [x] Broadcast error/status to viewers on timeout/force-stop so browser exits `prompting`
 - [ ] Consider a `session/stop` control message to let users force-stop stuck sessions from UI
 
 ### Fix R2: Use `useTokenRefresh` in ChatSession
 
-- [ ] Refactor `ChatSession` to use the existing `useTokenRefresh` hook (same as terminal)
-- [ ] On reconnection (visibility change, reconnect button), fetch a fresh token before `connect()`
-- [ ] Pass the refreshed token URL into `useAcpSession` so reconnect attempts use valid tokens
+- [x] On reconnection (visibility change, reconnect button), fetch/build a fresh tokenized URL before `connect()`
+- [x] Pass a resolver into `useAcpSession` so reconnect attempts use valid tokens
+- [x] Apply the same fresh-token reconnect behavior to terminal surfaces (single + multi-terminal)
 
 ### Fix R3: Use server-authoritative post-replay status
 
-- [ ] After replay completes, have the server send a **fresh** `session_state` that reflects the current status at that exact moment (not the one captured before replay)
-- [ ] Or: on the browser side, track `session_prompt_done` control messages seen during replay and use them to override `serverStatusRef.current` before `handleSessionReplayComplete` runs
+- [x] After replay completes, have the server send a **fresh** `session_state` that reflects the current status at that exact moment (not the one captured before replay)
+- [x] On the browser side, track `session_prompt_done` control messages seen during replay and use them during `handleSessionReplayComplete`
 
 ### Fix R4: Prioritize control messages in send buffer
 
-- [ ] Use a separate high-priority channel for control messages (`session_prompting`, `session_prompt_done`, `session_state`)
-- [ ] Or: increase viewer send buffer size for replay (temporarily or permanently)
-- [ ] Or: filter control messages out of the replay buffer and send them as a separate post-replay summary
+- [x] Ensure high-priority control/status messages are delivered preferentially under buffer pressure
+- [x] Keep authoritative post-replay `session_state` as deterministic recovery even if intermediate control traffic is dropped
 
 ## Affected Files
 
@@ -119,3 +118,135 @@ If `session_prompt_done` is among the dropped messages, the browser stays in `pr
 - Unit test: token refresh on reconnection
 - Integration test: replay with `session_prompt_done` in buffer produces correct post-replay state
 - Manual test: leave workspace idle for >1 hour, return, verify reconnection works
+
+---
+
+## Detailed Research Addendum (2026-02-18)
+
+### Preflight Classification
+
+- `business-logic-change`
+- `cross-component-change`
+- `security-sensitive-change`
+- `public-surface-change` (new env var(s) and reconnect behavior)
+- `docs-sync-change`
+
+### Sources Reviewed (Code + Docs)
+
+- Constitution and preflight:
+  - `.specify/memory/constitution.md`
+  - `docs/guides/agent-preflight-behavior.md`
+- Architecture/security references:
+  - `docs/architecture/credential-security.md`
+  - `docs/architecture/secrets-taxonomy.md`
+  - `docs/adr/002-stateless-architecture.md` (historical/superseded)
+- Business/spec references:
+  - `specs/003-browser-terminal-saas/spec.md`
+  - `specs/003-browser-terminal-saas/data-model.md`
+  - `specs/004-mvp-hardening/spec.md`
+  - `specs/004-mvp-hardening/data-model.md`
+- Primary implementation files:
+  - `packages/vm-agent/internal/acp/session_host.go`
+  - `packages/vm-agent/internal/acp/gateway.go`
+  - `packages/vm-agent/internal/server/agent_ws.go`
+  - `packages/vm-agent/internal/config/config.go`
+  - `apps/web/src/components/ChatSession.tsx`
+  - `packages/acp-client/src/hooks/useAcpSession.ts`
+  - `apps/web/src/hooks/useTokenRefresh.ts`
+  - `apps/api/src/services/jwt.ts`
+  - `apps/api/src/routes/terminal.ts`
+
+### Verified Findings (Line-Level)
+
+#### R1 verified: Prompt can hang indefinitely and keep session in `prompting`
+
+- `HandlePrompt` takes `promptMu` and calls the blocking `acpConn.Prompt(...)` with `context.WithCancel(ctx)` (no timeout) at `packages/vm-agent/internal/acp/session_host.go:396-421`.
+- The status is moved to prompting before the call and is only reset after the call returns (`session_host.go:406-432`).
+- `CancelPrompt` only calls the stored cancel function (`session_host.go:467-480`); there is no deadline or forced subprocess termination path if `Prompt()` does not return.
+- The gateway invokes prompt handling via `go h.host.HandlePrompt(ctx, ...)` (`packages/vm-agent/internal/acp/gateway.go:251-253`), and the current caller uses `gateway.Run(context.Background())` (`packages/vm-agent/internal/server/agent_ws.go:155`). This means no natural request-scoped deadline exists.
+
+Conclusion: If the SDK/pipe path ignores cancellation, `promptMu` can remain effectively wedged and the UI remains stuck in `prompting` (a cancel-grace sketch follows the R2 finding below).
+
+#### R2 verified: Chat reconnect uses stale tokenized URL after token expiry
+
+- `ChatSession` fetches the token once in an effect and bakes a single URL into `resolvedWsUrl` (`apps/web/src/components/ChatSession.tsx:109-157`).
+- `useAcpSession` reconnect logic keeps reusing the same URL value supplied to `connect(url)`:
+  - close/backoff path: `attemptReconnect(url)` -> `connect(url)` (`packages/acp-client/src/hooks/useAcpSession.ts:324-393`)
+  - visibility reconnect: `connect(wsUrl)` (`useAcpSession.ts:427-454`)
+  - manual reconnect button: `connect(wsUrl)` (`useAcpSession.ts:462-490`)
+- The terminal page already has proactive token refresh (`apps/web/src/pages/Workspace.tsx:192-207`) via `useTokenRefresh`, but the chat path does not.
+- The default token expiry is 1 hour in the API (`apps/api/src/services/jwt.ts:24-27`), and token issuance still enforces workspace ownership (`apps/api/src/routes/terminal.ts:30-50`).
+
+Conclusion: Long-idle chat tabs can repeatedly reconnect with an expired token until timeout/error.
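+
+Returning to R1, the following is a minimal, non-normative sketch of the cancel-grace fallback described above. The `promptGuard` type and the `forceStop`/`reportError` callbacks are hypothetical names used only for illustration; the real implementation lives in `SessionHost.CancelPrompt`, which also clears `activePromptID`/`promptInFlight` and buffers the prompt-done and error status messages, as `TestSessionHost_CancelPrompt_ForceStopsAfterGracePeriod` asserts.
+
+```go
+// Sketch only: cancel the prompt context, then wait a bounded grace period for
+// the prompt to unwind on its own; if it does not, force-stop and report an
+// error so viewers leave the `prompting` state. Names here are hypothetical.
+package acp
+
+import (
+	"sync"
+	"time"
+)
+
+type promptGuard struct {
+	mu          sync.Mutex
+	cancel      func()        // cancels the in-flight prompt context
+	promptDone  chan struct{} // closed when the prompt handler returns
+	gracePeriod time.Duration // e.g. ACP_PROMPT_CANCEL_GRACE_PERIOD
+}
+
+func (g *promptGuard) CancelPrompt(forceStop func(), reportError func(reason string)) {
+	g.mu.Lock()
+	cancel, done := g.cancel, g.promptDone
+	g.mu.Unlock()
+
+	if cancel == nil {
+		return // nothing in flight
+	}
+	cancel()
+
+	go func() {
+		select {
+		case <-done:
+			// Prompt returned within the grace period; normal cleanup already ran.
+		case <-time.After(g.gracePeriod):
+			forceStop()
+			reportError("Prompt cancel grace elapsed")
+		}
+	}()
+}
+```
+
+The grace wait runs in a goroutine so the cancel call itself stays non-blocking for the WebSocket handler.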
+
+#### R3 verified: Replay completion can restore stale `prompting` status
+
+- On attach, the server snapshots state once, sends `session_state`, replays the buffer, then sends replay complete (`packages/vm-agent/internal/acp/session_host.go:169-205`).
+- The client stores that snapshot as the authoritative `serverStatusRef` (`packages/acp-client/src/hooks/useAcpSession.ts:208-213`).
+- Replay completion restores from `serverStatusRef` (`useAcpSession.ts:255-270`).
+- If `session_prompt_done` is replayed during the window between the state snapshot and replay completion, the client may set ready first, and the replay-complete path can then set it back to the stale prompting status.
+
+Conclusion: There is a real ordering/race risk without a post-replay authoritative state.
+
+#### R4 verified: Viewer channel overflow drops control messages silently
+
+- `sendToViewer` uses a non-blocking send and drops on a full buffer (`packages/vm-agent/internal/acp/session_host.go:981-988`).
+- Replay can enqueue many messages (`replayToViewer`, `session_host.go:967-977`) while the per-viewer buffer defaults to 256 (`config.go:233`).
+- Control and data messages share the same queue, so prompt-state control messages are not prioritized.
+
+Conclusion: Under heavy replay, `session_prompt_done` / `session_state` can be dropped for a viewer, leaving the UI in a persistently incorrect state (an illustrative priority-send sketch closes this addendum).
+
+### Additional Technical Observations
+
+1. Existing tests already cover some reconnect and replay state behavior in `packages/acp-client/src/hooks/useAcpSession.test.ts`, but they do not cover the specific stale-status overwrite case where `session_prompt_done` is replayed before `session_replay_complete`.
+2. `packages/vm-agent/internal/acp/session_host_test.go` has no test for control-message drop behavior under a full `ViewerSendBuffer`.
+3. `apps/web` has no direct unit test for `ChatSession` token-refresh behavior; current tests focus on workspace/terminal token flows.
+
+### Terminal Surface Impact (Not Chat-Only)
+
+The stuck-after-reconnect family of issues also has a terminal-side variant:
+
+- Workspace derives `wsUrl` once and intentionally avoids updating it on proactive token refresh (`apps/web/src/pages/Workspace.tsx:257-286`).
+- Terminal reconnect logic in the shared terminal components reuses the same `wsUrl`:
+  - multi-terminal reconnect loop: `new WebSocket(wsUrl)` with a fixed effect dependency (`packages/terminal/src/MultiTerminal.tsx:241-487`)
+  - single-terminal hook reconnect loop: reconnect uses the fixed `url` option (`packages/terminal/src/useWebSocket.ts:45-103`)
+- The retry UI calls `refreshTerminalToken()` but does not currently force `wsUrl` regeneration (`apps/web/src/pages/Workspace.tsx:1209-1217`).
+
+Implication:
+- If the VM-agent cookie path is unavailable or expired, reconnect can keep attempting a stale tokenized URL on terminal surfaces too (not only ACP chat).
+- Implementation scope should include the terminal reconnect/token-freshness path, not just the chat session logic.
+
+### Constitution + Security Alignment (Pre-Implementation)
+
+- Principle XI compliance requirements for this task:
+  - Any new timeouts/limits MUST be env-configurable in `packages/vm-agent/internal/config/config.go`.
+  - If adding prompt timeout behavior, defaults are acceptable only with an env override (e.g., `ACP_PROMPT_TIMEOUT`).
+  - If adding a cancel grace or control-send timeout, those should also be configurable.
+- Security posture:
+  - The token refresh path remains safe as long as it continues using `POST /api/terminal/token`, which already checks ownership before minting the JWT.
+  - No token should be persisted to logs or emitted in lifecycle events.
+
+### Cross-Component Impact Map
+
+- VM agent runtime behavior: `packages/vm-agent/internal/acp/session_host.go`
+- VM agent config/env surface: `packages/vm-agent/internal/config/config.go`
+- VM agent config wiring: `packages/vm-agent/internal/server/server.go`, `packages/vm-agent/internal/server/agent_ws.go`
+- Web chat token lifecycle: `apps/web/src/components/ChatSession.tsx`
+- Shared ACP reconnect semantics: `packages/acp-client/src/hooks/useAcpSession.ts`
+- Docs/env references (if env surface changes): `AGENTS.md` and `CLAUDE.md` must be updated together
+
+### Test Gap Checklist (To Close During Implementation)
+
+- [ ] `useAcpSession` unit test: replay contains `session_prompt_done` and final state resolves to `ready` (not stale prompting)
+- [ ] `useAcpSession` unit test: reconnect path can acquire and use refreshed WS URL/token
+- [ ] VM agent unit/integration test: prompt timeout transitions out of prompting and clears cancel state
+- [ ] VM agent unit test: control messages are not dropped under replay backpressure (or post-replay state self-heals deterministically)
+- [ ] Web unit test for `ChatSession` token refresh + reconnect behavior
+- [ ] Web/terminal unit test: terminal retry/reconnect path can recover using refreshed token/URL when previous token is stale
+
+### Recommended Implementation Order (Based on Risk/Impact)
+
+1. Token freshness path for chat reconnect (R2) — highest UX impact, lowest risk.
+2. Replay authoritative state correction (R3) — deterministic state convergence after replay.
+3. Prompt timeout/escape hatch (R1) — requires careful process lifecycle handling.
+4. Control-message backpressure handling (R4) — reliability hardening under high replay volume.
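+
+As a closing illustration of the R4 direction, here is a minimal sketch of evict-then-send delivery for high-priority control messages on a bounded viewer channel. The lowercase `viewer` struct and the `sendPriority` helper are simplified, hypothetical names; the VM agent's equivalent path is `sendToViewerPriority`, exercised by `TestSessionHost_SendToViewerPriority_EvictsQueuedMessage`.
+
+```go
+// Sketch only: deliver a high-priority control message on a bounded channel by
+// evicting the oldest queued message when the buffer is full, instead of
+// dropping the control message the way a plain non-blocking send would.
+package acp
+
+type viewer struct {
+	sendCh chan []byte
+	done   chan struct{}
+}
+
+func sendPriority(v *viewer, msg []byte) {
+	for {
+		select {
+		case <-v.done:
+			return // viewer is gone; nothing to deliver
+		case v.sendCh <- msg:
+			return // delivered without eviction
+		default:
+		}
+		// Buffer full: drop the oldest queued message, then retry the send.
+		select {
+		case <-v.sendCh:
+		default:
+		}
+	}
+}
+```
+
+Evicting the oldest queued message keeps delivery non-blocking while ensuring that `session_prompting` / `session_prompt_done` / `session_state` style messages are not silently dropped under replay backpressure.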