From 760d1e3d98e6bb8349b31483db67224d9699060a Mon Sep 17 00:00:00 2001
From: baudbot-agent
Date: Thu, 5 Mar 2026 17:02:53 +0000
Subject: [PATCH] feat: add spawn circuit breaker, lifecycle logging, and heartbeat auto-recovery
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Addresses remaining items from #185 (control-agent resilience):

## Circuit breaker (agent-spawn.ts)

After 3 consecutive spawn failures, the circuit opens and rejects new
spawn attempts for 5 minutes (cooldown). This prevents resource waste
when spawns are failing due to systemic issues (missing API keys, model
unavailability, etc.).

The circuit transitions:
  closed → open (after 3 failures) → half-open (after cooldown) → closed (on success)

A failure while half-open re-opens the circuit and restarts the cooldown.

Failure tracking counts tmux spawn failures, readiness timeouts, and
aborted readiness checks. Validation errors (bad name, missing model)
don't affect the circuit.

## Worker lifecycle logging (agent-spawn.ts)

All spawn events are logged to ~/.pi/agent/logs/worker-lifecycle.jsonl:
- spawn_started: when a spawn attempt begins
- spawn_success: readiness verified (includes ready_after_ms)
- spawn_failed: tmux error, readiness timeout, or abort
- circuit_rejected: spawn refused by open circuit

New `spawn_status` tool exposes circuit breaker state and recent
lifecycle events for observability.

## Heartbeat auto-recovery (heartbeat.ts)

Before prompting the control-agent about failures, the heartbeat now
attempts automatic recovery for two failure types:
- Bridge down: kills existing bridge tmux session, clears port holders,
  runs startup-pi.sh, verifies bridge comes back
- Orphaned dev-agents: kills tmux session, removes stale alias

If recovery succeeds, no LLM tokens are consumed (same as healthy
check). Only unrecoverable failures prompt the agent.

Recovery actions are logged to ~/.pi/agent/logs/auto-recovery.jsonl for
audit and debugging.

