Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ See `apps/api/.env.example`:
- `ACP_VIEWER_SEND_BUFFER` - VM Agent: per-viewer send channel buffer size (default: 256)
- `ACP_PING_INTERVAL` - VM Agent: WebSocket ping interval for stale connection detection (default: 30s)
- `ACP_PONG_TIMEOUT` - VM Agent: WebSocket pong deadline after ping (default: 10s)
- `ACP_PROMPT_TIMEOUT` - VM Agent: max ACP prompt runtime before timeout/force-stop fallback (default: 10m)
- `ACP_PROMPT_CANCEL_GRACE_PERIOD` - VM Agent: grace wait after cancel before force-stop fallback (default: 5s)
- `MAX_NODE_EVENTS` - VM Agent: max node-level events retained in memory (default: 500)
- `MAX_WORKSPACE_EVENTS` - VM Agent: max workspace-level events retained in memory (default: 500)
- `MAX_VM_AGENT_ERROR_BODY_BYTES` - API: max VM agent error request body (default: 32768)
Expand Down
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ See `apps/api/.env.example`:
- `ACP_VIEWER_SEND_BUFFER` - VM Agent: per-viewer send channel buffer size (default: 256)
- `ACP_PING_INTERVAL` - VM Agent: WebSocket ping interval for stale connection detection (default: 30s)
- `ACP_PONG_TIMEOUT` - VM Agent: WebSocket pong deadline after ping (default: 10s)
- `ACP_PROMPT_TIMEOUT` - VM Agent: max ACP prompt runtime before timeout/force-stop fallback (default: 10m)
- `ACP_PROMPT_CANCEL_GRACE_PERIOD` - VM Agent: grace wait after cancel before force-stop fallback (default: 5s)
- `MAX_NODE_EVENTS` - VM Agent: max node-level events retained in memory (default: 500)
- `MAX_WORKSPACE_EVENTS` - VM Agent: max workspace-level events retained in memory (default: 500)
- `MAX_VM_AGENT_ERROR_BODY_BYTES` - API: max VM agent error request body (default: 32768)
Expand Down
72 changes: 30 additions & 42 deletions apps/web/src/components/ChatSession.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export const ChatSession = React.forwardRef<ChatSessionHandle, ChatSessionProps>
useImperativeHandle(ref, () => ({
focusInput: () => agentPanelRef.current?.focusInput(),
}));
const [resolvedWsUrl, setResolvedWsUrl] = useState<string | null>(null);
const wsUrlCacheRef = useRef<{ url: string; resolvedAt: number } | null>(null);

// Resolve transcription API URL once (stable across renders)
const transcribeApiUrl = useMemo(() => getTranscribeApiUrl(), []);
Expand Down Expand Up @@ -106,54 +106,41 @@ export const ChatSession = React.forwardRef<ChatSessionHandle, ChatSessionProps>
[workspaceId, sessionId]
);

