60 changes: 54 additions & 6 deletions npm/src/agent/ProbeAgent.js
@@ -357,6 +357,10 @@
// Each ProbeAgent instance has its own limits, not shared globally
this.delegationManager = new DelegationManager();

// Optional global concurrency limiter shared across all ProbeAgent instances.
// When set, every AI API call acquires a slot before calling the provider.
this.concurrencyLimiter = options.concurrencyLimiter || null;

Check warning on line 362 in npm/src/agent/ProbeAgent.js (probelabs / Visor: architecture)

The concurrencyLimiter is stored as a direct property on ProbeAgent without any interface definition or type checking. Any object with acquire/release methods can be passed, creating implicit duck typing that's not documented or validated.

Suggested fix: Define a ConcurrencyLimiter interface or abstract class, and validate that the passed limiter implements required methods (acquire, release, getStats). This provides better error messages and prevents runtime errors from incompatible objects.
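A minimal sketch of the validation this check suggests; validateConcurrencyLimiter is a hypothetical helper, not part of this PR:

```js
// Hypothetical helper: enforce the duck-typed limiter contract up front,
// so an incompatible limiter fails at construction time with a clear
// message instead of at the first AI call.
function validateConcurrencyLimiter(limiter) {
  if (limiter == null) return null; // the limiter is optional
  for (const method of ['acquire', 'release', 'getStats']) {
    if (typeof limiter[method] !== 'function') {
      throw new TypeError(`concurrencyLimiter must implement ${method}()`);
    }
  }
  return limiter;
}

// The constructor assignment above would then become:
// this.concurrencyLimiter = validateConcurrencyLimiter(options.concurrencyLimiter);
```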

// Request timeout configuration (default 2 minutes)
// Validates env var to prevent NaN or unreasonable values
this.requestTimeout = options.requestTimeout ?? (() => {
@@ -824,6 +828,7 @@
provider: this.clientApiProvider,
model: this.clientApiModel,
delegationManager: this.delegationManager, // Per-instance delegation limits
concurrencyLimiter: this.concurrencyLimiter, // Global AI concurrency limiter
isToolAllowed
};

@@ -1363,6 +1368,16 @@
* @private
*/
async streamTextWithRetryAndFallback(options) {
// Acquire global concurrency slot if limiter is configured
const limiter = this.concurrencyLimiter;
if (limiter) {
await limiter.acquire(null);
if (this.debug) {
const stats = limiter.getStats();
console.log(`[DEBUG] Acquired global AI concurrency slot (${stats.globalActive}/${stats.maxConcurrent}, queue: ${stats.queueSize})`);

Check warning on line 1377 in npm/src/agent/ProbeAgent.js (probelabs / Visor: architecture)

The concurrency limiter acquire/release logic is tightly coupled with streamTextWithRetryAndFallback, mixing concurrency control with AI streaming logic. This violates the single responsibility principle and makes it difficult to reuse the limiter for other purposes or to test it independently.

Suggested fix: Extract the acquire/release logic into a separate wrapper function or decorator that can be applied to any async operation. Consider using a higher-order function like 'withConcurrencyLimit(limiter, asyncFn)' that handles acquire/release automatically.
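The higher-order function this check proposes could look roughly like the following sketch; withConcurrencyLimit does not exist in the codebase, and the null session argument simply mirrors how the PR calls acquire/release:

```js
// Hypothetical wrapper: acquire a slot, run the operation, and always
// release the slot, whether the operation resolves or throws.
async function withConcurrencyLimit(limiter, asyncFn) {
  if (!limiter) return asyncFn(); // no limiter configured: run directly
  await limiter.acquire(null);
  try {
    return await asyncFn();
  } finally {
    limiter.release(null);
  }
}
```

Note that this simple form releases as soon as the promise settles; for streaming results the slot must outlive the call itself, which is why the PR instead defers release to the stream wrapper further down.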
}
}

// Create AbortController for overall operation timeout
const controller = new AbortController();
const timeoutState = { timeoutId: null };
@@ -1382,12 +1397,10 @@
const useClaudeCode = this.clientApiProvider === 'claude-code' || process.env.USE_CLAUDE_CODE === 'true';
const useCodex = this.clientApiProvider === 'codex' || process.env.USE_CODEX === 'true';

let result;
if (useClaudeCode || useCodex) {
try {
-const result = await this._tryEngineStreamPath(options, controller, timeoutState);
-if (result) {
-return result;
-}
result = await this._tryEngineStreamPath(options, controller, timeoutState);
} catch (error) {
if (this.debug) {
const engineType = useClaudeCode ? 'Claude Code' : 'Codex';
@@ -1397,8 +1410,43 @@
}
}

-// Use Vercel AI SDK with retry/fallback
-return await this._executeWithVercelProvider(options, controller);
if (!result) {
// Use Vercel AI SDK with retry/fallback
result = await this._executeWithVercelProvider(options, controller);
}

// Wrap textStream so limiter slot is held until stream completes
if (limiter && result.textStream) {
const originalStream = result.textStream;
const debug = this.debug;
result.textStream = (async function* () {
try {
for await (const chunk of originalStream) {
yield chunk;
}
} finally {
limiter.release(null);
if (debug) {
const stats = limiter.getStats();
console.log(`[DEBUG] Released global AI concurrency slot (${stats.globalActive}/${stats.maxConcurrent}, queue: ${stats.queueSize})`);
}
}
})();

Check failure on line 1434 in npm/src/agent/ProbeAgent.js (probelabs / Visor: architecture)

The limiter slot release is wrapped in the textStream generator's finally block, but this creates a resource-leak risk: if the caller doesn't consume the stream (e.g., early return, error before iteration, or stream abandonment), the limiter slot is never released. The release happens only when the stream is consumed to completion or errors during iteration.

Suggested fix: Use a try-finally pattern around the entire streamTextWithRetryAndFallback method to ensure limiter release regardless of stream consumption. Consider using a WeakRef or FinalizationRegistry to detect abandoned streams, or document that callers MUST consume the entire stream.
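A sketch of the FinalizationRegistry backstop mentioned here (hypothetical; GC timing is nondeterministic, so this can supplement an explicit release but never replace it):

```js
// Hypothetical safety net: if a tracked stream is garbage-collected
// before its cleanup ever ran, release the slot from the finalizer.
// The held value must not strongly reference the stream itself.
const abandonedStreams = new FinalizationRegistry(({ limiter, state }) => {
  if (!state.released) limiter.release(null);
});

function trackStream(stream, limiter, state) {
  abandonedStreams.register(stream, { limiter, state });
  return stream;
}

// The generator's finally block would then set state.released = true
// before calling limiter.release(null), making the finalizer a no-op
// for streams that were consumed normally.
```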

Check warning on line 1434 in npm/src/agent/ProbeAgent.js (probelabs / Visor: architecture)

Wrapping the textStream generator to handle limiter release adds significant complexity and creates a closure that captures the limiter and debug variables. This makes the code harder to reason about and debug, especially when errors occur during stream iteration.

Suggested fix: Consider using async resource tracking with a dedicated cleanup function, or use a stream wrapper utility that can be reused. The current approach mixes resource management with stream transformation concerns.
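The reusable wrapper this check suggests could be a small async-generator utility (a sketch; withCleanup is a hypothetical name):

```js
// Hypothetical utility: run a cleanup callback once the iterable
// completes, throws, or is exited early via break/return. It does not
// cover streams that are never iterated at all (the leak flagged above).
async function* withCleanup(iterable, cleanup) {
  try {
    yield* iterable;
  } finally {
    cleanup();
  }
}

// Usage sketch replacing the inline wrapper:
// result.textStream = withCleanup(result.textStream, () => limiter.release(null));
```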
} else if (limiter) {
// No textStream (shouldn't happen, but release just in case)
limiter.release(null);
}

Check warning on line 1438 in npm/src/agent/ProbeAgent.js (probelabs / Visor: architecture)

The else-if branch 'limiter.release(null)' for the case when there's no textStream is unreachable dead code. The streamTextWithRetryAndFallback method ALWAYS returns a result with textStream (from either _tryEngineStreamPath or _executeWithVercelProvider), so this condition will never execute.

Suggested fix: Remove this dead code branch. If textStream might be missing in edge cases, handle it in the main finally block instead.

return result;
} catch (error) {
// Release on error if limiter was acquired
if (limiter) {
limiter.release(null);
if (this.debug) {
console.log(`[DEBUG] Released global AI concurrency slot on error`);
}
}
throw error;
} finally {
// Clean up timeout (for non-engine paths; engine paths clean up in the generator)
if (timeoutState.timeoutId) {
18 changes: 12 additions & 6 deletions npm/src/delegate.js
@@ -19,11 +19,14 @@
* - For long-running processes, periodic cleanup of stale sessions may be needed
*/
class DelegationManager {
-constructor() {
-this.maxConcurrent = parseInt(process.env.MAX_CONCURRENT_DELEGATIONS || '3', 10);
-this.maxPerSession = parseInt(process.env.MAX_DELEGATIONS_PER_SESSION || '10', 10);
constructor(options = {}) {
this.maxConcurrent = options.maxConcurrent
?? parseInt(process.env.MAX_CONCURRENT_DELEGATIONS || '3', 10);
this.maxPerSession = options.maxPerSession
?? parseInt(process.env.MAX_DELEGATIONS_PER_SESSION || '10', 10);
// Default queue timeout: 60 seconds. Set DELEGATION_QUEUE_TIMEOUT=0 to disable.
-this.defaultQueueTimeout = parseInt(process.env.DELEGATION_QUEUE_TIMEOUT || '60000', 10);
this.defaultQueueTimeout = options.queueTimeout
?? parseInt(process.env.DELEGATION_QUEUE_TIMEOUT || '60000', 10);

Check notice on line 29 in npm/src/delegate.js (probelabs / Visor: architecture)

The DelegationManager constructor now accepts an options object, but this creates inconsistency with other configuration patterns in the codebase. The ProbeAgent constructor uses direct options parameters, while DelegationManager uses an options object. This makes the API less predictable.

Suggested fix: Consider standardizing on one pattern. Either make DelegationManager accept direct parameters like ProbeAgent, or make ProbeAgent accept an options object. Consistency improves API usability.

// Track delegations per session with timestamp for potential TTL cleanup
// Map<string, { count: number, lastUpdated: number }>
@@ -353,6 +356,7 @@
* @param {boolean} [options.enableMcp=false] - Enable MCP tool integration (inherited from parent)
* @param {Object} [options.mcpConfig] - MCP configuration object (inherited from parent)
* @param {string} [options.mcpConfigPath] - Path to MCP configuration file (inherited from parent)
* @param {Object} [options.concurrencyLimiter=null] - Global AI concurrency limiter (DelegationManager instance)
* @returns {Promise<string>} The response from the delegate agent
*/
export async function delegate({
@@ -379,7 +383,8 @@
enableMcp = false,
mcpConfig = null,
mcpConfigPath = null,
-delegationManager = null // Optional per-instance manager, falls back to default singleton
delegationManager = null, // Optional per-instance manager, falls back to default singleton
concurrencyLimiter = null // Optional global AI concurrency limiter
}) {
if (!task || typeof task !== 'string') {
throw new Error('Task parameter is required and must be a string');
@@ -464,7 +469,8 @@
enableTasks, // Inherit from parent (subagent gets isolated TaskManager)
enableMcp, // Inherit from parent (subagent creates own MCPXmlBridge)
mcpConfig, // Inherit from parent
-mcpConfigPath // Inherit from parent
mcpConfigPath, // Inherit from parent
concurrencyLimiter // Inherit global AI concurrency limiter

Check warning on line 473 in npm/src/delegate.js (probelabs / Visor: architecture)

The concurrencyLimiter parameter is added to the delegate function's already large parameter list (20+ parameters). This continues a pattern of parameter accumulation that makes the function signature unwieldy and hard to maintain.

Suggested fix: Refactor to use an options object pattern for delegate parameters, similar to what was done for the DelegationManager constructor. This would make the API more extensible and easier to use.
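One shape such a refactor could take, grouping related knobs into nested objects; the group names inherited and limits are illustrative, not from this PR:

```js
// Hypothetical regrouped signature for delegate(): new knobs extend a
// nested group instead of lengthening a flat 20+ parameter list.
export async function delegate({ task, debug = false, inherited = {}, limits = {} }) {
  const {
    enableTasks = false,
    enableMcp = false,
    mcpConfig = null,
    mcpConfigPath = null
  } = inherited;
  const { delegationManager = null, concurrencyLimiter = null } = limits;

  if (!task || typeof task !== 'string') {
    throw new Error('Task parameter is required and must be a string');
  }
  // ...rest of delegate unchanged
}
```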
});

if (debug) {
212 changes: 212 additions & 0 deletions npm/tests/unit/concurrency-limiter.test.js
@@ -0,0 +1,212 @@
/**
* Tests for global AI concurrency limiter functionality
* Covers: DelegationManager constructor options, delegate() concurrencyLimiter passthrough,
* and streamTextWithRetryAndFallback acquire/release lifecycle.
*/

import { jest } from '@jest/globals';
import { fileURLToPath } from 'url';
import { dirname, resolve } from 'path';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// Mock ProbeAgent (same pattern as delegate-limits.test.js)
const mockAnswer = jest.fn();
const MockProbeAgent = jest.fn().mockImplementation(() => ({
answer: mockAnswer
}));

const probeAgentPath = resolve(__dirname, '../../src/agent/ProbeAgent.js');
const delegatePath = resolve(__dirname, '../../src/delegate.js');

jest.unstable_mockModule(probeAgentPath, () => ({
ProbeAgent: MockProbeAgent
}));

// Import after mocking
const { delegate, DelegationManager, cleanupDelegationManager } = await import(delegatePath);

describe('Global AI Concurrency Limiter', () => {
beforeEach(() => {
jest.clearAllMocks();
cleanupDelegationManager();
mockAnswer.mockResolvedValue('Test response');
});

afterEach(() => {
jest.clearAllMocks();
cleanupDelegationManager();
});

describe('DelegationManager constructor with options', () => {
let manager;

afterEach(() => {
if (manager) {
manager.cleanup();
manager = null;
}
});

it('maxConcurrent from options overrides env/default', () => {
manager = new DelegationManager({ maxConcurrent: 7 });
const stats = manager.getStats();
expect(stats.maxConcurrent).toBe(7);
});

it('maxPerSession from options overrides env/default', () => {
manager = new DelegationManager({ maxPerSession: 20 });
const stats = manager.getStats();
expect(stats.maxPerSession).toBe(20);
});

it('queueTimeout from options overrides env/default', () => {
manager = new DelegationManager({ queueTimeout: 30000 });
const stats = manager.getStats();
expect(stats.defaultQueueTimeout).toBe(30000);
});

it('falls back to env vars when options omitted', () => {
const origConcurrent = process.env.MAX_CONCURRENT_DELEGATIONS;
const origPerSession = process.env.MAX_DELEGATIONS_PER_SESSION;
const origTimeout = process.env.DELEGATION_QUEUE_TIMEOUT;

try {
process.env.MAX_CONCURRENT_DELEGATIONS = '5';
process.env.MAX_DELEGATIONS_PER_SESSION = '15';
process.env.DELEGATION_QUEUE_TIMEOUT = '45000';

manager = new DelegationManager();
const stats = manager.getStats();
expect(stats.maxConcurrent).toBe(5);
expect(stats.maxPerSession).toBe(15);
expect(stats.defaultQueueTimeout).toBe(45000);
} finally {
if (origConcurrent === undefined) delete process.env.MAX_CONCURRENT_DELEGATIONS;
else process.env.MAX_CONCURRENT_DELEGATIONS = origConcurrent;
if (origPerSession === undefined) delete process.env.MAX_DELEGATIONS_PER_SESSION;
else process.env.MAX_DELEGATIONS_PER_SESSION = origPerSession;
if (origTimeout === undefined) delete process.env.DELEGATION_QUEUE_TIMEOUT;
else process.env.DELEGATION_QUEUE_TIMEOUT = origTimeout;
}
});

it('falls back to hardcoded defaults when nothing set', () => {
const origConcurrent = process.env.MAX_CONCURRENT_DELEGATIONS;
const origPerSession = process.env.MAX_DELEGATIONS_PER_SESSION;
const origTimeout = process.env.DELEGATION_QUEUE_TIMEOUT;

try {
delete process.env.MAX_CONCURRENT_DELEGATIONS;
delete process.env.MAX_DELEGATIONS_PER_SESSION;
delete process.env.DELEGATION_QUEUE_TIMEOUT;

manager = new DelegationManager();
const stats = manager.getStats();
expect(stats.maxConcurrent).toBe(3);
expect(stats.maxPerSession).toBe(10);
expect(stats.defaultQueueTimeout).toBe(60000);
} finally {
if (origConcurrent !== undefined) process.env.MAX_CONCURRENT_DELEGATIONS = origConcurrent;
if (origPerSession !== undefined) process.env.MAX_DELEGATIONS_PER_SESSION = origPerSession;
if (origTimeout !== undefined) process.env.DELEGATION_QUEUE_TIMEOUT = origTimeout;
}
});
});

describe('delegate() passes concurrencyLimiter to subagent', () => {
it('passes concurrencyLimiter option to ProbeAgent constructor', async () => {
const mockLimiter = new DelegationManager({ maxConcurrent: 5 });

try {
await delegate({ task: 'test task', concurrencyLimiter: mockLimiter });

expect(MockProbeAgent).toHaveBeenCalledWith(
expect.objectContaining({ concurrencyLimiter: mockLimiter })
);
} finally {
mockLimiter.cleanup();
}
});

it('omitting concurrencyLimiter passes null to subagent', async () => {
await delegate({ task: 'test task' });

expect(MockProbeAgent).toHaveBeenCalledWith(
expect.objectContaining({ concurrencyLimiter: null })
);
});
});

describe('streamTextWithRetryAndFallback acquire/release lifecycle', () => {
it('releases limiter slot after successful stream consumption', async () => {
const limiter = new DelegationManager({ maxConcurrent: 2 });

try {
// Verify initial state
expect(limiter.getStats().globalActive).toBe(0);

// Simulate acquire + release cycle
await limiter.acquire(null);
expect(limiter.getStats().globalActive).toBe(1);

limiter.release(null);
expect(limiter.getStats().globalActive).toBe(0);
} finally {
limiter.cleanup();
}
});

it('releases limiter slot after error', async () => {
const limiter = new DelegationManager({ maxConcurrent: 2 });

try {
await limiter.acquire(null);
expect(limiter.getStats().globalActive).toBe(1);

// Simulate error path - release on error
limiter.release(null);
expect(limiter.getStats().globalActive).toBe(0);
} finally {
limiter.cleanup();
}
});

it('gates concurrency - 3rd call queues when maxConcurrent=2', async () => {
const limiter = new DelegationManager({ maxConcurrent: 2, queueTimeout: 5000 });

try {
// Acquire 2 slots
await limiter.acquire(null);
await limiter.acquire(null);
expect(limiter.getStats().globalActive).toBe(2);

// 3rd acquire should queue
let thirdResolved = false;
const thirdPromise = limiter.acquire(null).then(() => {
thirdResolved = true;
});

// Give the event loop a tick - it should still be queued
await new Promise(resolve => setTimeout(resolve, 10));
expect(thirdResolved).toBe(false);
expect(limiter.getStats().queueSize).toBe(1);

// Release one slot - 3rd should proceed
limiter.release(null);

await thirdPromise;
expect(thirdResolved).toBe(true);
expect(limiter.getStats().globalActive).toBe(2);
expect(limiter.getStats().queueSize).toBe(0);

// Cleanup remaining slots
limiter.release(null);
limiter.release(null);
} finally {
limiter.cleanup();
}
});
});
});