From 2385e99c16472f04b9055a11693c0f1d59c01656 Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 12:13:45 +0100 Subject: [PATCH 1/3] fix: suppress load replay session updates from output --- src/client.ts | 23 ++++++- test/integration.test.ts | 143 +++++++++++++++++++++++++++++++++++++++ test/mock-agent.ts | 24 +++++++ 3 files changed, 189 insertions(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 2c20042..37e484a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -283,6 +283,7 @@ export class AcpClient { private observedSessionUpdates = 0; private processedSessionUpdates = 0; private suppressSessionUpdates = false; + private suppressReplaySessionUpdateMessages = false; private activePrompt?: { sessionId: string; promise: Promise; @@ -497,6 +498,19 @@ export class AcpClient { return base; } + const shouldSuppressInboundReplaySessionUpdate = (message: AnyMessage): boolean => { + if (!this.suppressReplaySessionUpdateMessages) { + return false; + } + if (Object.hasOwn(message, "id")) { + return false; + } + if (!Object.hasOwn(message, "method")) { + return false; + } + return (message as { method?: unknown }).method === "session/update"; + }; + const readable = new ReadableStream({ async start(controller) { const reader = base.readable.getReader(); @@ -509,7 +523,9 @@ export class AcpClient { if (!value) { continue; } - onAcpMessage("inbound", value); + if (!shouldSuppressInboundReplaySessionUpdate(value)) { + onAcpMessage("inbound", value); + } controller.enqueue(value); } } finally { @@ -561,8 +577,11 @@ export class AcpClient { ): Promise { const connection = this.getConnection(); const previousSuppression = this.suppressSessionUpdates; + const previousReplaySuppression = this.suppressReplaySessionUpdateMessages; this.suppressSessionUpdates = previousSuppression || Boolean(options.suppressReplayUpdates); + this.suppressReplaySessionUpdateMessages = + previousReplaySuppression || Boolean(options.suppressReplayUpdates); let response: LoadSessionResponse | undefined; @@ -579,6 +598,7 @@ export class AcpClient { ); } finally { this.suppressSessionUpdates = previousSuppression; + this.suppressReplaySessionUpdateMessages = previousReplaySuppression; } return { @@ -730,6 +750,7 @@ export class AcpClient { this.observedSessionUpdates = 0; this.processedSessionUpdates = 0; this.suppressSessionUpdates = false; + this.suppressReplaySessionUpdateMessages = false; this.activePrompt = undefined; this.cancellingSessionIds.clear(); this.promptPermissionFailures.clear(); diff --git a/test/integration.test.ts b/test/integration.test.ts index 1dc0234..ab8c9e4 100644 --- a/test/integration.test.ts +++ b/test/integration.test.ts @@ -19,6 +19,43 @@ type CliRunResult = { stderr: string; }; +function asRecord(value: unknown): Record | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + return value as Record; +} + +function extractAgentMessageChunkText( + message: Record, +): string | undefined { + if (message.method !== "session/update") { + return undefined; + } + + const params = asRecord(message.params); + const update = asRecord(params?.update); + const content = asRecord(update?.content); + if ( + update?.sessionUpdate !== "agent_message_chunk" || + content?.type !== "text" || + typeof content.text !== "string" + ) { + return undefined; + } + + return content.text; +} + +function extractJsonRpcId( + message: Record, +): string | number | undefined { + if (typeof message.id === "string" || typeof message.id === "number") { + return message.id; + } + return undefined; +} + function parseJsonRpcOutputLines(stdout: string): Array> { const lines = stdout .split("\n") @@ -470,6 +507,112 @@ test("integration: prompt recovers when loadSession fails on empty session", asy }); }); +test("integration: load replay session/update notifications are suppressed from output and event log", async () => { + await withTempHome(async (homeDir) => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); + const replayText = "replay-load-chunk"; + const freshText = "fresh-after-load"; + const replayLoadAgentCommand = + `${MOCK_AGENT_COMMAND} --supports-load-session ` + + `--replay-load-session-updates --load-replay-text ${replayText}`; + const replayAgentArgs = [ + "--agent", + replayLoadAgentCommand, + "--approve-all", + "--cwd", + cwd, + ]; + let sessionId: string | undefined; + + try { + const created = await runCli( + [...replayAgentArgs, "--format", "json", "sessions", "new"], + homeDir, + ); + assert.equal(created.code, 0, created.stderr); + const createdPayload = JSON.parse(created.stdout.trim()) as { + acpxRecordId?: string; + }; + sessionId = createdPayload.acpxRecordId; + assert.equal(typeof sessionId, "string"); + + const prompt = await runCli( + [...replayAgentArgs, "--format", "json", "prompt", `echo ${freshText}`], + homeDir, + ); + assert.equal(prompt.code, 0, prompt.stderr); + + const outputMessages = parseJsonRpcOutputLines(prompt.stdout); + const outputChunkTexts = outputMessages + .map((message) => extractAgentMessageChunkText(message)) + .filter((text): text is string => typeof text === "string"); + + assert.equal(outputChunkTexts.includes(replayText), false, prompt.stdout); + assert.equal(outputChunkTexts.includes(freshText), true, prompt.stdout); + + const loadRequest = outputMessages.find((message) => { + return ( + message.method === "session/load" && extractJsonRpcId(message) !== undefined + ); + }); + assert(loadRequest, `expected session/load request in output:\n${prompt.stdout}`); + + const loadRequestId = extractJsonRpcId(loadRequest); + assert.notEqual(loadRequestId, undefined); + assert.equal( + outputMessages.some( + (message) => + extractJsonRpcId(message) === loadRequestId && + Object.hasOwn(message, "result"), + ), + true, + prompt.stdout, + ); + + const recordPath = path.join( + homeDir, + ".acpx", + "sessions", + `${encodeURIComponent(sessionId as string)}.json`, + ); + const storedRecord = JSON.parse(await fs.readFile(recordPath, "utf8")) as { + event_log?: { + active_path?: string; + }; + }; + const activeEventPath = storedRecord.event_log?.active_path; + assert.equal(typeof activeEventPath, "string"); + + const eventLog = await fs.readFile(activeEventPath as string, "utf8"); + const eventMessages = eventLog + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0) + .map((line) => JSON.parse(line) as Record); + const eventChunkTexts = eventMessages + .map((message) => extractAgentMessageChunkText(message)) + .filter((text): text is string => typeof text === "string"); + + assert.equal(eventChunkTexts.includes(replayText), false, eventLog); + assert.equal(eventChunkTexts.includes(freshText), true, eventLog); + } finally { + if (sessionId) { + const lock = await readQueueOwnerLock(homeDir, sessionId).catch( + () => undefined, + ); + await runCli( + [...replayAgentArgs, "--format", "json", "sessions", "close"], + homeDir, + ); + if (lock) { + await waitForPidExit(lock.pid, 5_000); + } + } + await fs.rm(cwd, { recursive: true, force: true }); + } + }); +}); + test("integration: cancel yields cancelled stopReason without queue error", async () => { await withTempHome(async (homeDir) => { const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); diff --git a/test/mock-agent.ts b/test/mock-agent.ts index f8cd5c2..2684fb3 100644 --- a/test/mock-agent.ts +++ b/test/mock-agent.ts @@ -28,6 +28,8 @@ type MockAgentOptions = { loadSessionMeta?: Record; supportsLoadSession: boolean; loadSessionFailsOnEmpty: boolean; + replayLoadSessionUpdates: boolean; + loadReplayText: string; }; type SessionState = { @@ -253,6 +255,8 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions { const loadSessionMeta: Record = {}; let supportsLoadSession = false; let loadSessionFailsOnEmpty = false; + let replayLoadSessionUpdates = false; + let loadReplayText = "replayed load session update"; for (let index = 0; index < argv.length; index += 1) { const token = argv[index]; @@ -268,6 +272,20 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions { continue; } + if (token === "--replay-load-session-updates") { + supportsLoadSession = true; + replayLoadSessionUpdates = true; + continue; + } + + if (token === "--load-replay-text") { + supportsLoadSession = true; + replayLoadSessionUpdates = true; + loadReplayText = parseOptionValue(argv, index + 1, token); + index += 1; + continue; + } + const metaFlag = META_FLAG_SPECS[token]; if (metaFlag) { const value = parseOptionValue(argv, index + 1, token); @@ -293,6 +311,8 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions { Object.keys(loadSessionMeta).length > 0 ? { ...loadSessionMeta } : undefined, supportsLoadSession, loadSessionFailsOnEmpty, + replayLoadSessionUpdates, + loadReplayText, }; } @@ -357,6 +377,10 @@ class MockAgent implements Agent { this.sessions.set(params.sessionId, existing ?? { hasCompletedPrompt: false }); + if (this.options.replayLoadSessionUpdates) { + await this.sendAssistantMessage(params.sessionId, this.options.loadReplayText); + } + if (this.options.loadSessionMeta) { return { _meta: { ...this.options.loadSessionMeta }, From 58fd801705845f1b77be57719d1a76009d29d6ac Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 12:20:50 +0100 Subject: [PATCH 2/3] refactor: split ACP output and persistence callbacks --- src/acp-jsonrpc.ts | 15 ++++++++++++++ src/client.ts | 23 ++++++++++------------ src/session-runtime.ts | 4 +++- src/types.ts | 4 ++++ test/events.test.ts | 44 +++++++++++++++++++++++++++++++++++++++++- 5 files changed, 75 insertions(+), 15 deletions(-) diff --git a/src/acp-jsonrpc.ts b/src/acp-jsonrpc.ts index 4bb7db9..cb1791a 100644 --- a/src/acp-jsonrpc.ts +++ b/src/acp-jsonrpc.ts @@ -72,6 +72,21 @@ export function isAcpJsonRpcMessage(value: unknown): value is AnyMessage { return false; } +export function isJsonRpcNotification(message: AnyMessage): boolean { + return ( + Object.hasOwn(message, "method") && + typeof (message as { method?: unknown }).method === "string" && + !Object.hasOwn(message, "id") + ); +} + +export function isSessionUpdateNotification(message: AnyMessage): boolean { + return ( + isJsonRpcNotification(message) && + (message as { method?: unknown }).method === "session/update" + ); +} + export function parsePromptStopReason(message: AnyMessage): string | undefined { if (!Object.hasOwn(message, "id") || !Object.hasOwn(message, "result")) { return undefined; diff --git a/src/client.ts b/src/client.ts index 37e484a..7b988a4 100644 --- a/src/client.ts +++ b/src/client.ts @@ -34,6 +34,7 @@ import { AuthPolicyError, PermissionPromptUnavailableError, } from "./errors.js"; +import { isSessionUpdateNotification } from "./acp-jsonrpc.js"; import { FileSystemHandlers } from "./filesystem.js"; import { classifyPermissionDecision, resolvePermissionRequest } from "./permissions.js"; import { extractRuntimeSessionId } from "./runtime-session-id.js"; @@ -493,22 +494,16 @@ export class AcpClient { writable: WritableStream; } { const onAcpMessage = this.options.onAcpMessage; + const onAcpOutputMessage = this.options.onAcpOutputMessage; - if (!onAcpMessage) { + if (!onAcpMessage && !onAcpOutputMessage) { return base; } const shouldSuppressInboundReplaySessionUpdate = (message: AnyMessage): boolean => { - if (!this.suppressReplaySessionUpdateMessages) { - return false; - } - if (Object.hasOwn(message, "id")) { - return false; - } - if (!Object.hasOwn(message, "method")) { - return false; - } - return (message as { method?: unknown }).method === "session/update"; + return ( + this.suppressReplaySessionUpdateMessages && isSessionUpdateNotification(message) + ); }; const readable = new ReadableStream({ @@ -524,7 +519,8 @@ export class AcpClient { continue; } if (!shouldSuppressInboundReplaySessionUpdate(value)) { - onAcpMessage("inbound", value); + onAcpOutputMessage?.("inbound", value); + onAcpMessage?.("inbound", value); } controller.enqueue(value); } @@ -537,7 +533,8 @@ export class AcpClient { const writable = new WritableStream({ async write(message) { - onAcpMessage("outbound", message); + onAcpOutputMessage?.("outbound", message); + onAcpMessage?.("outbound", message); const writer = base.writable.getWriter(); try { await writer.write(message); diff --git a/src/session-runtime.ts b/src/session-runtime.ts index bb98878..32eae5a 100644 --- a/src/session-runtime.ts +++ b/src/session-runtime.ts @@ -399,6 +399,8 @@ async function runSessionPrompt( onAcpMessage: (_direction, message) => { sawAcpMessage = true; pendingMessages.push(message); + }, + onAcpOutputMessage: (_direction, message) => { output.onAcpMessage(message); }, onSessionUpdate: (notification) => { @@ -587,7 +589,7 @@ export async function runOnce(options: RunOnceOptions): Promise authPolicy: options.authPolicy, suppressSdkConsoleErrors: options.suppressSdkConsoleErrors, verbose: options.verbose, - onAcpMessage: (_direction, message) => output.onAcpMessage(message), + onAcpOutputMessage: (_direction, message) => output.onAcpMessage(message), }); try { diff --git a/src/types.ts b/src/types.ts index b0a6e5b..cd73cd5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -149,6 +149,10 @@ export type AcpClientOptions = { suppressSdkConsoleErrors?: boolean; verbose?: boolean; onAcpMessage?: (direction: AcpMessageDirection, message: AcpJsonRpcMessage) => void; + onAcpOutputMessage?: ( + direction: AcpMessageDirection, + message: AcpJsonRpcMessage, + ) => void; onSessionUpdate?: (notification: SessionNotification) => void; onClientOperation?: (operation: ClientOperation) => void; }; diff --git a/test/events.test.ts b/test/events.test.ts index 236b73c..9768f07 100644 --- a/test/events.test.ts +++ b/test/events.test.ts @@ -1,6 +1,9 @@ import assert from "node:assert/strict"; import test from "node:test"; -import { isAcpJsonRpcMessage } from "../src/acp-jsonrpc.js"; +import { + isAcpJsonRpcMessage, + isSessionUpdateNotification, +} from "../src/acp-jsonrpc.js"; test("isAcpJsonRpcMessage accepts JSON-RPC request", () => { assert.equal( @@ -111,3 +114,42 @@ test("isAcpJsonRpcMessage accepts request/notification/response fixtures after r assert.equal(isAcpJsonRpcMessage(roundTripped), true); } }); + +test("isSessionUpdateNotification matches session/update notifications only", () => { + assert.equal( + isSessionUpdateNotification({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello" }, + }, + }, + }), + true, + ); + + assert.equal( + isSessionUpdateNotification({ + jsonrpc: "2.0", + id: "req-1", + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hello" }], + }, + }), + false, + ); + + assert.equal( + isSessionUpdateNotification({ + jsonrpc: "2.0", + id: "req-2", + result: { stopReason: "end_turn" }, + }), + false, + ); +}); From cd2a89251c1990af62ce4c16925ec914219d017e Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 12:23:39 +0100 Subject: [PATCH 3/3] test: extract JSON-RPC integration helpers --- test/integration.test.ts | 55 ++++-------------------------------- test/jsonrpc-test-helpers.ts | 53 ++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 50 deletions(-) create mode 100644 test/jsonrpc-test-helpers.ts diff --git a/test/integration.test.ts b/test/integration.test.ts index ab8c9e4..ca6d9de 100644 --- a/test/integration.test.ts +++ b/test/integration.test.ts @@ -6,6 +6,11 @@ import os from "node:os"; import path from "node:path"; import test from "node:test"; import { fileURLToPath } from "node:url"; +import { + extractAgentMessageChunkText, + extractJsonRpcId, + parseJsonRpcOutputLines, +} from "./jsonrpc-test-helpers.js"; import { queuePaths } from "./queue-test-helpers.js"; const CLI_PATH = fileURLToPath(new URL("../src/cli.js", import.meta.url)); @@ -19,56 +24,6 @@ type CliRunResult = { stderr: string; }; -function asRecord(value: unknown): Record | undefined { - if (!value || typeof value !== "object" || Array.isArray(value)) { - return undefined; - } - return value as Record; -} - -function extractAgentMessageChunkText( - message: Record, -): string | undefined { - if (message.method !== "session/update") { - return undefined; - } - - const params = asRecord(message.params); - const update = asRecord(params?.update); - const content = asRecord(update?.content); - if ( - update?.sessionUpdate !== "agent_message_chunk" || - content?.type !== "text" || - typeof content.text !== "string" - ) { - return undefined; - } - - return content.text; -} - -function extractJsonRpcId( - message: Record, -): string | number | undefined { - if (typeof message.id === "string" || typeof message.id === "number") { - return message.id; - } - return undefined; -} - -function parseJsonRpcOutputLines(stdout: string): Array> { - const lines = stdout - .split("\n") - .map((line) => line.trim()) - .filter((line) => line.length > 0); - assert(lines.length > 0, "expected at least one JSON-RPC line"); - return lines.map((line) => { - const parsed = JSON.parse(line) as Record; - assert.equal(parsed.jsonrpc, "2.0"); - return parsed; - }); -} - test("integration: exec echo baseline", async () => { await withTempHome(async (homeDir) => { const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); diff --git a/test/jsonrpc-test-helpers.ts b/test/jsonrpc-test-helpers.ts new file mode 100644 index 0000000..57dcfc5 --- /dev/null +++ b/test/jsonrpc-test-helpers.ts @@ -0,0 +1,53 @@ +import assert from "node:assert/strict"; + +function asRecord(value: unknown): Record | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + return value as Record; +} + +export function parseJsonRpcOutputLines( + stdout: string, +): Array> { + const lines = stdout + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0); + assert(lines.length > 0, "expected at least one JSON-RPC line"); + return lines.map((line) => { + const parsed = JSON.parse(line) as Record; + assert.equal(parsed.jsonrpc, "2.0"); + return parsed; + }); +} + +export function extractAgentMessageChunkText( + message: Record, +): string | undefined { + if (message.method !== "session/update") { + return undefined; + } + + const params = asRecord(message.params); + const update = asRecord(params?.update); + const content = asRecord(update?.content); + if ( + update?.sessionUpdate !== "agent_message_chunk" || + content?.type !== "text" || + typeof content.text !== "string" + ) { + return undefined; + } + + return content.text; +} + +export function extractJsonRpcId( + message: Record, +): string | number | undefined { + if (typeof message.id === "string" || typeof message.id === "number") { + return message.id; + } + return undefined; +}