Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions pi/extensions/agent-spawn.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ function randomId() {
}

function createExtensionHarness(execImpl) {
let registeredTool = null;
const registeredTools = {};
const pi = {
registerTool(tool) {
registeredTool = tool;
registeredTools[tool.name] = tool;
},
exec: execImpl,
};
agentSpawnExtension(pi);
if (!registeredTool) throw new Error("agent_spawn tool was not registered");
return registeredTool;
if (!registeredTools.agent_spawn) throw new Error("agent_spawn tool was not registered");
return registeredTools.agent_spawn;
}

function startUnixSocketServer(socketPath) {
Expand Down Expand Up @@ -242,4 +242,56 @@ describe("agent_spawn extension tool", () => {
expect(result.details.aborted).toBe(true);
expect(Date.now() - startedAt).toBeLessThan(1000);
});

it("opens circuit breaker after 3 consecutive failures", async () => {
const root = mkdtempSync(path.join(tmpdir(), "agent-spawn-test-"));
tempDirs.push(root);
const worktree = path.join(root, "worktree");
const skillPath = path.join(root, "dev-skill");
const controlDir = path.join(root, "session-control");
process.env[CONTROL_DIR_ENV] = controlDir;
mkdirSync(worktree, { recursive: true });
mkdirSync(skillPath, { recursive: true });
mkdirSync(controlDir, { recursive: true });

// Spawns succeed at tmux level but readiness always times out (1s timeout)
const execSpy = vi.fn(async () => ({ stdout: "", stderr: "", code: 0, killed: false }));
const tool = createExtensionHarness(execSpy);

const params = {
session_name: `dev-agent-circuit-${randomId()}`,
cwd: worktree,
skill_path: skillPath,
model: "anthropic/claude-opus-4-6",
ready_timeout_sec: 1,
};

// Fail 3 times (readiness timeout)
for (let i = 0; i < 3; i++) {
params.session_name = `dev-agent-circuit-${randomId()}`;
const result = await tool.execute("id", params, undefined, undefined, {});
expect(result.isError).toBe(true);
expect(result.details.error).toBe("readiness_timeout");
}

// 4th attempt should be rejected by circuit breaker
params.session_name = `dev-agent-circuit-${randomId()}`;
const rejected = await tool.execute("id", params, undefined, undefined, {});
expect(rejected.isError).toBe(true);
expect(rejected.details.error).toBe("circuit_open");
expect(String(rejected.content[0].text)).toContain("Circuit breaker OPEN");
});

it("exposes spawn_status tool", () => {
const registeredTools = {};
const pi = {
registerTool(tool) {
registeredTools[tool.name] = tool;
},
exec: async () => ({ stdout: "", stderr: "", code: 0 }),
};
agentSpawnExtension(pi);
expect(registeredTools.spawn_status).toBeDefined();
expect(registeredTools.spawn_status.name).toBe("spawn_status");
});
});
229 changes: 227 additions & 2 deletions pi/extensions/agent-spawn.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
import net from "node:net";
import { homedir } from "node:os";
import { dirname, join, resolve } from "node:path";
Expand All @@ -15,8 +15,101 @@ const READINESS_POLL_MS = 200;
const SOCKET_PROBE_TIMEOUT_MS = 300;
const TMUX_SPAWN_TIMEOUT_MS = 15_000;

// Circuit breaker defaults
const CIRCUIT_FAILURE_THRESHOLD = 3;
const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes

type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted";

// ── Circuit Breaker ─────────────────────────────────────────────────────────

type CircuitState = "closed" | "open" | "half-open";

type CircuitBreaker = {
state: CircuitState;
consecutiveFailures: number;
lastFailureAt: number | null;
lastSuccessAt: number | null;
totalFailures: number;
totalSuccesses: number;
};

function createCircuitBreaker(): CircuitBreaker {
return {
state: "closed",
consecutiveFailures: 0,
lastFailureAt: null,
lastSuccessAt: null,
totalFailures: 0,
totalSuccesses: 0,
};
}

function recordSuccess(cb: CircuitBreaker): void {
cb.consecutiveFailures = 0;
cb.lastSuccessAt = Date.now();
cb.totalSuccesses++;
cb.state = "closed";
}

function recordFailure(cb: CircuitBreaker): void {
cb.consecutiveFailures++;
cb.lastFailureAt = Date.now();
cb.totalFailures++;
if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) {
cb.state = "open";
}
}

