
feat: global AI concurrency limiter #413

Merged
buger merged 1 commit into main from feat/global-ai-concurrency-limiter on Feb 15, 2026

Conversation


@buger buger commented Feb 15, 2026

Summary

  • Adds constructor options (maxConcurrent, maxPerSession, queueTimeout) to DelegationManager, allowing callers to override env-var/hardcoded defaults
  • Adds concurrencyLimiter parameter to delegate() and passes it through to ProbeAgent
  • Implements acquire/release gating in streamTextWithRetryAndFallback so a shared DelegationManager instance can cap concurrent AI API calls across all ProbeAgent instances in a run
  • Adds 10 new unit tests covering constructor options, delegate passthrough, and acquire/release lifecycle
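
The gating described in the bullets above amounts to a counting semaphore with a queue timeout. Below is a minimal sketch assuming only the option names from this PR (maxConcurrent, queueTimeout); the class name and internals are illustrative, not the PR's actual implementation:

```javascript
class SimpleLimiter {
  constructor({ maxConcurrent = 2, queueTimeout = 30000 } = {}) {
    this.maxConcurrent = maxConcurrent;
    this.queueTimeout = queueTimeout;
    this.active = 0; // slots currently in use
    this.queue = []; // waiters queued for a slot
  }

  // Resolves immediately if a slot is free; otherwise queues the caller
  // and rejects if no slot frees up within queueTimeout milliseconds.
  acquire() {
    if (this.active < this.maxConcurrent) {
      this.active += 1;
      return Promise.resolve();
    }
    return new Promise((resolve, reject) => {
      const entry = { resolve, timer: null };
      entry.timer = setTimeout(() => {
        this.queue = this.queue.filter((e) => e !== entry);
        reject(new Error('timed out waiting for a concurrency slot'));
      }, this.queueTimeout);
      this.queue.push(entry);
    });
  }

  // Hands the slot directly to the next waiter if any; otherwise frees it.
  release() {
    const next = this.queue.shift();
    if (next) {
      clearTimeout(next.timer);
      next.resolve();
    } else {
      this.active -= 1;
    }
  }
}
```

Sharing one such instance across callers is what caps concurrent AI calls globally: every call path must acquire() before the API request and release() after.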

Test plan

  • npm test -- --testPathPattern="concurrency-limiter" — 10/10 pass
  • npm test -- --testPathPattern="delegate" — all 117 existing delegate tests still pass

🤖 Generated with Claude Code

…eAgent

Adds constructor options (maxConcurrent, maxPerSession, queueTimeout) to
DelegationManager, a concurrencyLimiter param to delegate(), and
acquire/release gating in streamTextWithRetryAndFallback so a shared
limiter can cap concurrent AI API calls across all ProbeAgent instances.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

probelabs bot commented Feb 15, 2026

PR Overview: Global AI Concurrency Limiter

Summary

This PR introduces a global AI concurrency limiter to control and throttle concurrent AI API calls across all ProbeAgent instances in a run. The implementation adds constructor options to DelegationManager, passes a concurrencyLimiter parameter through delegate() to ProbeAgent, and implements acquire/release gating in streamTextWithRetryAndFallback.

Files Changed

Modified Files

  1. npm/src/agent/ProbeAgent.js (+54 lines, -6 lines)

    • Added concurrencyLimiter option to constructor
    • Modified streamTextWithRetryAndFallback() to acquire/release limiter slots
    • Wrapped textStream to hold slot until stream completes
    • Added debug logging for acquire/release operations
  2. npm/src/delegate.js (+12 lines, -6 lines)

    • Modified DelegationManager constructor to accept options object with maxConcurrent, maxPerSession, and queueTimeout
    • Added concurrencyLimiter parameter to delegate() function
    • Passed concurrencyLimiter through to ProbeAgent constructor

Added Files

  1. npm/tests/unit/concurrency-limiter.test.js (+212 lines)
    • 10 new unit tests covering constructor options, delegate passthrough, and acquire/release lifecycle
    • Tests for options overriding env/defaults
    • Tests for queue behavior and error handling

Architecture & Impact Assessment

What This PR Accomplishes

  • Enables global throttling of AI API calls across multiple ProbeAgent instances
  • Prevents API rate limit exhaustion by capping concurrent requests
  • Provides configurable limits via constructor options instead of just environment variables
  • Maintains backward compatibility with existing code

Key Technical Changes

  1. DelegationManager Constructor Enhancement: Now accepts an options object allowing runtime configuration of limits
  2. Concurrency Slot Management: Implements acquire/release pattern in streamTextWithRetryAndFallback() with proper cleanup in finally blocks
  3. Stream Wrapping: TextStreams are wrapped to ensure slots are held until stream consumption completes
  4. Error Handling: Limiter slots are properly released on error paths
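
The stream wrapping in point 3 can be sketched as an async generator that passes chunks through and releases the slot in a finally block. This is an illustrative sketch, not the PR's code; the limiter is any duck-typed object with a release() method:

```javascript
// Hold the concurrency slot until the wrapped stream is fully
// consumed, throws, or is closed early; then release it.
async function* holdSlotUntilDone(stream, limiter) {
  try {
    for await (const chunk of stream) {
      yield chunk; // pass chunks through unchanged
    }
  } finally {
    // Runs on normal completion, on error, and on early generator return.
    limiter.release();
  }
}
```

Because the finally block only runs once the caller iterates the generator (or calls return()/throw() on it), a stream that is never consumed keeps its slot, which is the abandonment edge case worth watching in review.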

Affected System Components

  • ProbeAgent: Core AI interaction layer
  • delegate(): Main delegation entry point
  • DelegationManager: Concurrency control mechanism

Component Flow Diagram

```mermaid
graph TD
    A[delegate call] --> B[Create ProbeAgent]
    B --> C[Pass concurrencyLimiter]
    C --> D[streamTextWithRetryAndFallback]
    D --> E{Limiter configured?}
    E -->|Yes| F[Acquire slot]
    E -->|No| G[Execute AI call]
    F --> H[Execute AI call]
    H --> I{textStream exists?}
    I -->|Yes| J[Wrap stream]
    I -->|No| K[Release slot]
    J --> L[Consume stream]
    L --> M[Release slot in finally]
    K --> N[Return result]
    M --> N
    G --> N
```

Scope Discovery & Context Expansion

Direct Impact

  • All code using delegate() function can now pass concurrencyLimiter option
  • All ProbeAgent instances can share a single DelegationManager for global throttling
  • Existing tests continue to pass (117 delegate tests)

Related Files to Review

  • npm/src/tools.js - Tool implementations that may call delegate
  • npm/tests/unit/delegate-limits.test.js - Similar test patterns for delegation limits
  • npm/src/engines/ - Engine implementations that interact with AI providers

Potential Edge Cases

  • Stream cancellation before completion (handled by finally block)
  • Multiple concurrent delegates sharing same limiter (tested)
  • Queue timeout behavior (tested)
  • Error path slot release (tested)

Testing Coverage

  • 10 new unit tests specifically for concurrency limiter
  • All 117 existing delegate tests pass
  • Tests cover: constructor options, delegate passthrough, acquire/release lifecycle, queue behavior, error handling
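
A minimal lifecycle check in the spirit of those tests might look like the following sketch. The recording limiter is a hypothetical stub (the PR's real DelegationManager also enforces per-session limits); it only verifies that release runs on both the success and error paths:

```javascript
// Hypothetical stub that records the acquire/release lifecycle.
function makeRecordingLimiter() {
  const events = [];
  return {
    events,
    async acquire(sessionId) { events.push('acquire'); },
    release(sessionId) { events.push('release'); },
  };
}

async function lifecycleCheck() {
  const limiter = makeRecordingLimiter();

  // Success path: release follows the guarded operation.
  await limiter.acquire(null);
  try {
    /* ... AI call would happen here ... */
  } finally {
    limiter.release(null);
  }

  // Error path: release still runs via finally.
  await limiter.acquire(null);
  try {
    throw new Error('simulated stream failure');
  } catch (err) {
    /* expected */
  } finally {
    limiter.release(null);
  }

  const expected = 'acquire,release,acquire,release';
  if (limiter.events.join(',') !== expected) {
    throw new Error('unexpected lifecycle: ' + limiter.events.join(','));
  }
  return limiter.events;
}
```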

Labels

  • Type: feature
  • Review Effort: 2 (moderate - focused changes with good test coverage)

Review Focus Areas

  1. Proper slot release in all code paths (success, error, cancellation)
  2. Stream wrapping implementation correctness
  3. Constructor options fallback logic (options > env > defaults)
  4. Debug logging usefulness
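
Focus area 3, the options > env > defaults precedence, can be sketched as a small resolver (the environment-variable name below is illustrative, not the PR's actual variable):

```javascript
// Explicit constructor option wins over the environment variable,
// which wins over the hardcoded default.
function resolveLimit(optionValue, envName, fallback) {
  if (optionValue !== undefined) return optionValue;
  const fromEnv = Number.parseInt(process.env[envName], 10);
  if (!Number.isNaN(fromEnv)) return fromEnv;
  return fallback;
}

// e.g. in a DelegationManager-style constructor:
// this.maxConcurrent = resolveLimit(options.maxConcurrent, 'PROBE_MAX_CONCURRENT', 3);
```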

Metadata

  • Review Effort: 2 / 5
  • Primary Label: feature

Powered by Visor from Probelabs

Last updated: 2026-02-15T18:35:00.761Z | Triggered by: pr_opened | Commit: f8aff91

💡 TIP: You can chat with Visor using /visor ask <your question>


probelabs bot commented Feb 15, 2026

Security Issues (1)

| Severity | Location | Issue |
| --- | --- | --- |
| 🟠 Error | system:0 | ProbeAgent execution failed: Error: Failed to get response from AI model during iteration 1. No output generated. Check the stream for errors. |

Architecture Issues (7)

🟠 Error (npm/src/agent/ProbeAgent.js:1418-1434)
The limiter slot release is wrapped in the textStream generator's finally block, but this creates a resource-leak risk: if the caller never consumes the stream (early return, error before iteration, or stream abandonment), the limiter slot is never released. Release happens only when the stream is consumed to completion or throws.
💡 Suggestion: Use a try/finally around the entire streamTextWithRetryAndFallback method to guarantee limiter release regardless of stream consumption. Alternatively, use a WeakRef or FinalizationRegistry to detect abandoned streams, or document that callers MUST consume the entire stream.
🔧 Suggested Fix: Move the limiter.release() call to the outer finally block (lines 1442-1446) and remove the stream wrapping, or add explicit cleanup documentation.
🟢 Info (npm/src/delegate.js:22-29)
The DelegationManager constructor now accepts an options object, which is inconsistent with other configuration patterns in the codebase: the ProbeAgent constructor uses direct options parameters while DelegationManager uses an options object, making the API less predictable.
💡 Suggestion: Standardize on one pattern. Either make DelegationManager accept direct parameters like ProbeAgent, or make ProbeAgent accept an options object. Consistency improves API usability.
🔧 Suggested Fix: N/A - this is a design suggestion for future consistency.
🟡 Warning (npm/src/agent/ProbeAgent.js:1435-1438)
The else-if branch calling limiter.release(null) when there is no textStream is unreachable dead code: streamTextWithRetryAndFallback ALWAYS returns a result with a textStream (from either _tryEngineStreamPath or _executeWithVercelProvider), so the condition never executes.
💡 Suggestion: Remove this dead-code branch. If textStream might be missing in edge cases, handle it in the main finally block instead.
🔧 Suggested Fix: Remove lines 1435-1438 entirely.
🟡 Warning (npm/src/agent/ProbeAgent.js:1371-1377)
The concurrency-limiter acquire/release logic is tightly coupled to streamTextWithRetryAndFallback, mixing concurrency control with AI streaming logic. This violates the single-responsibility principle and makes the limiter hard to reuse or test independently.
💡 Suggestion: Extract the acquire/release logic into a separate wrapper function or decorator that can be applied to any async operation, e.g. a higher-order function withConcurrencyLimit(limiter, asyncFn) that handles acquire/release automatically.
🔧 Suggested Fix:

```js
async function withConcurrencyLimit(limiter, operation) {
  await limiter.acquire(null);
  try {
    return await operation();
  } finally {
    limiter.release(null);
  }
}
```
🟡 Warning (npm/src/agent/ProbeAgent.js:360-362)
The concurrencyLimiter is stored as a direct property on ProbeAgent without any interface definition or type checking. Any object with acquire/release methods can be passed, creating implicit duck typing that is neither documented nor validated.
💡 Suggestion: Define a ConcurrencyLimiter interface or abstract class and validate that the passed limiter implements the required methods (acquire, release, getStats). This yields better error messages and prevents runtime failures from incompatible objects.
🔧 Suggested Fix:

```js
if (concurrencyLimiter && typeof concurrencyLimiter.acquire !== 'function') {
  throw new TypeError('concurrencyLimiter must implement acquire() method');
}
```
🟡 Warning (npm/src/agent/ProbeAgent.js:1418-1434)
Wrapping the textStream generator to handle limiter release adds significant complexity and creates a closure capturing the limiter and debug variables, making the code harder to reason about and debug, especially when errors occur during stream iteration.
💡 Suggestion: Use async resource tracking with a dedicated cleanup function, or a reusable stream-wrapper utility. The current approach mixes resource management with stream-transformation concerns.
🔧 Suggested Fix:

```js
function createReleasingStream(originalStream, limiter, debug) {
  return (async function* () {
    try {
      for await (const chunk of originalStream) yield chunk;
    } finally {
      limiter.release(null);
    }
  })();
}
```
🟡 Warning (npm/src/delegate.js:473)
The concurrencyLimiter parameter is appended to delegate()'s already large parameter list (20+ parameters), continuing a pattern of parameter accumulation that makes the signature unwieldy and hard to maintain.
💡 Suggestion: Refactor delegate() to take an options object, as was done for the DelegationManager constructor. This makes the API more extensible and easier to use.
🔧 Suggested Fix:

```js
export async function delegate({ task, timeout, debug, ..., concurrencyLimiter }) { /* existing implementation */ }
```

Performance Issues (1)

| Severity | Location | Issue |
| --- | --- | --- |
| 🟠 Error | system:0 | ProbeAgent execution failed: Error: Failed to get response from AI model during iteration 1. No output generated. Check the stream for errors. |

Quality Issues (1)

| Severity | Location | Issue |
| --- | --- | --- |
| 🟠 Error | system:0 | ProbeAgent execution failed: Error: Failed to get response from AI model during iteration 1. No output generated. Check the stream for errors. |


Last updated: 2026-02-15T18:35:03.473Z | Triggered by: pr_opened | Commit: f8aff91


@buger buger merged commit 0137fca into main Feb 15, 2026
14 of 18 checks passed
@buger buger deleted the feat/global-ai-concurrency-limiter branch February 15, 2026 18:43