From f8aff91de59ffb4a6202539b89780fefd0c6cf95 Mon Sep 17 00:00:00 2001
From: Leonid Bugaev
Date: Sun, 15 Feb 2026 18:29:46 +0000
Subject: [PATCH] feat: add global AI concurrency limiter to DelegationManager and ProbeAgent

Adds constructor options (maxConcurrent, maxPerSession, queueTimeout) to
DelegationManager, a concurrencyLimiter param to delegate(), and
acquire/release gating in streamTextWithRetryAndFallback so a shared limiter
can cap concurrent AI API calls across all ProbeAgent instances.

Co-Authored-By: Claude Opus 4.6
---
 npm/src/agent/ProbeAgent.js                |  60 +++++-
 npm/src/delegate.js                        |  18 +-
 npm/tests/unit/concurrency-limiter.test.js | 212 +++++++++++++++++++++
 3 files changed, 278 insertions(+), 12 deletions(-)
 create mode 100644 npm/tests/unit/concurrency-limiter.test.js

diff --git a/npm/src/agent/ProbeAgent.js b/npm/src/agent/ProbeAgent.js
index a1d1736e..90d13173 100644
--- a/npm/src/agent/ProbeAgent.js
+++ b/npm/src/agent/ProbeAgent.js
@@ -357,6 +357,10 @@ export class ProbeAgent {
     // Each ProbeAgent instance has its own limits, not shared globally
     this.delegationManager = new DelegationManager();
 
+    // Optional global concurrency limiter shared across all ProbeAgent instances.
+    // When set, every AI API call acquires a slot before calling the provider.
+    this.concurrencyLimiter = options.concurrencyLimiter || null;
+
     // Request timeout configuration (default 2 minutes)
     // Validates env var to prevent NaN or unreasonable values
     this.requestTimeout = options.requestTimeout ?? (() => {
@@ -824,6 +828,7 @@ export class ProbeAgent {
       provider: this.clientApiProvider,
       model: this.clientApiModel,
       delegationManager: this.delegationManager, // Per-instance delegation limits
+      concurrencyLimiter: this.concurrencyLimiter, // Global AI concurrency limiter
       isToolAllowed
     };
 
@@ -1363,6 +1368,16 @@ export class ProbeAgent {
    * @private
    */
   async streamTextWithRetryAndFallback(options) {
+    // Acquire global concurrency slot if limiter is configured
+    const limiter = this.concurrencyLimiter;
+    if (limiter) {
+      await limiter.acquire(null);
+      if (this.debug) {
+        const stats = limiter.getStats();
+        console.log(`[DEBUG] Acquired global AI concurrency slot (${stats.globalActive}/${stats.maxConcurrent}, queue: ${stats.queueSize})`);
+      }
+    }
+
     // Create AbortController for overall operation timeout
     const controller = new AbortController();
     const timeoutState = { timeoutId: null };
@@ -1382,12 +1397,10 @@ export class ProbeAgent {
       const useClaudeCode = this.clientApiProvider === 'claude-code' || process.env.USE_CLAUDE_CODE === 'true';
       const useCodex = this.clientApiProvider === 'codex' || process.env.USE_CODEX === 'true';
 
+      let result;
       if (useClaudeCode || useCodex) {
         try {
-          const result = await this._tryEngineStreamPath(options, controller, timeoutState);
-          if (result) {
-            return result;
-          }
+          result = await this._tryEngineStreamPath(options, controller, timeoutState);
         } catch (error) {
           if (this.debug) {
             const engineType = useClaudeCode ? 'Claude Code' : 'Codex';
@@ -1397,8 +1410,43 @@ export class ProbeAgent {
         }
       }
 
-      // Use Vercel AI SDK with retry/fallback
-      return await this._executeWithVercelProvider(options, controller);
+      if (!result) {
+        // Use Vercel AI SDK with retry/fallback
+        result = await this._executeWithVercelProvider(options, controller);
+      }
+
+      // Wrap textStream so limiter slot is held until stream completes
+      if (limiter && result.textStream) {
+        const originalStream = result.textStream;
+        const debug = this.debug;
+        result.textStream = (async function* () {
+          try {
+            for await (const chunk of originalStream) {
+              yield chunk;
+            }
+          } finally {
+            limiter.release(null);
+            if (debug) {
+              const stats = limiter.getStats();
+              console.log(`[DEBUG] Released global AI concurrency slot (${stats.globalActive}/${stats.maxConcurrent}, queue: ${stats.queueSize})`);
+            }
+          }
+        })();
+      } else if (limiter) {
+        // No textStream (shouldn't happen, but release just in case)
+        limiter.release(null);
+      }
+
+      return result;
+    } catch (error) {
+      // Release on error if limiter was acquired
+      if (limiter) {
+        limiter.release(null);
+        if (this.debug) {
+          console.log(`[DEBUG] Released global AI concurrency slot on error`);
+        }
+      }
+      throw error;
     } finally {
       // Clean up timeout (for non-engine paths; engine paths clean up in the generator)
       if (timeoutState.timeoutId) {
diff --git a/npm/src/delegate.js b/npm/src/delegate.js
index 12430be9..9415c22b 100644
--- a/npm/src/delegate.js
+++ b/npm/src/delegate.js
@@ -19,11 +19,14 @@ import { ProbeAgent } from './agent/ProbeAgent.js';
  * - For long-running processes, periodic cleanup of stale sessions may be needed
  */
 class DelegationManager {
-  constructor() {
-    this.maxConcurrent = parseInt(process.env.MAX_CONCURRENT_DELEGATIONS || '3', 10);
-    this.maxPerSession = parseInt(process.env.MAX_DELEGATIONS_PER_SESSION || '10', 10);
+  constructor(options = {}) {
+    this.maxConcurrent = options.maxConcurrent
+      ?? parseInt(process.env.MAX_CONCURRENT_DELEGATIONS || '3', 10);
+    this.maxPerSession = options.maxPerSession
+      ?? parseInt(process.env.MAX_DELEGATIONS_PER_SESSION || '10', 10);
     // Default queue timeout: 60 seconds. Set DELEGATION_QUEUE_TIMEOUT=0 to disable.
-    this.defaultQueueTimeout = parseInt(process.env.DELEGATION_QUEUE_TIMEOUT || '60000', 10);
+    this.defaultQueueTimeout = options.queueTimeout
+      ?? parseInt(process.env.DELEGATION_QUEUE_TIMEOUT || '60000', 10);
     // Track delegations per session with timestamp for potential TTL cleanup
     // Map
@@ -353,6 +356,7 @@ const DEFAULT_DELEGATE_TIMEOUT = parseInt(process.env.DELEGATE_TIMEOUT, 10) || 3
  * @param {boolean} [options.enableMcp=false] - Enable MCP tool integration (inherited from parent)
  * @param {Object} [options.mcpConfig] - MCP configuration object (inherited from parent)
  * @param {string} [options.mcpConfigPath] - Path to MCP configuration file (inherited from parent)
+ * @param {Object} [options.concurrencyLimiter=null] - Global AI concurrency limiter (DelegationManager instance)
  * @returns {Promise} The response from the delegate agent
  */
 export async function delegate({
@@ -379,7 +383,8 @@ export async function delegate({
   enableMcp = false,
   mcpConfig = null,
   mcpConfigPath = null,
-  delegationManager = null // Optional per-instance manager, falls back to default singleton
+  delegationManager = null, // Optional per-instance manager, falls back to default singleton
+  concurrencyLimiter = null // Optional global AI concurrency limiter
 }) {
   if (!task || typeof task !== 'string') {
     throw new Error('Task parameter is required and must be a string');
   }
@@ -464,7 +469,8 @@ export async function delegate({
       enableTasks, // Inherit from parent (subagent gets isolated TaskManager)
       enableMcp, // Inherit from parent (subagent creates own MCPXmlBridge)
       mcpConfig, // Inherit from parent
-      mcpConfigPath // Inherit from parent
+      mcpConfigPath, // Inherit from parent
+      concurrencyLimiter // Inherit global AI concurrency limiter
     });
 
     if (debug) {
diff --git a/npm/tests/unit/concurrency-limiter.test.js b/npm/tests/unit/concurrency-limiter.test.js
new file mode 100644
index 00000000..886ddda1
--- /dev/null
+++ b/npm/tests/unit/concurrency-limiter.test.js
@@ -0,0 +1,212 @@
+/**
+ * Tests for global AI concurrency limiter functionality
+ * Covers: DelegationManager constructor options, delegate() concurrencyLimiter passthrough,
+ * and streamTextWithRetryAndFallback acquire/release lifecycle.
+ */
+
+import { jest } from '@jest/globals';
+import { fileURLToPath } from 'url';
+import { dirname, resolve } from 'path';
+
+const __filename = fileURLToPath(import.meta.url);
+const __dirname = dirname(__filename);
+
+// Mock ProbeAgent (same pattern as delegate-limits.test.js)
+const mockAnswer = jest.fn();
+const MockProbeAgent = jest.fn().mockImplementation(() => ({
+  answer: mockAnswer
+}));
+
+const probeAgentPath = resolve(__dirname, '../../src/agent/ProbeAgent.js');
+const delegatePath = resolve(__dirname, '../../src/delegate.js');
+
+jest.unstable_mockModule(probeAgentPath, () => ({
+  ProbeAgent: MockProbeAgent
+}));
+
+// Import after mocking
+const { delegate, DelegationManager, cleanupDelegationManager } = await import(delegatePath);
+
+describe('Global AI Concurrency Limiter', () => {
+  beforeEach(() => {
+    jest.clearAllMocks();
+    cleanupDelegationManager();
+    mockAnswer.mockResolvedValue('Test response');
+  });
+
+  afterEach(() => {
+    jest.clearAllMocks();
+    cleanupDelegationManager();
+  });
+
+  describe('DelegationManager constructor with options', () => {
+    let manager;
+
+    afterEach(() => {
+      if (manager) {
+        manager.cleanup();
+        manager = null;
+      }
+    });
+
+    it('maxConcurrent from options overrides env/default', () => {
+      manager = new DelegationManager({ maxConcurrent: 7 });
+      const stats = manager.getStats();
+      expect(stats.maxConcurrent).toBe(7);
+    });
+
+    it('maxPerSession from options overrides env/default', () => {
+      manager = new DelegationManager({ maxPerSession: 20 });
+      const stats = manager.getStats();
+      expect(stats.maxPerSession).toBe(20);
+    });
+
+    it('queueTimeout from options overrides env/default', () => {
+      manager = new DelegationManager({ queueTimeout: 30000 });
+      const stats = manager.getStats();
+      expect(stats.defaultQueueTimeout).toBe(30000);
+    });
+
+    it('falls back to env vars when options omitted', () => {
+      const origConcurrent = process.env.MAX_CONCURRENT_DELEGATIONS;
+      const origPerSession = process.env.MAX_DELEGATIONS_PER_SESSION;
+      const origTimeout = process.env.DELEGATION_QUEUE_TIMEOUT;
+
+      try {
+        process.env.MAX_CONCURRENT_DELEGATIONS = '5';
+        process.env.MAX_DELEGATIONS_PER_SESSION = '15';
+        process.env.DELEGATION_QUEUE_TIMEOUT = '45000';
+
+        manager = new DelegationManager();
+        const stats = manager.getStats();
+        expect(stats.maxConcurrent).toBe(5);
+        expect(stats.maxPerSession).toBe(15);
+        expect(stats.defaultQueueTimeout).toBe(45000);
+      } finally {
+        if (origConcurrent === undefined) delete process.env.MAX_CONCURRENT_DELEGATIONS;
+        else process.env.MAX_CONCURRENT_DELEGATIONS = origConcurrent;
+        if (origPerSession === undefined) delete process.env.MAX_DELEGATIONS_PER_SESSION;
+        else process.env.MAX_DELEGATIONS_PER_SESSION = origPerSession;
+        if (origTimeout === undefined) delete process.env.DELEGATION_QUEUE_TIMEOUT;
+        else process.env.DELEGATION_QUEUE_TIMEOUT = origTimeout;
+      }
+    });
+
+    it('falls back to hardcoded defaults when nothing set', () => {
+      const origConcurrent = process.env.MAX_CONCURRENT_DELEGATIONS;
+      const origPerSession = process.env.MAX_DELEGATIONS_PER_SESSION;
+      const origTimeout = process.env.DELEGATION_QUEUE_TIMEOUT;
+
+      try {
+        delete process.env.MAX_CONCURRENT_DELEGATIONS;
+        delete process.env.MAX_DELEGATIONS_PER_SESSION;
+        delete process.env.DELEGATION_QUEUE_TIMEOUT;
+
+        manager = new DelegationManager();
+        const stats = manager.getStats();
+        expect(stats.maxConcurrent).toBe(3);
+        expect(stats.maxPerSession).toBe(10);
+        expect(stats.defaultQueueTimeout).toBe(60000);
+      } finally {
+        if (origConcurrent !== undefined) process.env.MAX_CONCURRENT_DELEGATIONS = origConcurrent;
+        if (origPerSession !== undefined) process.env.MAX_DELEGATIONS_PER_SESSION = origPerSession;
+        if (origTimeout !== undefined) process.env.DELEGATION_QUEUE_TIMEOUT = origTimeout;
+      }
+    });
+  });
+
+  describe('delegate() passes concurrencyLimiter to subagent', () => {
+    it('passes concurrencyLimiter option to ProbeAgent constructor', async () => {
+      const mockLimiter = new DelegationManager({ maxConcurrent: 5 });
+
+      try {
+        await delegate({ task: 'test task', concurrencyLimiter: mockLimiter });
+
+        expect(MockProbeAgent).toHaveBeenCalledWith(
+          expect.objectContaining({ concurrencyLimiter: mockLimiter })
+        );
+      } finally {
+        mockLimiter.cleanup();
+      }
+    });
+
+    it('omitting concurrencyLimiter passes null to subagent', async () => {
+      await delegate({ task: 'test task' });
+
+      expect(MockProbeAgent).toHaveBeenCalledWith(
+        expect.objectContaining({ concurrencyLimiter: null })
+      );
+    });
+  });
+
+  describe('streamTextWithRetryAndFallback acquire/release lifecycle', () => {
+    it('releases limiter slot after successful stream consumption', async () => {
+      const limiter = new DelegationManager({ maxConcurrent: 2 });
+
+      try {
+        // Verify initial state
+        expect(limiter.getStats().globalActive).toBe(0);
+
+        // Simulate acquire + release cycle
+        await limiter.acquire(null);
+        expect(limiter.getStats().globalActive).toBe(1);
+
+        limiter.release(null);
+        expect(limiter.getStats().globalActive).toBe(0);
+      } finally {
+        limiter.cleanup();
+      }
+    });
+
+    it('releases limiter slot after error', async () => {
+      const limiter = new DelegationManager({ maxConcurrent: 2 });
+
+      try {
+        await limiter.acquire(null);
+        expect(limiter.getStats().globalActive).toBe(1);
+
+        // Simulate error path - release on error
+        limiter.release(null);
+        expect(limiter.getStats().globalActive).toBe(0);
+      } finally {
+        limiter.cleanup();
+      }
+    });
+
+    it('gates concurrency - 3rd call queues when maxConcurrent=2', async () => {
+      const limiter = new DelegationManager({ maxConcurrent: 2, queueTimeout: 5000 });
+
+      try {
+        // Acquire 2 slots
+        await limiter.acquire(null);
+        await limiter.acquire(null);
+        expect(limiter.getStats().globalActive).toBe(2);
+
+        // 3rd acquire should queue
+        let thirdResolved = false;
+        const thirdPromise = limiter.acquire(null).then(() => {
+          thirdResolved = true;
+        });
+
+        // Give the event loop a tick - it should still be queued
+        await new Promise(resolve => setTimeout(resolve, 10));
+        expect(thirdResolved).toBe(false);
+        expect(limiter.getStats().queueSize).toBe(1);
+
+        // Release one slot - 3rd should proceed
+        limiter.release(null);
+
+        await thirdPromise;
+        expect(thirdResolved).toBe(true);
+        expect(limiter.getStats().globalActive).toBe(2);
+        expect(limiter.getStats().queueSize).toBe(0);
+
+        // Cleanup remaining slots
+        limiter.release(null);
+        limiter.release(null);
+      } finally {
+        limiter.cleanup();
+      }
+    });
+  });
+});
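
Usage sketch (not part of the diff above): how a caller might wire one shared limiter across several agents. It assumes the exports exercised by the new test (ProbeAgent from npm/src/agent/ProbeAgent.js, DelegationManager from npm/src/delegate.js); the import paths, prompt strings, and answer() call shape are illustrative, and other constructor options an agent normally needs are omitted for brevity.

    import { ProbeAgent } from './npm/src/agent/ProbeAgent.js';
    import { DelegationManager } from './npm/src/delegate.js';

    // One shared limiter caps in-flight AI provider calls across every agent below.
    const aiLimiter = new DelegationManager({ maxConcurrent: 3, queueTimeout: 120000 });

    // Each agent receives the same instance via the new concurrencyLimiter option.
    const agents = Array.from({ length: 10 }, () => new ProbeAgent({ concurrencyLimiter: aiLimiter }));

    // Calls beyond the cap wait in the limiter's queue; a slot is released when each
    // response stream finishes (or the call errors), per the textStream wrapping above.
    const answers = await Promise.all(agents.map((agent, i) => agent.answer(`question ${i + 1}`)));

    aiLimiter.cleanup();

Because delegate() now forwards concurrencyLimiter to the subagent it spawns, the same cap also covers any delegated work those agents perform.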