Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import {
type SessionAgentContent,
type SessionUserContent,
} from "./types.js";
import { getAcpxVersion } from "./version.js";
import { runQueueOwnerFromEnv } from "./queue-owner-env.js";

class NoSessionError extends Error {
Expand Down Expand Up @@ -1433,6 +1434,7 @@ export async function main(argv: string[] = process.argv): Promise<void> {
program
.name("acpx")
.description("Headless CLI client for the Agent Client Protocol")
.version(getAcpxVersion())
.enablePositionalOptions()
.showHelpAfterError();

Expand Down
101 changes: 90 additions & 11 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type CommandParts = {
const REPLAY_IDLE_MS = 80;
const REPLAY_DRAIN_TIMEOUT_MS = 5_000;
const DRAIN_POLL_INTERVAL_MS = 20;
const AGENT_CLOSE_AFTER_STDIN_END_MS = 100;
const AGENT_CLOSE_TERM_GRACE_MS = 1_500;
const AGENT_CLOSE_KILL_GRACE_MS = 1_000;

type LoadSessionOptions = {
suppressReplayUpdates?: boolean;
Expand Down Expand Up @@ -134,6 +137,47 @@ function waitForSpawn(child: ChildProcess): Promise<void> {
});
}

function isChildProcessRunning(child: ChildProcess): boolean {
return child.exitCode == null && child.signalCode == null;
}

function waitForChildExit(
child: ChildProcessByStdio<Writable, Readable, Readable>,
timeoutMs: number,
): Promise<boolean> {
if (!isChildProcessRunning(child)) {
return Promise.resolve(true);
}

return new Promise<boolean>((resolve) => {
let settled = false;
const timer = setTimeout(
() => {
finish(false);
},
Math.max(0, timeoutMs),
);

const finish = (value: boolean) => {
if (settled) {
return;
}
settled = true;
child.off("close", onExitLike);
child.off("exit", onExitLike);
clearTimeout(timer);
resolve(value);
};

const onExitLike = () => {
finish(true);
};

child.once("close", onExitLike);
child.once("exit", onExitLike);
});
}

function splitCommandLine(value: string): CommandParts {
const parts: string[] = [];
let current = "";
Expand Down Expand Up @@ -735,12 +779,7 @@ export class AcpClient {

const agent = this.agent;
if (agent) {
// Some adapters keep stdio handles alive after SIGTERM; explicitly
// destroy/unref so CLI calls can exit deterministically.
if (!agent.killed) {
agent.kill();
}
this.detachAgentHandles(agent);
await this.terminateAgentProcess(agent);
}

this.sessionUpdateChain = Promise.resolve();
Expand All @@ -755,7 +794,45 @@ export class AcpClient {
this.agent = undefined;
}

private detachAgentHandles(agent: ChildProcess): void {
private async terminateAgentProcess(
child: ChildProcessByStdio<Writable, Readable, Readable>,
): Promise<void> {
// Closing stdin is the most graceful shutdown signal for stdio-based ACP agents.
if (!child.stdin.destroyed) {
try {
child.stdin.end();
} catch {
// best effort
}
}

let exited = await waitForChildExit(child, AGENT_CLOSE_AFTER_STDIN_END_MS);
if (!exited && isChildProcessRunning(child)) {
try {
child.kill("SIGTERM");
} catch {
// best effort
}
exited = await waitForChildExit(child, AGENT_CLOSE_TERM_GRACE_MS);
}

if (!exited && isChildProcessRunning(child)) {
this.log(
`agent did not exit after ${AGENT_CLOSE_TERM_GRACE_MS}ms; forcing SIGKILL`,
);
try {
child.kill("SIGKILL");
} catch {
// best effort
}
exited = await waitForChildExit(child, AGENT_CLOSE_KILL_GRACE_MS);
}

// Ensure stdio handles don't keep this process alive after close() returns.
this.detachAgentHandles(child, !exited);
}

private detachAgentHandles(agent: ChildProcess, unref: boolean): void {
const stdin = agent.stdin as Writable | null;
const stdout = agent.stdout as Readable | null;
const stderr = agent.stderr as Readable | null;
Expand All @@ -764,10 +841,12 @@ export class AcpClient {
stdout?.destroy();
stderr?.destroy();

try {
agent.unref();
} catch {
// best effort
if (unref) {
try {
agent.unref();
} catch {
// best effort
}
}
}

Expand Down
122 changes: 122 additions & 0 deletions test/cli.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import assert from "node:assert/strict";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import fs from "node:fs/promises";
import net from "node:net";
import os from "node:os";
Expand All @@ -22,7 +23,30 @@ import {

const CLI_PATH = fileURLToPath(new URL("../src/cli.js", import.meta.url));
const MOCK_AGENT_PATH = fileURLToPath(new URL("./mock-agent.js", import.meta.url));
function readPackageVersionForTest(): string {
const candidates = [
fileURLToPath(new URL("../package.json", import.meta.url)),
fileURLToPath(new URL("../../package.json", import.meta.url)),
path.join(process.cwd(), "package.json"),
];
for (const candidate of candidates) {
try {
const parsed = JSON.parse(readFileSync(candidate, "utf8")) as {
version?: unknown;
};
if (typeof parsed.version === "string" && parsed.version.trim().length > 0) {
return parsed.version;
}
} catch {
// continue searching
}
}
throw new Error("package.json version is missing");
}

const PACKAGE_VERSION = readPackageVersionForTest();
const MOCK_AGENT_COMMAND = `node ${JSON.stringify(MOCK_AGENT_PATH)}`;
const MOCK_AGENT_IGNORING_SIGTERM = `${MOCK_AGENT_COMMAND} --ignore-sigterm`;
const MOCK_CODEX_AGENT_WITH_RUNTIME_SESSION_ID = `${MOCK_AGENT_COMMAND} --codex-session-id codex-runtime-session`;
const MOCK_CLAUDE_AGENT_WITH_RUNTIME_SESSION_ID = `${MOCK_AGENT_COMMAND} --claude-session-id claude-runtime-session`;
const MOCK_AGENT_WITH_LOAD_RUNTIME_SESSION_ID = `${MOCK_AGENT_COMMAND} --supports-load-session --load-runtime-session-id loaded-runtime-session`;
Expand All @@ -44,6 +68,15 @@ type ParsedAcpError = {
};
};

test("CLI --version prints package version", async () => {
await withTempHome(async (homeDir) => {
const result = await runCli(["--version"], homeDir);
assert.equal(result.code, 0, result.stderr);
assert.equal(result.stderr.trim(), "");
assert.equal(result.stdout.trim(), PACKAGE_VERSION);
});
});

function parseSingleAcpErrorLine(stdout: string): ParsedAcpError {
const payload = JSON.parse(stdout.trim()) as {
jsonrpc?: string;
Expand Down Expand Up @@ -204,6 +237,62 @@ test("sessions ensure creates when missing and returns existing on subsequent ca
});
});

test("sessions ensure exits even when agent ignores SIGTERM", async () => {
await withTempHome(async (homeDir) => {
const cwd = path.join(homeDir, "workspace");
await fs.mkdir(cwd, { recursive: true });
await fs.mkdir(path.join(homeDir, ".acpx"), { recursive: true });
await fs.writeFile(
path.join(homeDir, ".acpx", "config.json"),
`${JSON.stringify(
{
agents: {
codex: {
command: MOCK_AGENT_IGNORING_SIGTERM,
},
},
},
null,
2,
)}\n`,
"utf8",
);

const result = await runCli(
["--cwd", cwd, "--format", "json", "codex", "sessions", "ensure"],
homeDir,
{ timeoutMs: 8_000 },
);
assert.equal(result.code, 0, result.stderr);

const payload = JSON.parse(result.stdout.trim()) as {
action?: unknown;
created?: unknown;
acpxRecordId?: unknown;
};
assert.equal(payload.action, "session_ensured");
assert.equal(payload.created, true);
assert.equal(typeof payload.acpxRecordId, "string");

const storedRecord = JSON.parse(
await fs.readFile(
path.join(
homeDir,
".acpx",
"sessions",
`${encodeURIComponent(payload.acpxRecordId as string)}.json`,
),
"utf8",
),
) as SessionRecord;

if (storedRecord.pid != null) {
const exited = await waitForPidExit(storedRecord.pid, 2_000);
assert.equal(exited, true);
}
});
});

test("sessions ensure resolves existing session by directory walk", async () => {
await withTempHome(async (homeDir) => {
const root = path.join(homeDir, "workspace");
Expand Down Expand Up @@ -1170,6 +1259,7 @@ async function withTempHome(run: (homeDir: string) => Promise<void>): Promise<vo
type CliRunOptions = {
stdin?: string;
cwd?: string;
timeoutMs?: number;
};

async function runCli(
Expand Down Expand Up @@ -1206,12 +1296,44 @@ async function runCli(
child.stdin.end();
}

let timedOut = false;
let timeout: NodeJS.Timeout | undefined;
if (options.timeoutMs != null && options.timeoutMs > 0) {
timeout = setTimeout(() => {
timedOut = true;
if (child.exitCode == null && child.signalCode == null) {
child.kill("SIGKILL");
}
}, options.timeoutMs);
}

child.once("close", (code) => {
if (timeout) {
clearTimeout(timeout);
}
if (timedOut) {
stderr += `[test] timed out after ${options.timeoutMs}ms\n`;
}
resolve({ code, stdout, stderr });
});
});
}

async function waitForPidExit(pid: number, timeoutMs: number): Promise<boolean> {
const deadline = Date.now() + Math.max(0, timeoutMs);
while (Date.now() < deadline) {
try {
process.kill(pid, 0);
} catch {
return true;
}
await new Promise<void>((resolve) => {
setTimeout(resolve, 50);
});
}
return false;
}

function makeSessionRecord(
record: Partial<SessionRecord> & {
acpxRecordId: string;
Expand Down
15 changes: 15 additions & 0 deletions test/mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type MockAgentOptions = {
loadSessionFailsOnEmpty: boolean;
replayLoadSessionUpdates: boolean;
loadReplayText: string;
ignoreSigterm: boolean;
};

type SessionState = {
Expand Down Expand Up @@ -257,6 +258,7 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions {
let loadSessionFailsOnEmpty = false;
let replayLoadSessionUpdates = false;
let loadReplayText = "replayed load session update";
let ignoreSigterm = false;

for (let index = 0; index < argv.length; index += 1) {
const token = argv[index];
Expand All @@ -278,6 +280,11 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions {
continue;
}

if (token === "--ignore-sigterm") {
ignoreSigterm = true;
continue;
}

if (token === "--load-replay-text") {
supportsLoadSession = true;
replayLoadSessionUpdates = true;
Expand Down Expand Up @@ -313,6 +320,7 @@ function parseMockAgentOptions(argv: string[]): MockAgentOptions {
loadSessionFailsOnEmpty,
replayLoadSessionUpdates,
loadReplayText,
ignoreSigterm,
};
}

Expand Down Expand Up @@ -603,6 +611,13 @@ const output = Writable.toWeb(process.stdout);
const input = Readable.toWeb(process.stdin) as ReadableStream<Uint8Array>;
const stream = ndJsonStream(output, input);
const mockAgentOptions = parseMockAgentOptions(process.argv.slice(2));

if (mockAgentOptions.ignoreSigterm) {
process.on("SIGTERM", () => {
// Intentionally ignore to exercise ACP client SIGKILL fallback behavior.
});
}

new AgentSideConnection(
(connection) => new MockAgent(connection, mockAgentOptions),
stream,
Expand Down