-
Notifications
You must be signed in to change notification settings - Fork 11
feat: add spawn circuit breaker, lifecycle logging, and heartbeat auto-recovery #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; | ||
| import { Type } from "@sinclair/typebox"; | ||
| import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; | ||
| import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; | ||
| import net from "node:net"; | ||
| import { homedir } from "node:os"; | ||
| import { dirname, join, resolve } from "node:path"; | ||
|
|
@@ -15,8 +15,101 @@ const READINESS_POLL_MS = 200; | |
| const SOCKET_PROBE_TIMEOUT_MS = 300; | ||
| const TMUX_SPAWN_TIMEOUT_MS = 15_000; | ||
|
|
||
| // Circuit breaker defaults | ||
| const CIRCUIT_FAILURE_THRESHOLD = 3; | ||
| const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes | ||
|
|
||
| type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted"; | ||
|
|
||
| // ── Circuit Breaker ───────────────────────────────────────────────────────── | ||
|
|
||
| type CircuitState = "closed" | "open" | "half-open"; | ||
|
|
||
| type CircuitBreaker = { | ||
| state: CircuitState; | ||
| consecutiveFailures: number; | ||
| lastFailureAt: number | null; | ||
| lastSuccessAt: number | null; | ||
| totalFailures: number; | ||
| totalSuccesses: number; | ||
| }; | ||
|
|
||
| function createCircuitBreaker(): CircuitBreaker { | ||
| return { | ||
| state: "closed", | ||
| consecutiveFailures: 0, | ||
| lastFailureAt: null, | ||
| lastSuccessAt: null, | ||
| totalFailures: 0, | ||
| totalSuccesses: 0, | ||
| }; | ||
| } | ||
|
|
||
| function recordSuccess(cb: CircuitBreaker): void { | ||
| cb.consecutiveFailures = 0; | ||
| cb.lastSuccessAt = Date.now(); | ||
| cb.totalSuccesses++; | ||
| cb.state = "closed"; | ||
| } | ||
|
|
||
| function recordFailure(cb: CircuitBreaker): void { | ||
| cb.consecutiveFailures++; | ||
| cb.lastFailureAt = Date.now(); | ||
| cb.totalFailures++; | ||
| if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) { | ||
| cb.state = "open"; | ||
| } | ||
| } | ||
|
|
||
| function isCircuitOpen(cb: CircuitBreaker): boolean { | ||
| if (cb.state !== "open") return false; | ||
| // Check if cooldown has elapsed → transition to half-open | ||
| if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) { | ||
| cb.state = "half-open"; | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| function circuitStatus(cb: CircuitBreaker): string { | ||
| const cooldownRemaining = | ||
| cb.state === "open" && cb.lastFailureAt | ||
| ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt)) | ||
| : 0; | ||
| return [ | ||
| `State: ${cb.state}`, | ||
| `Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`, | ||
| `Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`, | ||
| `Last success: ${cb.lastSuccessAt ? new Date(cb.lastSuccessAt).toISOString() : "never"}`, | ||
| `Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`, | ||
| cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "", | ||
| ] | ||
| .filter(Boolean) | ||
| .join("\n "); | ||
| } | ||
|
|
||
| // ── Lifecycle Log ─────────────────────────────────────────────────────────── | ||
|
|
||
| const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl"); | ||
|
|
||
| type LifecycleEvent = { | ||
| timestamp: string; | ||
| session_name: string; | ||
| event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected"; | ||
| stage?: string; | ||
| ready_after_ms?: number; | ||
| error?: string; | ||
| }; | ||
|
|
||
| function logLifecycleEvent(event: LifecycleEvent): void { | ||
| try { | ||
| mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true }); | ||
| appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n"); | ||
| } catch { | ||
| // Best-effort — don't break spawn on logging failure | ||
| } | ||
| } | ||
|
|
||
| type ReadinessResult = { | ||
| ready: boolean; | ||
| aborted: boolean; | ||
|
|
@@ -192,11 +285,14 @@ type AgentSpawnInput = { | |
| }; | ||
|
|
||
| export default function agentSpawnExtension(pi: ExtensionAPI): void { | ||
| const circuit = createCircuitBreaker(); | ||
|
|
||
| pi.registerTool({ | ||
| name: "agent_spawn", | ||
| label: "Agent Spawn", | ||
| description: | ||
| "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.", | ||
| "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. " + | ||
| "Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.", | ||
| parameters: Type.Object({ | ||
| session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }), | ||
| cwd: Type.String({ description: "Working directory for the new session" }), | ||
|
|
@@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| const model = input.model?.trim(); | ||
| const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec); | ||
|
|
||
| // Circuit breaker check | ||
| if (isCircuitOpen(circuit)) { | ||
| const cooldownLeft = circuit.lastFailureAt | ||
| ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt)) | ||
| : 0; | ||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName || "unknown", | ||
| event: "circuit_rejected", | ||
| error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`, | ||
| }); | ||
| return { | ||
| content: [{ | ||
| type: "text", | ||
| text: | ||
| `⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` + | ||
| `Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` + | ||
| `Investigate the root cause (check logs, API keys, model availability).`, | ||
| }], | ||
| isError: true, | ||
| details: { | ||
| error: "circuit_open", | ||
| circuit: { | ||
| state: circuit.state, | ||
| consecutive_failures: circuit.consecutiveFailures, | ||
| cooldown_remaining_sec: Math.round(cooldownLeft / 1000), | ||
| last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null, | ||
| }, | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| if (!sessionName || !isSafeName(sessionName)) { | ||
| return { | ||
| content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." }], | ||
|
|
@@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| }; | ||
| } | ||
|
|
||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName, | ||
| event: "spawn_started", | ||
| }); | ||
|
|
||
| const tmuxCommand = [ | ||
| `cd ${shellQuote(cwdPath)}`, | ||
| 'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"', | ||
|
|
@@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| ); | ||
|
|
||
| if (spawnResult.code !== 0) { | ||
| recordFailure(circuit); | ||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName, | ||
| event: "spawn_failed", | ||
| stage: "spawn", | ||
| error: `tmux exit code ${spawnResult.code}`, | ||
| }); | ||
| return { | ||
| content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }], | ||
| isError: true, | ||
|
|
@@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| stdout: spawnResult.stdout, | ||
| stderr: spawnResult.stderr, | ||
| exit_code: spawnResult.code, | ||
| circuit_state: circuit.state, | ||
| circuit_failures: circuit.consecutiveFailures, | ||
| }, | ||
| }; | ||
| } | ||
|
|
@@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| ready_after_ms: readiness.readyAfterMs, | ||
| stage: readiness.stage, | ||
| error: readiness.ready ? null : readiness.aborted ? "readiness_aborted" : "readiness_timeout", | ||
| circuit_state: circuit.state, | ||
| circuit_failures: circuit.consecutiveFailures, | ||
| }; | ||
|
|
||
| if (readiness.aborted) { | ||
| recordFailure(circuit); | ||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName, | ||
| event: "spawn_failed", | ||
| stage: "aborted", | ||
| error: "readiness_aborted", | ||
| }); | ||
| return { | ||
| content: [{ | ||
| type: "text", | ||
|
|
@@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| } | ||
|
|
||
| if (!readiness.ready) { | ||
| recordFailure(circuit); | ||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName, | ||
| event: "spawn_failed", | ||
| stage: readiness.stage, | ||
| ready_after_ms: readiness.readyAfterMs, | ||
| error: "readiness_timeout", | ||
| }); | ||
| return { | ||
| content: [{ | ||
| type: "text", | ||
|
|
@@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| }; | ||
| } | ||
|
|
||
| recordSuccess(circuit); | ||
| logLifecycleEvent({ | ||
| timestamp: new Date().toISOString(), | ||
| session_name: sessionName, | ||
| event: "spawn_success", | ||
| stage: readiness.stage, | ||
| ready_after_ms: readiness.readyAfterMs, | ||
| }); | ||
|
|
||
| return { | ||
| content: [{ | ||
| type: "text", | ||
|
|
@@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { | |
| }; | ||
| }, | ||
| }); | ||
|
|
||
| // ── spawn_status tool ───────────────────────────────────────────────────── | ||
|
|
||
| pi.registerTool({ | ||
| name: "spawn_status", | ||
| label: "Spawn Status", | ||
| description: | ||
| "Check the agent_spawn circuit breaker state and recent worker lifecycle events.", | ||
| parameters: Type.Object({}), | ||
| async execute() { | ||
| let recentEvents = ""; | ||
| try { | ||
| const { execSync } = require("node:child_process"); | ||
| const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" }); | ||
| if (tail.trim()) { | ||
| const lines = tail.trim().split("\n"); | ||
|
Comment on lines
+544
to
+548
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dynamic This file uses static ES module
A portable alternative using the already-imported import { appendFileSync, existsSync, mkdirSync, readFileSync, readlinkSync, statSync } from "node:fs";
// …
// Inside spawn_status execute():
let recentEvents = "";
try {
if (existsSync(LIFECYCLE_LOG_PATH)) {
const lines = readFileSync(LIFECYCLE_LOG_PATH, "utf-8")
.trimEnd()
.split("\n")
.slice(-20);
// … same parse/format logic
}
} catch {
recentEvents = " (no lifecycle log)";
}The same Prompt To Fix With AIThis is a comment left during a code review.
Path: pi/extensions/agent-spawn.ts
Line: 544-548
Comment:
**Dynamic `require()` in an ESM/TypeScript file — use `readFileSync` instead**
This file uses static ES module `import` declarations at the top, but `spawn_status` falls back to `require("node:child_process")` and a shell `tail` command to read the last 20 lines of the JSONL log. Two problems:
1. Dynamic `require()` is a CommonJS pattern and can be surprising in an ESM context.
2. Shelling out to `tail` for a simple file-tail operation is non-portable (unavailable on Windows, behaves differently under PATH changes) and redundant — `readFileSync` is already available as a static import in this file.
A portable alternative using the already-imported `readFileSync`:
```typescript
import { appendFileSync, existsSync, mkdirSync, readFileSync, readlinkSync, statSync } from "node:fs";
// …
// Inside spawn_status execute():
let recentEvents = "";
try {
if (existsSync(LIFECYCLE_LOG_PATH)) {
const lines = readFileSync(LIFECYCLE_LOG_PATH, "utf-8")
.trimEnd()
.split("\n")
.slice(-20);
// … same parse/format logic
}
} catch {
recentEvents = " (no lifecycle log)";
}
```
The same `require("node:child_process")` / `require("node:fs")` pattern also appears in `heartbeat.ts` (lines 595, 684) where top-level static imports should be used instead.
How can I resolve this? If you propose a fix, please make it concise. |
||
| recentEvents = lines | ||
| .map((line: string) => { | ||
| try { | ||
| const e = JSON.parse(line) as LifecycleEvent; | ||
| return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? ` [${e.ready_after_ms}ms]` : ""}`; | ||
| } catch { | ||
| return ` (unparseable)`; | ||
| } | ||
| }) | ||
| .join("\n"); | ||
| } | ||
| } catch { | ||
| recentEvents = " (no lifecycle log)"; | ||
| } | ||
|
|
||
| return { | ||
| content: [{ | ||
| type: "text" as const, | ||
| text: [ | ||
| "Spawn Circuit Breaker:", | ||
| ` ${circuitStatus(circuit)}`, | ||
| "", | ||
| "Recent lifecycle events:", | ||
| recentEvents || " (none)", | ||
| ].join("\n"), | ||
| }], | ||
| details: { | ||
| circuit: { | ||
| state: circuit.state, | ||
| consecutive_failures: circuit.consecutiveFailures, | ||
| total_successes: circuit.totalSuccesses, | ||
| total_failures: circuit.totalFailures, | ||
| }, | ||
| }, | ||
| }; | ||
| }, | ||
| }); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: The
isCircuitOpenfunction mutates the circuit state tohalf-openbefore input validation, allowing invalid requests to waste the single recovery probe attempt.Severity: MEDIUM
Suggested Fix
Move the circuit state transition to occur after all input validation passes and immediately before the spawn attempt. This ensures only valid requests consume the half-open probe slot.
Prompt for AI Agent
Did we get this right? 👍 / 👎 to inform future reviews.