// Fetch token and build full WS URL
useEffect(() => {
if (!wsHostInfo) {
setResolvedWsUrl(null);
return;
wsUrlCacheRef.current = null;
}, [wsHostInfo, workspaceId, sessionId, worktreePath]);

const resolveWsUrl = useCallback(async (): Promise<string | null> => {
if (!wsHostInfo) return null;

const cached = wsUrlCacheRef.current;
if (cached && Date.now() - cached.resolvedAt < 15_000) {
return cached.url;
}

let cancelled = false;
reportError({
level: 'info',
message: 'Resolving ACP WebSocket URL with fresh terminal token',
source: 'acp-chat',
context: { workspaceId, sessionId },
});

const fetchToken = async () => {
try {
const { token } = await getTerminalToken(workspaceId);
const sessionQuery = `&sessionId=${encodeURIComponent(sessionId)}`;
const worktreeQuery = worktreePath ? `&worktree=${encodeURIComponent(worktreePath)}` : '';
const url = `${wsHostInfo}/agent/ws?token=${encodeURIComponent(token)}${sessionQuery}${worktreeQuery}`;
wsUrlCacheRef.current = { url, resolvedAt: Date.now() };
return url;
} catch (err) {
reportError({
level: 'info',
message: 'Fetching terminal token',
level: 'error',
message: `Terminal token fetch failed: ${err instanceof Error ? err.message : String(err)}`,
source: 'acp-chat',
context: { workspaceId, sessionId },
});

try {
const { token } = await getTerminalToken(workspaceId);
if (cancelled) return;

reportError({
level: 'info',
message: 'Terminal token fetched',
source: 'acp-chat',
context: { workspaceId, sessionId, tokenLength: token.length },
});

const sessionQuery = `&sessionId=${encodeURIComponent(sessionId)}`;
const worktreeQuery = worktreePath ? `&worktree=${encodeURIComponent(worktreePath)}` : '';
setResolvedWsUrl(
`${wsHostInfo}/agent/ws?token=${encodeURIComponent(token)}${sessionQuery}${worktreeQuery}`
);
} catch (err) {
if (cancelled) return;
reportError({
level: 'error',
message: `Terminal token fetch failed: ${err instanceof Error ? err.message : String(err)}`,
source: 'acp-chat',
context: { workspaceId, sessionId },
});
}
};

void fetchToken();
return () => {
cancelled = true;
};
throw err;
}
}, [wsHostInfo, workspaceId, sessionId, worktreePath]);

// Each chat session gets its own message store.
Expand All @@ -163,7 +150,8 @@ export const ChatSession = React.forwardRef<ChatSessionHandle, ChatSessionProps>

// Own ACP session hook — separate WebSocket per chat tab
const acpSession = useAcpSession({
wsUrl: resolvedWsUrl,
wsUrl: null,
resolveWsUrl,
onAcpMessage: acpMessages.processMessage,
onLifecycleEvent: handleLifecycleEvent,
onPrepareForReplay: acpMessages.prepareForReplay,
Expand Down
62 changes: 51 additions & 11 deletions apps/web/src/pages/Workspace.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export function Workspace() {
const [paletteFileIndex, setPaletteFileIndex] = useState<string[]>([]);
const [paletteFileIndexLoading, setPaletteFileIndexLoading] = useState(false);
const paletteFileIndexLoaded = useRef(false);
const terminalWsUrlCacheRef = useRef<{ url: string; resolvedAt: number } | null>(null);

const tabOrder = useTabOrder<WorkspaceTab>(id);

Expand Down Expand Up @@ -254,12 +255,48 @@ export function Workspace() {
return () => clearInterval(interval);
}, [id, workspace?.status, loadWorkspaceState]);

const buildTerminalWsUrl = useCallback(
(token: string): string | null => {
if (!workspace?.url) return null;
try {
const url = new URL(workspace.url);
const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:';
const wsPath = featureFlags.multiTerminal ? '/terminal/ws/multi' : '/terminal/ws';
return `${wsProtocol}//${url.host}${wsPath}?token=${encodeURIComponent(token)}`;
} catch {
return null;
}
},
[workspace?.url, featureFlags.multiTerminal]
);

useEffect(() => {
terminalWsUrlCacheRef.current = null;
}, [workspace?.url, id, featureFlags.multiTerminal]);

const resolveTerminalWsUrl = useCallback(async (): Promise<string | null> => {
if (!id) return null;

const cached = terminalWsUrlCacheRef.current;
if (cached && Date.now() - cached.resolvedAt < 15_000) {
return cached.url;
}

const { token } = await getTerminalToken(id);
const resolvedUrl = buildTerminalWsUrl(token);
if (!resolvedUrl) {
throw new Error('Invalid workspace URL');
}
terminalWsUrlCacheRef.current = { url: resolvedUrl, resolvedAt: Date.now() };
return resolvedUrl;
}, [id, buildTerminalWsUrl]);

// Derive WebSocket URL from the terminal token.
// Only update wsUrl on the INITIAL token fetch or when the workspace URL
// changes — NOT on proactive token refreshes. Changing wsUrl tears down
// the WebSocket and triggers a full reconnect, which re-triggers replay
// and can cause jumbled messages (Bug 5). The refreshed token is still
// available via `terminalToken` for the next reconnection attempt.
// and can cause jumbled messages (Bug 5). Reconnect paths resolve a fresh
// URL/token via resolveTerminalWsUrl without resetting the live socket.
const wsUrlSetRef = useRef(false);
useEffect(() => {
if (!workspace?.url || !terminalToken || !isRunning) {
Expand All @@ -272,18 +309,18 @@ export function Workspace() {
// cause a reconnect while the connection is still alive.
if (wsUrlSetRef.current) return;

try {
const url = new URL(workspace.url);
const wsProtocol = url.protocol === 'https:' ? 'wss:' : 'ws:';
const wsPath = featureFlags.multiTerminal ? '/terminal/ws/multi' : '/terminal/ws';
setWsUrl(`${wsProtocol}//${url.host}${wsPath}?token=${encodeURIComponent(terminalToken)}`);
setTerminalError(null);
wsUrlSetRef.current = true;
} catch {
const nextUrl = buildTerminalWsUrl(terminalToken);
if (!nextUrl) {
setWsUrl(null);
setTerminalError('Invalid workspace URL');
return;
}
}, [workspace?.url, terminalToken, isRunning, featureFlags.multiTerminal]);

setWsUrl(nextUrl);
terminalWsUrlCacheRef.current = { url: nextUrl, resolvedAt: Date.now() };
setTerminalError(null);
wsUrlSetRef.current = true;
}, [workspace?.url, terminalToken, isRunning, buildTerminalWsUrl]);

// Fetch workspace events directly from the VM Agent (not proxied through control plane)
useEffect(() => {
Expand Down Expand Up @@ -1174,6 +1211,7 @@ export function Workspace() {
<MultiTerminal
ref={multiTerminalRef}
wsUrl={wsUrl}
resolveWsUrl={resolveTerminalWsUrl}
defaultWorkDir={activeWorktree ?? undefined}
shutdownDeadline={workspace?.shutdownDeadline}
onActivity={handleTerminalActivity}
Expand All @@ -1188,6 +1226,7 @@ export function Workspace() {
) : (
<Terminal
wsUrl={wsUrl}
resolveWsUrl={resolveTerminalWsUrl}
shutdownDeadline={workspace?.shutdownDeadline}
onActivity={handleTerminalActivity}
className="h-full"
Expand All @@ -1210,6 +1249,7 @@ export function Workspace() {
variant="secondary"
size="sm"
onClick={() => {
terminalWsUrlCacheRef.current = null;
void refreshTerminalToken();
}}
disabled={terminalLoading}
Expand Down
28 changes: 21 additions & 7 deletions apps/web/tests/unit/pages/workspace.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ describe('Workspace page', () => {
});
});

it('includes sessionId in ACP websocket URL when a sessionId is selected', async () => {
it('provides session-aware ACP URL resolver when a sessionId is selected', async () => {
mocks.listAgentSessions.mockResolvedValue([
{
id: 'sess-1',
Expand All @@ -503,12 +503,26 @@ describe('Workspace page', () => {
});

await waitFor(() => {
const wsUrls = mocks.useAcpSession.mock.calls
.map(([options]) => options?.wsUrl)
.filter((value): value is string => typeof value === 'string');
expect(
wsUrls.some((url) => url.includes('sessionId=sess-1') && !url.includes('takeover='))
).toBe(true);
const resolvers = mocks.useAcpSession.mock.calls
.map(([options]) => options?.resolveWsUrl)
.filter((value): value is (() => Promise<string | null>) => typeof value === 'function');
expect(resolvers.length).toBeGreaterThan(0);
});

const resolver = mocks.useAcpSession.mock.calls
.map(([options]) => options?.resolveWsUrl)
.find((value): value is () => Promise<string | null> => typeof value === 'function');

expect(resolver).toBeDefined();
const resolvedUrl = await resolver!();
expect(resolvedUrl).toBeTruthy();
expect(resolvedUrl!).toContain('sessionId=sess-1');
expect(resolvedUrl!).not.toContain('takeover=');
expect(mocks.getTerminalToken).toHaveBeenCalledWith('ws-123');
await waitFor(() => {
// ChatSession uses resolver mode, so wsUrl is intentionally null.
const wsUrlValues = mocks.useAcpSession.mock.calls.map(([options]) => options?.wsUrl);
expect(wsUrlValues.some((value) => value === null)).toBe(true);
});
});

Expand Down
75 changes: 75 additions & 0 deletions packages/acp-client/src/hooks/useAcpSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,41 @@ describe('useAcpSession prompt state restoration after replay', () => {
});
});

describe('useAcpSession replay prompt-done race handling', () => {
it('ends replay in ready when session_prompt_done arrives during replay', async () => {
const { result } = renderHook(() =>
useAcpSession({
wsUrl: 'ws://localhost/agent/ws',
})
);

const ws = MockWebSocket.instances[0]!;
act(() => {
ws.emitOpen();
ws.emitMessage({
type: 'session_state',
status: 'prompting',
agentType: 'claude-code',
replayCount: 2,
});
});

await waitFor(() => {
expect(result.current.state).toBe('replaying');
});

act(() => {
ws.emitMessage({ type: 'session_prompt_done' });
ws.emitMessage({ type: 'session_replay_complete' });
});

await waitFor(() => {
expect(result.current.state).toBe('ready');
expect(result.current.replaying).toBe(false);
});
});
});

describe('useAcpSession agentType reset on reconnect', () => {
it('clears agentType to null when WebSocket opens (Phase 1 hang fix)', async () => {
const { result } = renderHook(() => useAcpSession({
Expand Down Expand Up @@ -506,3 +541,43 @@ describe('useAcpSession manual reconnect', () => {
expect(MockWebSocket.instances.length).toBe(instanceCountBefore);
});
});

describe('useAcpSession resolver-driven URL refresh', () => {
it('uses resolveWsUrl on initial connect and manual reconnect', async () => {
const resolveWsUrl = vi
.fn()
.mockResolvedValueOnce('ws://localhost/agent/ws?token=first')
.mockResolvedValueOnce('ws://localhost/agent/ws?token=second');

const { result } = renderHook(() =>
useAcpSession({
wsUrl: null,
resolveWsUrl,
})
);

await waitFor(() => {
expect(MockWebSocket.instances.length).toBe(1);
});
expect(MockWebSocket.instances[0]?.url).toContain('token=first');

act(() => {
MockWebSocket.instances[0]?.emitOpenAndIdle();
});

await waitFor(() => {
expect(result.current.state).toBe('no_session');
});

act(() => {
MockWebSocket.instances[0]?.close();
result.current.reconnect();
});

await waitFor(() => {
expect(MockWebSocket.instances.length).toBe(2);
});
expect(MockWebSocket.instances[1]?.url).toContain('token=second');
expect(resolveWsUrl).toHaveBeenCalledTimes(2);
});
});
Loading
Loading