## Tests - Fixed test harness to handle multiple tool registrations - Added circuit breaker test (3 failures → open → rejected) - Added spawn_status tool registration test - All 128 tests pass (73 heartbeat + 6 agent-spawn + 49 memory) Refs #185 Co-authored-by: Darcy Clarke --- pi/extensions/agent-spawn.test.mjs | 60 +++++++- pi/extensions/agent-spawn.ts | 229 ++++++++++++++++++++++++++++- pi/extensions/heartbeat.ts | 194 +++++++++++++++++++++++- 3 files changed, 471 insertions(+), 12 deletions(-) diff --git a/pi/extensions/agent-spawn.test.mjs b/pi/extensions/agent-spawn.test.mjs index 217fb2f..2befdc2 100644 --- a/pi/extensions/agent-spawn.test.mjs +++ b/pi/extensions/agent-spawn.test.mjs @@ -13,16 +13,16 @@ function randomId() { } function createExtensionHarness(execImpl) { - let registeredTool = null; + const registeredTools = {}; const pi = { registerTool(tool) { - registeredTool = tool; + registeredTools[tool.name] = tool; }, exec: execImpl, }; agentSpawnExtension(pi); - if (!registeredTool) throw new Error("agent_spawn tool was not registered"); - return registeredTool; + if (!registeredTools.agent_spawn) throw new Error("agent_spawn tool was not registered"); + return registeredTools.agent_spawn; } function startUnixSocketServer(socketPath) { @@ -242,4 +242,56 @@ describe("agent_spawn extension tool", () => { expect(result.details.aborted).toBe(true); expect(Date.now() - startedAt).toBeLessThan(1000); }); + + it("opens circuit breaker after 3 consecutive failures", async () => { + const root = mkdtempSync(path.join(tmpdir(), "agent-spawn-test-")); + tempDirs.push(root); + const worktree = path.join(root, "worktree"); + const skillPath = path.join(root, "dev-skill"); + const controlDir = path.join(root, "session-control"); + process.env[CONTROL_DIR_ENV] = controlDir; + mkdirSync(worktree, { recursive: true }); + mkdirSync(skillPath, { recursive: true }); + mkdirSync(controlDir, { recursive: true }); + + // Spawns succeed at tmux level but readiness always 
times out (1s timeout) + const execSpy = vi.fn(async () => ({ stdout: "", stderr: "", code: 0, killed: false })); + const tool = createExtensionHarness(execSpy); + + const params = { + session_name: `dev-agent-circuit-${randomId()}`, + cwd: worktree, + skill_path: skillPath, + model: "anthropic/claude-opus-4-6", + ready_timeout_sec: 1, + }; + + // Fail 3 times (readiness timeout) + for (let i = 0; i < 3; i++) { + params.session_name = `dev-agent-circuit-${randomId()}`; + const result = await tool.execute("id", params, undefined, undefined, {}); + expect(result.isError).toBe(true); + expect(result.details.error).toBe("readiness_timeout"); + } + + // 4th attempt should be rejected by circuit breaker + params.session_name = `dev-agent-circuit-${randomId()}`; + const rejected = await tool.execute("id", params, undefined, undefined, {}); + expect(rejected.isError).toBe(true); + expect(rejected.details.error).toBe("circuit_open"); + expect(String(rejected.content[0].text)).toContain("Circuit breaker OPEN"); + }); + + it("exposes spawn_status tool", () => { + const registeredTools = {}; + const pi = { + registerTool(tool) { + registeredTools[tool.name] = tool; + }, + exec: async () => ({ stdout: "", stderr: "", code: 0 }), + }; + agentSpawnExtension(pi); + expect(registeredTools.spawn_status).toBeDefined(); + expect(registeredTools.spawn_status.name).toBe("spawn_status"); + }); }); diff --git a/pi/extensions/agent-spawn.ts b/pi/extensions/agent-spawn.ts index bd3eb54..64c9ab4 100644 --- a/pi/extensions/agent-spawn.ts +++ b/pi/extensions/agent-spawn.ts @@ -1,6 +1,6 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; -import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; +import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; import net from "node:net"; import { homedir } from "node:os"; import { dirname, join, resolve } from "node:path"; @@ -15,8 +15,101 @@ 
const READINESS_POLL_MS = 200; const SOCKET_PROBE_TIMEOUT_MS = 300; const TMUX_SPAWN_TIMEOUT_MS = 15_000; +// Circuit breaker defaults +const CIRCUIT_FAILURE_THRESHOLD = 3; +const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes + type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted"; +// ── Circuit Breaker ───────────────────────────────────────────────────────── + +type CircuitState = "closed" | "open" | "half-open"; + +type CircuitBreaker = { + state: CircuitState; + consecutiveFailures: number; + lastFailureAt: number | null; + lastSuccessAt: number | null; + totalFailures: number; + totalSuccesses: number; +}; + +function createCircuitBreaker(): CircuitBreaker { + return { + state: "closed", + consecutiveFailures: 0, + lastFailureAt: null, + lastSuccessAt: null, + totalFailures: 0, + totalSuccesses: 0, + }; +} + +function recordSuccess(cb: CircuitBreaker): void { + cb.consecutiveFailures = 0; + cb.lastSuccessAt = Date.now(); + cb.totalSuccesses++; + cb.state = "closed"; +} + +function recordFailure(cb: CircuitBreaker): void { + cb.consecutiveFailures++; + cb.lastFailureAt = Date.now(); + cb.totalFailures++; + if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) { + cb.state = "open"; + } +} + +function isCircuitOpen(cb: CircuitBreaker): boolean { + if (cb.state !== "open") return false; + // Check if cooldown has elapsed → transition to half-open + if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) { + cb.state = "half-open"; + return false; + } + return true; +} + +function circuitStatus(cb: CircuitBreaker): string { + const cooldownRemaining = + cb.state === "open" && cb.lastFailureAt + ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt)) + : 0; + return [ + `State: ${cb.state}`, + `Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`, + `Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`, + `Last success: ${cb.lastSuccessAt ? 
new Date(cb.lastSuccessAt).toISOString() : "never"}`, + `Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`, + cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "", + ] + .filter(Boolean) + .join("\n "); +} + +// ── Lifecycle Log ─────────────────────────────────────────────────────────── + +const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl"); + +type LifecycleEvent = { + timestamp: string; + session_name: string; + event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected"; + stage?: string; + ready_after_ms?: number; + error?: string; +}; + +function logLifecycleEvent(event: LifecycleEvent): void { + try { + mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true }); + appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n"); + } catch { + // Best-effort — don't break spawn on logging failure + } +} + type ReadinessResult = { ready: boolean; aborted: boolean; @@ -192,11 +285,14 @@ type AgentSpawnInput = { }; export default function agentSpawnExtension(pi: ExtensionAPI): void { + const circuit = createCircuitBreaker(); + pi.registerTool({ name: "agent_spawn", label: "Agent Spawn", description: - "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.", + "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. 
" + + "Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.", parameters: Type.Object({ session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }), cwd: Type.String({ description: "Working directory for the new session" }), @@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { const model = input.model?.trim(); const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec); + // Circuit breaker check + if (isCircuitOpen(circuit)) { + const cooldownLeft = circuit.lastFailureAt + ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt)) + : 0; + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName || "unknown", + event: "circuit_rejected", + error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`, + }); + return { + content: [{ + type: "text", + text: + `⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` + + `Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` + + `Investigate the root cause (check logs, API keys, model availability).`, + }], + isError: true, + details: { + error: "circuit_open", + circuit: { + state: circuit.state, + consecutive_failures: circuit.consecutiveFailures, + cooldown_remaining_sec: Math.round(cooldownLeft / 1000), + last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null, + }, + }, + }; + } + if (!sessionName || !isSafeName(sessionName)) { return { content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." 
}], @@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; } + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_started", + }); + const tmuxCommand = [ `cd ${shellQuote(cwdPath)}`, 'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"', @@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { ); if (spawnResult.code !== 0) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: "spawn", + error: `tmux exit code ${spawnResult.code}`, + }); return { content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }], isError: true, @@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { stdout: spawnResult.stdout, stderr: spawnResult.stderr, exit_code: spawnResult.code, + circuit_state: circuit.state, + circuit_failures: circuit.consecutiveFailures, }, }; } @@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { ready_after_ms: readiness.readyAfterMs, stage: readiness.stage, error: readiness.ready ? null : readiness.aborted ? 
"readiness_aborted" : "readiness_timeout", + circuit_state: circuit.state, + circuit_failures: circuit.consecutiveFailures, }; if (readiness.aborted) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: "aborted", + error: "readiness_aborted", + }); return { content: [{ type: "text", @@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { } if (!readiness.ready) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: readiness.stage, + ready_after_ms: readiness.readyAfterMs, + error: "readiness_timeout", + }); return { content: [{ type: "text", @@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; } + recordSuccess(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_success", + stage: readiness.stage, + ready_after_ms: readiness.readyAfterMs, + }); + return { content: [{ type: "text", @@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; }, }); + + // ── spawn_status tool ───────────────────────────────────────────────────── + + pi.registerTool({ + name: "spawn_status", + label: "Spawn Status", + description: + "Check the agent_spawn circuit breaker state and recent worker lifecycle events.", + parameters: Type.Object({}), + async execute() { + let recentEvents = ""; + try { + const { execSync } = require("node:child_process"); + const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" }); + if (tail.trim()) { + const lines = tail.trim().split("\n"); + recentEvents = lines + .map((line: string) => { + try { + const e = JSON.parse(line) as LifecycleEvent; + return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? 
` [${e.ready_after_ms}ms]` : ""}`; + } catch { + return ` (unparseable)`; + } + }) + .join("\n"); + } + } catch { + recentEvents = " (no lifecycle log)"; + } + + return { + content: [{ + type: "text" as const, + text: [ + "Spawn Circuit Breaker:", + ` ${circuitStatus(circuit)}`, + "", + "Recent lifecycle events:", + recentEvents || " (none)", + ].join("\n"), + }], + details: { + circuit: { + state: circuit.state, + consecutive_failures: circuit.consecutiveFailures, + total_successes: circuit.totalSuccesses, + total_failures: circuit.totalFailures, + }, + }, + }; + }, + }); } diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 92ab9c9..ae26b3d 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -25,9 +25,9 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; import { StringEnum } from "@mariozechner/pi-ai"; -import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, statSync } from "node:fs"; import { homedir } from "node:os"; -import { join } from "node:path"; +import { dirname, join } from "node:path"; import { discoverSubagentPackages, readSubagentState, resolveEffectiveState } from "./subagent-registry.ts"; const DEFAULT_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes @@ -562,6 +562,155 @@ function hasDevAgentForTodo(todoId: string): boolean { } } +// ── Auto-Recovery ─────────────────────────────────────────────────────────── + +const RECOVERY_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "auto-recovery.jsonl"); + +type RecoveryAction = { + timestamp: string; + check: string; + action: string; + success: boolean; + detail?: string; +}; + +function logRecovery(entry: RecoveryAction): void { + try { + mkdirSync(dirname(RECOVERY_LOG_PATH), { recursive: true }); + appendFileSync(RECOVERY_LOG_PATH, JSON.stringify(entry) + "\n"); + } catch { + // Best-effort + } +} + +/** 
+ * Attempt automatic recovery for certain failure types. + * Returns an array of results describing what was attempted and whether it worked. + * Only performs safe, idempotent actions: + * - Restart bridge tmux session + * - Kill orphaned dev-agent tmux sessions and remove stale aliases + */ +async function tryAutoRecover(failures: CheckResult[]): Promise { + const actions: RecoveryAction[] = []; + const { execSync } = require("node:child_process"); + + for (const failure of failures) { + // Auto-recover: bridge down → restart the bridge tmux session + if (failure.name === "bridge") { + try { + // Find control-agent UUID from alias + const controlAlias = join(SOCKET_DIR, "control-agent.alias"); + if (!existsSync(controlAlias)) { + actions.push({ + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: "Cannot restart bridge: control-agent.alias not found", + }); + continue; + } + + // Kill existing bridge tmux session + try { + execSync('tmux kill-session -t baudbot-gateway-bridge 2>/dev/null', { timeout: 5000 }); + } catch { + // May not exist — that's fine + } + + // Kill anything holding port 7890 + try { + execSync('lsof -ti :7890 2>/dev/null | xargs kill -9 2>/dev/null', { timeout: 5000 }); + } catch { + // Nothing holding port — fine + } + + // Restart via startup script + const startupScript = join(homedir(), ".pi", "agent", "skills", "control-agent", "startup-pi.sh"); + if (existsSync(startupScript)) { + // Get live session UUIDs from session-control dir + const sockFiles = readdirSync(SOCKET_DIR).filter((f) => f.endsWith(".sock")); + const uuids = sockFiles.map((f) => f.replace(".sock", "")).join(" "); + if (uuids) { + execSync(`bash "${startupScript}" ${uuids} 2>&1`, { + timeout: 30000, + encoding: "utf-8", + }); + + // Verify bridge came back + await new Promise((resolve) => setTimeout(resolve, 3000)); + const verifyResult = await checkBridge(); + + const entry: RecoveryAction = { + 
timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: verifyResult.ok, + detail: verifyResult.ok + ? "Bridge restarted and verified healthy" + : `Bridge restart attempted but still failing: ${verifyResult.detail}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + } catch (err: any) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: `Recovery failed: ${err.message || err}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + + // Auto-recover: orphaned dev-agent → kill tmux session + remove alias + if (failure.name.startsWith("orphan:")) { + const sessionName = failure.name.replace("orphan:", ""); + try { + // Kill the tmux session + try { + execSync(`tmux kill-session -t "${sessionName}" 2>/dev/null`, { timeout: 5000 }); + } catch { + // May already be dead + } + + // Remove the stale alias + const aliasPath = join(SOCKET_DIR, `${sessionName}.alias`); + if (existsSync(aliasPath)) { + const { unlinkSync } = require("node:fs"); + unlinkSync(aliasPath); + } + + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "orphan_cleanup", + success: true, + detail: `Killed tmux session and removed alias for orphaned dev-agent "${sessionName}"`, + }; + actions.push(entry); + logRecovery(entry); + } catch (err: any) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "orphan_cleanup", + success: false, + detail: `Cleanup failed for "${sessionName}": ${err.message || err}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + } + + return actions; +} + // ── Extension ─────────────────────────────────────────────────────────────── export default function heartbeatExtension(pi: ExtensionAPI): void { @@ -636,19 +785,52 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { return; } - // 
Something is wrong — inject a prompt so the control-agent can fix it - const failureList = failures + // Attempt auto-recovery for recoverable failures before prompting the agent + const recoveryActions = await tryAutoRecover(failures); + const successfulRecoveries = recoveryActions.filter((a) => a.success); + + // Re-check: remove failures that were successfully auto-recovered + const recoveredChecks = new Set(successfulRecoveries.map((a) => a.check)); + const remainingFailures = failures.filter((f) => !recoveredChecks.has(f.name)); + + // If all failures were auto-recovered, no need to prompt the agent + if (remainingFailures.length === 0) { + state.lastFailures = []; + if (successfulRecoveries.length > 0) { + state.lastFailures = successfulRecoveries.map( + (a) => `auto-recovered: ${a.check} — ${a.detail}`, + ); + } + state.consecutiveErrors = 0; + saveState(); + armTimer(); + return; + } + + // Build prompt with both failures and recovery attempt details + const failureList = remainingFailures .map((f) => `- **${f.name}**: ${f.detail}`) .join("\n"); + const recoveryInfo = recoveryActions.length > 0 + ? [ + "", + "**Auto-recovery attempted:**", + ...recoveryActions.map((a) => + `- ${a.success ? "✅" : "❌"} ${a.action} (${a.check}): ${a.detail}`, + ), + ].join("\n") + : ""; + const prompt = [ `🫀 **Heartbeat** (run #${state.totalRuns}, ${new Date(now).toISOString()})`, ``, - `**${failures.length} health check failure(s) detected** — take action:`, + `**${remainingFailures.length} health check failure(s) remain** — take action:`, ``, failureList, + recoveryInfo, ``, - `All other checks passed. Fix the issues above and report what you did.`, + `Fix the remaining issues above and report what you did.`, ].join("\n"); pi.sendMessage(