Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/acp-jsonrpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ export function isAcpJsonRpcMessage(value: unknown): value is AnyMessage {
return false;
}

export function isJsonRpcNotification(message: AnyMessage): boolean {
return (
Object.hasOwn(message, "method") &&
typeof (message as { method?: unknown }).method === "string" &&
!Object.hasOwn(message, "id")
);
}

export function isSessionUpdateNotification(message: AnyMessage): boolean {
return (
isJsonRpcNotification(message) &&
(message as { method?: unknown }).method === "session/update"
);
}

export function parsePromptStopReason(message: AnyMessage): string | undefined {
if (!Object.hasOwn(message, "id") || !Object.hasOwn(message, "result")) {
return undefined;
Expand Down
24 changes: 21 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
AuthPolicyError,
PermissionPromptUnavailableError,
} from "./errors.js";
import { isSessionUpdateNotification } from "./acp-jsonrpc.js";
import { FileSystemHandlers } from "./filesystem.js";
import { classifyPermissionDecision, resolvePermissionRequest } from "./permissions.js";
import { extractRuntimeSessionId } from "./runtime-session-id.js";
Expand Down Expand Up @@ -283,6 +284,7 @@ export class AcpClient {
private observedSessionUpdates = 0;
private processedSessionUpdates = 0;
private suppressSessionUpdates = false;
private suppressReplaySessionUpdateMessages = false;
private activePrompt?: {
sessionId: string;
promise: Promise<PromptResponse>;
Expand Down Expand Up @@ -492,11 +494,18 @@ export class AcpClient {
writable: WritableStream<AnyMessage>;
} {
const onAcpMessage = this.options.onAcpMessage;
const onAcpOutputMessage = this.options.onAcpOutputMessage;

if (!onAcpMessage) {
if (!onAcpMessage && !onAcpOutputMessage) {
return base;
}

const shouldSuppressInboundReplaySessionUpdate = (message: AnyMessage): boolean => {
return (
this.suppressReplaySessionUpdateMessages && isSessionUpdateNotification(message)
);
};

const readable = new ReadableStream<AnyMessage>({
async start(controller) {
const reader = base.readable.getReader();
Expand All @@ -509,7 +518,10 @@ export class AcpClient {
if (!value) {
continue;
}
onAcpMessage("inbound", value);
if (!shouldSuppressInboundReplaySessionUpdate(value)) {
onAcpOutputMessage?.("inbound", value);
onAcpMessage?.("inbound", value);
}
controller.enqueue(value);
}
} finally {
Expand All @@ -521,7 +533,8 @@ export class AcpClient {

const writable = new WritableStream<AnyMessage>({
async write(message) {
onAcpMessage("outbound", message);
onAcpOutputMessage?.("outbound", message);
onAcpMessage?.("outbound", message);
const writer = base.writable.getWriter();
try {
await writer.write(message);
Expand Down Expand Up @@ -561,8 +574,11 @@ export class AcpClient {
): Promise<SessionLoadResult> {
const connection = this.getConnection();
const previousSuppression = this.suppressSessionUpdates;
const previousReplaySuppression = this.suppressReplaySessionUpdateMessages;
this.suppressSessionUpdates =
previousSuppression || Boolean(options.suppressReplayUpdates);
this.suppressReplaySessionUpdateMessages =
previousReplaySuppression || Boolean(options.suppressReplayUpdates);

let response: LoadSessionResponse | undefined;

Expand All @@ -579,6 +595,7 @@ export class AcpClient {
);
} finally {
this.suppressSessionUpdates = previousSuppression;
this.suppressReplaySessionUpdateMessages = previousReplaySuppression;
}

return {
Expand Down Expand Up @@ -730,6 +747,7 @@ export class AcpClient {
this.observedSessionUpdates = 0;
this.processedSessionUpdates = 0;
this.suppressSessionUpdates = false;
this.suppressReplaySessionUpdateMessages = false;
this.activePrompt = undefined;
this.cancellingSessionIds.clear();
this.promptPermissionFailures.clear();
Expand Down
4 changes: 3 additions & 1 deletion src/session-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ async function runSessionPrompt(
onAcpMessage: (_direction, message) => {
sawAcpMessage = true;
pendingMessages.push(message);
},
onAcpOutputMessage: (_direction, message) => {
output.onAcpMessage(message);
},
onSessionUpdate: (notification) => {
Expand Down Expand Up @@ -587,7 +589,7 @@ export async function runOnce(options: RunOnceOptions): Promise<RunPromptResult>
authPolicy: options.authPolicy,
suppressSdkConsoleErrors: options.suppressSdkConsoleErrors,
verbose: options.verbose,
onAcpMessage: (_direction, message) => output.onAcpMessage(message),
onAcpOutputMessage: (_direction, message) => output.onAcpMessage(message),
});

try {
Expand Down
4 changes: 4 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ export type AcpClientOptions = {
suppressSdkConsoleErrors?: boolean;
verbose?: boolean;
onAcpMessage?: (direction: AcpMessageDirection, message: AcpJsonRpcMessage) => void;
onAcpOutputMessage?: (
direction: AcpMessageDirection,
message: AcpJsonRpcMessage,
) => void;
onSessionUpdate?: (notification: SessionNotification) => void;
onClientOperation?: (operation: ClientOperation) => void;
};
Expand Down
44 changes: 43 additions & 1 deletion test/events.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import assert from "node:assert/strict";
import test from "node:test";
import { isAcpJsonRpcMessage } from "../src/acp-jsonrpc.js";
import {
isAcpJsonRpcMessage,
isSessionUpdateNotification,
} from "../src/acp-jsonrpc.js";

test("isAcpJsonRpcMessage accepts JSON-RPC request", () => {
assert.equal(
Expand Down Expand Up @@ -111,3 +114,42 @@ test("isAcpJsonRpcMessage accepts request/notification/response fixtures after r
assert.equal(isAcpJsonRpcMessage(roundTripped), true);
}
});

test("isSessionUpdateNotification matches session/update notifications only", () => {
assert.equal(
isSessionUpdateNotification({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello" },
},
},
}),
true,
);

assert.equal(
isSessionUpdateNotification({
jsonrpc: "2.0",
id: "req-1",
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
false,
);

assert.equal(
isSessionUpdateNotification({
jsonrpc: "2.0",
id: "req-2",
result: { stopReason: "end_turn" },
}),
false,
);
});
124 changes: 111 additions & 13 deletions test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import os from "node:os";
import path from "node:path";
import test from "node:test";
import { fileURLToPath } from "node:url";
import {
extractAgentMessageChunkText,
extractJsonRpcId,
parseJsonRpcOutputLines,
} from "./jsonrpc-test-helpers.js";
import { queuePaths } from "./queue-test-helpers.js";

const CLI_PATH = fileURLToPath(new URL("../src/cli.js", import.meta.url));
Expand All @@ -19,19 +24,6 @@ type CliRunResult = {
stderr: string;
};

function parseJsonRpcOutputLines(stdout: string): Array<Record<string, unknown>> {
const lines = stdout
.split("\n")
.map((line) => line.trim())
.filter((line) => line.length > 0);
assert(lines.length > 0, "expected at least one JSON-RPC line");
return lines.map((line) => {
const parsed = JSON.parse(line) as Record<string, unknown>;
assert.equal(parsed.jsonrpc, "2.0");
return parsed;
});
}

test("integration: exec echo baseline", async () => {
await withTempHome(async (homeDir) => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));
Expand Down Expand Up @@ -470,6 +462,112 @@ test("integration: prompt recovers when loadSession fails on empty session", asy
});
});

test("integration: load replay session/update notifications are suppressed from output and event log", async () => {
await withTempHome(async (homeDir) => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));
const replayText = "replay-load-chunk";
const freshText = "fresh-after-load";
const replayLoadAgentCommand =
`${MOCK_AGENT_COMMAND} --supports-load-session ` +
`--replay-load-session-updates --load-replay-text ${replayText}`;
const replayAgentArgs = [
"--agent",
replayLoadAgentCommand,
"--approve-all",
"--cwd",
cwd,
];
let sessionId: string | undefined;

try {
const created = await runCli(
[...replayAgentArgs, "--format", "json", "sessions", "new"],
homeDir,
);
assert.equal(created.code, 0, created.stderr);
const createdPayload = JSON.parse(created.stdout.trim()) as {
acpxRecordId?: string;
};
sessionId = createdPayload.acpxRecordId;
assert.equal(typeof sessionId, "string");

const prompt = await runCli(
[...replayAgentArgs, "--format", "json", "prompt", `echo ${freshText}`],
homeDir,
);
assert.equal(prompt.code, 0, prompt.stderr);

const outputMessages = parseJsonRpcOutputLines(prompt.stdout);
const outputChunkTexts = outputMessages
.map((message) => extractAgentMessageChunkText(message))
.filter((text): text is string => typeof text === "string");

assert.equal(outputChunkTexts.includes(replayText), false, prompt.stdout);
assert.equal(outputChunkTexts.includes(freshText), true, prompt.stdout);

const loadRequest = outputMessages.find((message) => {
return (
message.method === "session/load" && extractJsonRpcId(message) !== undefined
);
});
assert(loadRequest, `expected session/load request in output:\n${prompt.stdout}`);

const loadRequestId = extractJsonRpcId(loadRequest);
assert.notEqual(loadRequestId, undefined);
assert.equal(
outputMessages.some(
(message) =>
extractJsonRpcId(message) === loadRequestId &&
Object.hasOwn(message, "result"),
),
true,
prompt.stdout,
);

const recordPath = path.join(
homeDir,
".acpx",
"sessions",
`${encodeURIComponent(sessionId as string)}.json`,
);
const storedRecord = JSON.parse(await fs.readFile(recordPath, "utf8")) as {
event_log?: {
active_path?: string;
};
};
const activeEventPath = storedRecord.event_log?.active_path;
assert.equal(typeof activeEventPath, "string");

const eventLog = await fs.readFile(activeEventPath as string, "utf8");
const eventMessages = eventLog
.split("\n")
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => JSON.parse(line) as Record<string, unknown>);
const eventChunkTexts = eventMessages
.map((message) => extractAgentMessageChunkText(message))
.filter((text): text is string => typeof text === "string");

assert.equal(eventChunkTexts.includes(replayText), false, eventLog);
assert.equal(eventChunkTexts.includes(freshText), true, eventLog);
} finally {
if (sessionId) {
const lock = await readQueueOwnerLock(homeDir, sessionId).catch(
() => undefined,
);
await runCli(
[...replayAgentArgs, "--format", "json", "sessions", "close"],
homeDir,
);
if (lock) {
await waitForPidExit(lock.pid, 5_000);
}
}
await fs.rm(cwd, { recursive: true, force: true });
}
});
});

test("integration: cancel yields cancelled stopReason without queue error", async () => {
await withTempHome(async (homeDir) => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-"));
Expand Down
53 changes: 53 additions & 0 deletions test/jsonrpc-test-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import assert from "node:assert/strict";

function asRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return undefined;
}
return value as Record<string, unknown>;
}

export function parseJsonRpcOutputLines(
stdout: string,
): Array<Record<string, unknown>> {
const lines = stdout
.split("\n")
.map((line) => line.trim())
.filter((line) => line.length > 0);
assert(lines.length > 0, "expected at least one JSON-RPC line");
return lines.map((line) => {
const parsed = JSON.parse(line) as Record<string, unknown>;
assert.equal(parsed.jsonrpc, "2.0");
return parsed;
});
}

export function extractAgentMessageChunkText(
message: Record<string, unknown>,
): string | undefined {
if (message.method !== "session/update") {
return undefined;
}

const params = asRecord(message.params);
const update = asRecord(params?.update);
const content = asRecord(update?.content);
if (
update?.sessionUpdate !== "agent_message_chunk" ||
content?.type !== "text" ||
typeof content.text !== "string"
) {
return undefined;
}

return content.text;
}

export function extractJsonRpcId(
message: Record<string, unknown>,
): string | number | undefined {
if (typeof message.id === "string" || typeof message.id === "number") {
return message.id;
}
return undefined;
}
Loading