function isCircuitOpen(cb: CircuitBreaker): boolean {
if (cb.state !== "open") return false;
// Check if cooldown has elapsed → transition to half-open
if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) {
cb.state = "half-open";
return false;
}
return true;
}

function circuitStatus(cb: CircuitBreaker): string {
const cooldownRemaining =
cb.state === "open" && cb.lastFailureAt
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt))
: 0;
return [
`State: ${cb.state}`,
`Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`,
`Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`,
`Last success: ${cb.lastSuccessAt ? new Date(cb.lastSuccessAt).toISOString() : "never"}`,
`Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`,
cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "",
]
.filter(Boolean)
.join("\n ");
}

// ── Lifecycle Log ───────────────────────────────────────────────────────────

const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl");

type LifecycleEvent = {
timestamp: string;
session_name: string;
event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected";
stage?: string;
ready_after_ms?: number;
error?: string;
};

function logLifecycleEvent(event: LifecycleEvent): void {
try {
mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true });
appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n");
} catch {
// Best-effort — don't break spawn on logging failure
}
}

type ReadinessResult = {
ready: boolean;
aborted: boolean;
Expand Down Expand Up @@ -192,11 +285,14 @@ type AgentSpawnInput = {
};

export default function agentSpawnExtension(pi: ExtensionAPI): void {
const circuit = createCircuitBreaker();

pi.registerTool({
name: "agent_spawn",
label: "Agent Spawn",
description:
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.",
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. " +
"Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.",
parameters: Type.Object({
session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }),
cwd: Type.String({ description: "Working directory for the new session" }),
Expand All @@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
const model = input.model?.trim();
const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec);

// Circuit breaker check
if (isCircuitOpen(circuit)) {
const cooldownLeft = circuit.lastFailureAt
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt))
Comment on lines +314 to +317
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The isCircuitOpen function mutates the circuit state to half-open before input validation, allowing invalid requests to waste the single recovery probe attempt.
Severity: MEDIUM

Suggested Fix

Move the circuit state transition to occur after all input validation passes and immediately before the spawn attempt. This ensures only valid requests consume the half-open probe slot.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: pi/extensions/agent-spawn.ts#L314-L317

Potential issue: The `isCircuitOpen` function mutates the circuit breaker's state to
`half-open` as a side effect. This check is performed before input validation.
Consequently, a request with invalid input (e.g., an unsafe `sessionName`) can trigger
the state transition to `half-open` and then fail validation, returning an error. This
consumes the single "probe" attempt allowed in the half-open state without actually
trying to spawn an agent. This violates the intended behavior of allowing exactly one
probe attempt to test system recovery, potentially delaying recovery from an open
circuit.

Did we get this right? 👍 / 👎 to inform future reviews.

: 0;
logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName || "unknown",
event: "circuit_rejected",
error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`,
});
return {
content: [{
type: "text",
text:
`⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` +
`Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` +
`Investigate the root cause (check logs, API keys, model availability).`,
}],
isError: true,
details: {
error: "circuit_open",
circuit: {
state: circuit.state,
consecutive_failures: circuit.consecutiveFailures,
cooldown_remaining_sec: Math.round(cooldownLeft / 1000),
last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null,
},
},
};
}

if (!sessionName || !isSafeName(sessionName)) {
return {
content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." }],
Expand Down Expand Up @@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
};
}

logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName,
event: "spawn_started",
});

const tmuxCommand = [
`cd ${shellQuote(cwdPath)}`,
'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"',
Expand All @@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
);

if (spawnResult.code !== 0) {
recordFailure(circuit);
logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName,
event: "spawn_failed",
stage: "spawn",
error: `tmux exit code ${spawnResult.code}`,
});
return {
content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }],
isError: true,
Expand All @@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
stdout: spawnResult.stdout,
stderr: spawnResult.stderr,
exit_code: spawnResult.code,
circuit_state: circuit.state,
circuit_failures: circuit.consecutiveFailures,
},
};
}
Expand All @@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
ready_after_ms: readiness.readyAfterMs,
stage: readiness.stage,
error: readiness.ready ? null : readiness.aborted ? "readiness_aborted" : "readiness_timeout",
circuit_state: circuit.state,
circuit_failures: circuit.consecutiveFailures,
};

if (readiness.aborted) {
recordFailure(circuit);
logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName,
event: "spawn_failed",
stage: "aborted",
error: "readiness_aborted",
});
return {
content: [{
type: "text",
Expand All @@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
}

if (!readiness.ready) {
recordFailure(circuit);
logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName,
event: "spawn_failed",
stage: readiness.stage,
ready_after_ms: readiness.readyAfterMs,
error: "readiness_timeout",
});
return {
content: [{
type: "text",
Expand All @@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
};
}

recordSuccess(circuit);
logLifecycleEvent({
timestamp: new Date().toISOString(),
session_name: sessionName,
event: "spawn_success",
stage: readiness.stage,
ready_after_ms: readiness.readyAfterMs,
});

return {
content: [{
type: "text",
Expand All @@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
};
},
});

// ── spawn_status tool ─────────────────────────────────────────────────────

pi.registerTool({
name: "spawn_status",
label: "Spawn Status",
description:
"Check the agent_spawn circuit breaker state and recent worker lifecycle events.",
parameters: Type.Object({}),
async execute() {
let recentEvents = "";
try {
const { execSync } = require("node:child_process");
const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" });
if (tail.trim()) {
const lines = tail.trim().split("\n");
Comment on lines +544 to +548
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dynamic require() in an ESM/TypeScript file — use readFileSync instead

This file uses static ES module import declarations at the top, but spawn_status falls back to require("node:child_process") and a shell tail command to read the last 20 lines of the JSONL log. Two problems:

  1. Dynamic require() is a CommonJS pattern and can be surprising in an ESM context.
  2. Shelling out to tail for a simple file-tail operation is non-portable (unavailable on Windows, behaves differently under PATH changes) and redundant — readFileSync is already available as a static import in this file.

A portable alternative using the already-imported readFileSync:

import { appendFileSync, existsSync, mkdirSync, readFileSync, readlinkSync, statSync } from "node:fs";
// …
// Inside spawn_status execute():
let recentEvents = "";
try {
  if (existsSync(LIFECYCLE_LOG_PATH)) {
    const lines = readFileSync(LIFECYCLE_LOG_PATH, "utf-8")
      .trimEnd()
      .split("\n")
      .slice(-20);
    // … same parse/format logic
  }
} catch {
  recentEvents = "  (no lifecycle log)";
}

The same require("node:child_process") / require("node:fs") pattern also appears in heartbeat.ts (lines 595, 684) where top-level static imports should be used instead.

Prompt To Fix With AI
This is a comment left during a code review.
Path: pi/extensions/agent-spawn.ts
Line: 544-548

Comment:
**Dynamic `require()` in an ESM/TypeScript file — use `readFileSync` instead**

This file uses static ES module `import` declarations at the top, but `spawn_status` falls back to `require("node:child_process")` and a shell `tail` command to read the last 20 lines of the JSONL log. Two problems:

1. Dynamic `require()` is a CommonJS pattern and can be surprising in an ESM context.
2. Shelling out to `tail` for a simple file-tail operation is non-portable (unavailable on Windows, behaves differently under PATH changes) and redundant — `readFileSync` is already available as a static import in this file.

A portable alternative using the already-imported `readFileSync`:

```typescript
import { appendFileSync, existsSync, mkdirSync, readFileSync, readlinkSync, statSync } from "node:fs";
//
// Inside spawn_status execute():
let recentEvents = "";
try {
  if (existsSync(LIFECYCLE_LOG_PATH)) {
    const lines = readFileSync(LIFECYCLE_LOG_PATH, "utf-8")
      .trimEnd()
      .split("\n")
      .slice(-20);
    // … same parse/format logic
  }
} catch {
  recentEvents = "  (no lifecycle log)";
}
```

The same `require("node:child_process")` / `require("node:fs")` pattern also appears in `heartbeat.ts` (lines 595, 684) where top-level static imports should be used instead.

How can I resolve this? If you propose a fix, please make it concise.

recentEvents = lines
.map((line: string) => {
try {
const e = JSON.parse(line) as LifecycleEvent;
return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? ` [${e.ready_after_ms}ms]` : ""}`;
} catch {
return ` (unparseable)`;
}
})
.join("\n");
}
} catch {
recentEvents = " (no lifecycle log)";
}

return {
content: [{
type: "text" as const,
text: [
"Spawn Circuit Breaker:",
` ${circuitStatus(circuit)}`,
"",
"Recent lifecycle events:",
recentEvents || " (none)",
].join("\n"),
}],
details: {
circuit: {
state: circuit.state,
consecutive_failures: circuit.consecutiveFailures,
total_successes: circuit.totalSuccesses,
total_failures: circuit.totalFailures,
},
},
};
},
});
}
Loading
Loading