[LEADS-208] Fix TokenTracker double-counting in multi-thread evaluation #159

Open

bsatapat-jpg wants to merge 1 commit into lightspeed-core:main from bsatapat-jpg:dev

Conversation

bsatapat-jpg (Collaborator) commented Feb 12, 2026

Description

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Unit tests improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: Claude-4.5-opus-high
  • Generated by: Cursor

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • Bug Fixes
    • Improved thread-safe token tracking and callback handling so usage counts remain accurate during concurrent or multi-threaded runs; registration/unregistration of trackers is now safer and briefly waits for pending callbacks.
  • Tests
    • Added unit tests to verify cross-thread callback behavior, reset semantics, and correct token accumulation.

coderabbitai bot (Contributor) commented Feb 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds thread-safe, callback-driven token counting to TokenTracker: introduces a class-level callback lock, per-instance lock/condition and pending-callback counter; _token_callback only counts when pending, decrements and notifies; start()/stop() register/unregister callbacks under the class lock; reset() and get_counts() coordinate pending callbacks.
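For orientation, here is a minimal sketch of the synchronization pattern described above. It is an approximation assembled from this summary, not the PR's exact code; the litellm callback signature and the usage fields are assumptions.

    import threading

    import litellm


    class TokenTracker:
        """Simplified sketch: thread-safe, callback-driven token counting."""

        # Class-level lock guarding edits to litellm.success_callback.
        _callback_lock = threading.Lock()

        def __init__(self) -> None:
            self._lock = threading.Lock()
            self._callback_condition = threading.Condition(self._lock)
            self._pending_callbacks = 0
            self._callback_registered = False
            self.input_tokens = 0
            self.output_tokens = 0

        def _token_callback(self, kwargs, completion_response, start_time, end_time):
            # Count only while a callback is expected; decrement and wake any waiter.
            with self._callback_condition:
                if self._pending_callbacks <= 0:
                    return
                usage = getattr(completion_response, "usage", None)
                if usage:
                    self.input_tokens += usage.prompt_tokens
                    self.output_tokens += usage.completion_tokens
                self._pending_callbacks -= 1
                self._callback_condition.notify_all()

        def start(self) -> None:
            with TokenTracker._callback_lock:
                if self._callback_registered:
                    return
                if getattr(litellm, "success_callback", None) is None:
                    litellm.success_callback = []
                litellm.success_callback.append(self._token_callback)
                self._callback_registered = True

        def stop(self) -> None:
            with TokenTracker._callback_lock:
                if not self._callback_registered:
                    return
                if self._token_callback in litellm.success_callback:
                    litellm.success_callback.remove(self._token_callback)
                self._callback_registered = False

        def reset(self) -> None:
            with self._callback_condition:
                self.input_tokens = 0
                self.output_tokens = 0
                self._pending_callbacks = 1  # expect exactly one callback

        def get_counts(self) -> tuple[int, int]:
            with self._callback_condition:
                if self._pending_callbacks > 0:
                    self._callback_condition.wait(timeout=0.1)
                return self.input_tokens, self.output_tokens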

Changes

  • TokenTracker implementation (src/lightspeed_evaluation/core/llm/custom.py): Introduces TokenTracker._callback_lock (class-level) and per-instance _lock, _callback_condition, _pending_callbacks; updates _token_callback signature/logic to only count when pending, then decrement and notify; start()/stop() register/unregister the callback under _callback_lock; reset() sets the pending expectation; get_counts() waits briefly on the condition before returning counts.
  • Unit tests, threading behavior (tests/unit/core/llm/test_custom.py): Adds a threading import and three tests: counting when expecting a callback (start() + reset()), ignoring callbacks when not expecting one, and handling callbacks invoked from other threads; mocks global litellm callbacks and uses the tracker start/stop lifecycle. A sketch of the cross-thread case follows below.
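As a rough illustration of the third test, here is a hedged sketch of a cross-thread callback check. The import path, callback signature, test name, and mocking style are assumptions and may differ from the actual test file.

    import threading
    from unittest.mock import MagicMock

    import litellm

    from lightspeed_evaluation.core.llm.custom import TokenTracker


    def test_callback_from_other_thread_is_counted(monkeypatch):
        # Isolate the global litellm callback list so the test does not leak state.
        monkeypatch.setattr(litellm, "success_callback", [], raising=False)

        tracker = TokenTracker()
        tracker.start()
        tracker.reset()  # expect exactly one callback

        response = MagicMock()
        response.usage.prompt_tokens = 7
        response.usage.completion_tokens = 3

        # Invoke the registered callback from another thread, as litellm might.
        worker = threading.Thread(
            target=tracker._token_callback, args=({}, response, None, None)
        )
        worker.start()
        worker.join()

        assert tracker.get_counts() == (7, 3)
        tracker.stop()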

Sequence Diagram(s)

sequenceDiagram
  participant Thread as Thread (caller)
  participant Registry as Token callback registry (litellm.success_callback)
  participant Tracker as TokenTracker instance
  participant LLM as External LLM / emitter

  Thread->>Registry: start() — acquire class _callback_lock and register callback
  Registry-->>Tracker: callback attached
  Thread->>Tracker: reset() — set _pending_callbacks, prepare condition
  LLM->>Registry: emit global callback(payload)
  Registry->>Tracker: invoke instance _token_callback with response
  Tracker->>Tracker: if _pending_callbacks>0 then count tokens, _pending_callbacks--, notify condition
  Thread->>Tracker: get_counts() waits on _callback_condition (timeout) then returns counts
  Thread->>Registry: stop() — acquire class _callback_lock and unregister callback

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • VladimirKadlec
  • asamal4

🚥 Pre-merge checks: 3 passed ✅ | 1 failed ❌

❌ Failed checks (1 warning)

  • Merge Conflict Detection: ⚠️ Warning. Merge conflicts detected (3 files):
      ⚔️ Makefile (content)
      ⚔️ src/lightspeed_evaluation/core/llm/custom.py (content)
      ⚔️ tests/unit/core/llm/test_custom.py (content)
    These conflicts must be resolved before merging into main. Resolution: resolve the conflicts locally and push changes to this branch.

✅ Passed checks (3 passed)

  • Description Check: Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: Passed. The title clearly identifies the main change: fixing TokenTracker double-counting in multi-threaded evaluation, which matches the core functionality additions (thread-safe synchronization primitives and callback sequencing logic).
  • Docstring Coverage: Passed. Docstring coverage is 90.00%, which meets the required threshold of 80.00%.

No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
src/lightspeed_evaluation/core/llm/custom.py (2)

73-84: _callback_registered reads/writes are unsynchronized — minor TOCTOU window.

_callback_registered is checked on line 75 and set on line 84 without holding any lock, while stop() also reads it without a lock on line 88. If start() and stop() are ever called concurrently (or start() from two threads), there's a small window where the flag and the actual callback-list state can be inconsistent — e.g., the callback is appended but the flag isn't set yet, so a concurrent stop() would skip removal.

In practice this is low-risk since start()/stop() are typically called from the same thread, but if you'd like to harden it:

Suggested approach

Move the flag update inside the class lock, or use _lock to guard the flag:

     def start(self) -> None:
         """Register the token tracking callback."""
-        if self._callback_registered:
-            return
         with TokenTracker._callback_lock:
+            if self._callback_registered:
+                return
             if (
                 not hasattr(litellm, "success_callback")
                 or litellm.success_callback is None
             ):
                 litellm.success_callback = []
             litellm.success_callback.append(self._token_callback)
-        self._callback_registered = True
+            self._callback_registered = True

Apply the same pattern to stop().


109-115: reset() hardcodes _pending_callbacks = 1 — only one in-flight callback is supported.

This works for a strict reset→call→get_counts cycle, but if the caller makes multiple LLM calls between reset() and get_counts(), only the first callback is counted (subsequent callbacks are ignored because _pending_callbacks drops to 0). Consider documenting this single-call contract explicitly, or accepting a count parameter for batch scenarios.
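If batch support is wanted, one hedged option is to let reset() take an expected-callback count (the parameter name is illustrative):

    def reset(self, expected_callbacks: int = 1) -> None:
        """Reset counters and declare how many callbacks get_counts() should wait for."""
        with self._callback_condition:
            self.input_tokens = 0
            self.output_tokens = 0
            self._pending_callbacks = expected_callbacks

get_counts() would then wait until the counter drains or its timeout expires.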


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around line 92-114: The start method sets
TokenTracker._global_callback_registered true but never verifies the callback
still exists on litellm.success_callback, so if that list is cleared externally
trackers won't re-register; update start to, under TokenTracker._callback_lock,
check that litellm.success_callback is a list and contains
TokenTracker._global_token_callback before using the _global_callback_registered
short-circuit, and if the callback is missing (or the attribute was cleared)
re-append TokenTracker._global_token_callback and set
_global_callback_registered = True (making sure to initialize
litellm.success_callback to a list if necessary); keep existing per-thread
registration and the self._callback_registered logic intact.
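A sketch of the re-registration guard that prompt describes, using the attribute names it mentions (TokenTracker._global_token_callback, TokenTracker._global_callback_registered); the surrounding per-instance registration is elided:

    def start(self) -> None:
        """Register the global token callback, re-registering if litellm's list was cleared."""
        with TokenTracker._callback_lock:
            callbacks = getattr(litellm, "success_callback", None)
            if not isinstance(callbacks, list):
                # The attribute was cleared or never set; restore it to a list.
                litellm.success_callback = []
                callbacks = litellm.success_callback
            if (
                not TokenTracker._global_callback_registered
                or TokenTracker._global_token_callback not in callbacks
            ):
                callbacks.append(TokenTracker._global_token_callback)
                TokenTracker._global_callback_registered = True
            # Per-thread tracker registration and the self._callback_registered logic stay as-is.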

In `@tests/unit/core/llm/test_custom.py`:
- Around line 60-66: Remove the dead `result` list and its use: delete the
`result = []` initialization and the `result.append(tracker.get_counts())` call
inside `call_from_other_thread()` (which currently invokes
`TokenTracker._global_token_callback(...)`), so only the relevant cross-thread
behavior via `TokenTracker._global_token_callback` and `tracker.get_counts()`
remains; alternatively, if you intended to validate the other thread's view,
replace the append with an explicit assertion on `tracker.get_counts()` instead
of populating `result`.
🧹 Nitpick comments (4)
src/lightspeed_evaluation/core/llm/custom.py (3)

62-90: Minor race between tracker lookup and stop() — acceptable.

After releasing _callback_lock on Line 80, stop() could remove the tracker before record_callback_tokens on Line 87 executes. This is benign since the tracker instance remains valid and the tokens belong to an LLM call initiated while tracking was active. No fix needed, but worth a brief inline comment noting this is intentional.
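If such a comment is added, it could be as small as this (placement and call name taken from the description above; purely illustrative):

    # Intentional: stop() may unregister this tracker between lookup and this call.
    # The tokens still belong to an LLM call started while tracking was active.
    tracker.record_callback_tokens(completion_response)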


131-141: get_counts() uses a single non-looping wait — pending callbacks may still be unprocessed.

The standard Condition pattern is while predicate: wait(). Here, if the wait times out or is spuriously woken while _pending_callbacks > 0, the method silently returns incomplete counts. This is likely acceptable given the 0.1s grace period and typical usage patterns (called after LLM response is received), but consider logging a warning when the wait expires with callbacks still pending, to aid debugging in production.

💡 Suggested improvement
     def get_counts(self) -> tuple[int, int]:
         """Get accumulated token counts, waiting for pending callbacks if needed.

         Returns:
             Tuple of (input_tokens, output_tokens)
         """
         with self._callback_condition:
             # Wait for pending callbacks with a timeout to handle race conditions
             if self._pending_callbacks > 0:
                 self._callback_condition.wait(timeout=0.1)
+            if self._pending_callbacks > 0:
+                logger.debug(
+                    "TokenTracker: %d pending callback(s) still unprocessed after wait",
+                    self._pending_callbacks,
+                )
             return self.input_tokens, self.output_tokens

143-149: reset() hardcodes _pending_callbacks = 1 — limits to single-call-then-check pattern.

This only supports expecting one callback. If multiple LLM calls happen between reset() and get_counts(), the wait mechanism won't account for all of them. This is fine if the intended usage is single-call reset/check cycles, but worth documenting in the docstring.

tests/unit/core/llm/test_custom.py (1)

17-22: Fixture resets class state before tests — consider adding post-test cleanup too.

The fixture only runs setup (before each test). Adding a yield with post-test cleanup of _active_trackers and _global_callback_registered would make the fixture more robust if tests are ever run in isolation or reordered. Minor point since the pre-test reset handles it for sequential runs.

💡 Suggested improvement
     @pytest.fixture(autouse=True)
     def reset_tracker_state(self) -> None:
         """Reset TokenTracker class-level state before each test."""
         TokenTracker._active_trackers.clear()
         TokenTracker._global_callback_registered = False
+        yield
+        TokenTracker._active_trackers.clear()
+        TokenTracker._global_callback_registered = False


bsatapat-jpg force-pushed the dev branch 2 times, most recently from 7479f4b to 5cc6b2e on February 13, 2026 at 10:56

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around line 59-69: The handler under the with self._callback_condition block
fails to decrement self._pending_callbacks and notify when
completion_response.usage is missing, leaving a stale pending counter; update
the logic in the method handling completion_response so that after acquiring
self._callback_condition you always decrement self._pending_callbacks = max(0,
self._pending_callbacks - 1) and call self._callback_condition.notify_all(), but
only add to self.input_tokens and self.output_tokens when
hasattr(completion_response, "usage") and usage is truthy (i.e., move the
decrement/notify outside the usage branch); ensure this change interacts
correctly with reset() and get_counts() semantics.
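A sketch of the handler shape that prompt asks for (only the accounting and notification are shown; the rest of the callback signature is assumed):

    def _token_callback(self, kwargs, completion_response, start_time, end_time):
        with self._callback_condition:
            usage = getattr(completion_response, "usage", None)
            if usage:
                self.input_tokens += usage.prompt_tokens
                self.output_tokens += usage.completion_tokens
            # Decrement and notify even when usage is missing, so waiters are never stranded.
            self._pending_callbacks = max(0, self._pending_callbacks - 1)
            self._callback_condition.notify_all()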
🧹 Nitpick comments (4)
src/lightspeed_evaluation/core/llm/custom.py (4)

71-82: _callback_registered flag is read/written outside _callback_lock — minor thread-safety gap.

self._callback_registered is checked on line 73 and set on line 82 without holding _callback_lock. If two threads call start() on the same TokenTracker instance concurrently, both could pass the guard and append the callback twice. Consider moving the flag check+set inside the lock scope, or document that start()/stop() must be called from a single thread.

Proposed fix
     def start(self) -> None:
         """Register the token tracking callback."""
-        if self._callback_registered:
-            return
         with TokenTracker._callback_lock:
+            if self._callback_registered:
+                return
             if (
                 not hasattr(litellm, "success_callback")
                 or litellm.success_callback is None
             ):
                 litellm.success_callback = []
             litellm.success_callback.append(self._token_callback)
-        self._callback_registered = True
+            self._callback_registered = True

84-91: Same flag-outside-lock pattern as start() — apply the same fix here.

For consistency, move self._callback_registered = False inside _callback_lock (same rationale as the comment on start()).

Proposed fix
     def stop(self) -> None:
         """Unregister the token tracking callback."""
-        if not self._callback_registered:
-            return
         with TokenTracker._callback_lock:
+            if not self._callback_registered:
+                return
             if self._token_callback in litellm.success_callback:
                 litellm.success_callback.remove(self._token_callback)
-        self._callback_registered = False
+            self._callback_registered = False

99-103: Single wait(timeout=0.1) may be insufficient if the callback fires late.

The 100 ms timeout works when litellm invokes success_callback synchronously (before completion() returns), but if the callback is delayed (async dispatch, thread scheduling under load), get_counts() silently returns stale zeros. Consider using a loop that re-checks the predicate, and/or making the timeout configurable:

Illustrative example
     def get_counts(self, timeout: float = 0.5) -> tuple[int, int]:
         with self._callback_condition:
-            if self._pending_callbacks > 0:
-                self._callback_condition.wait(timeout=0.1)
+            # Loop until the deadline so spurious wakeups re-check the predicate
+            # (requires `import time` at module level).
+            end = time.monotonic() + timeout
+            while self._pending_callbacks > 0:
+                remaining = end - time.monotonic()
+                if remaining <= 0:
+                    break
+                self._callback_condition.wait(timeout=remaining)
             return self.input_tokens, self.output_tokens

105-111: Consider adding a log warning when get_counts() times out with pending callbacks.

reset() sets _pending_callbacks = 1, and if no callback arrives, get_counts() silently returns zeros after the timeout. A structured log at WARNING level when the wait expires with _pending_callbacks > 0 would help diagnose token-tracking mismatches in production. As per coding guidelines, structured logging with appropriate log levels should be used in this module.
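A minimal form of that warning, assuming a module-level logger named logger:

    if self._pending_callbacks > 0:
        logger.warning(
            "TokenTracker.get_counts() timed out with %d pending callback(s); counts may be incomplete",
            self._pending_callbacks,
        )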

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around line 95-105: get_counts currently waits on self._callback_condition for
0.1s if self._pending_callbacks > 0 but never clears _pending_callbacks on
timeout, causing every subsequent get_counts call to pay the timeout; fix by,
while holding the same self._callback_condition lock in get_counts, call
wait(timeout=0.1) and then check whether the wait returned due to notification
or timeout (e.g., re-check self._pending_callbacks or use a timed wait return
value), and if still > 0 after the timeout set self._pending_callbacks = 0 (and
optionally log a warning) before returning self.input_tokens, self.output_tokens
so future get_counts calls don’t repeatedly block.
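A hedged sketch of that adjustment, keeping the existing 0.1 s timeout and assuming a module-level logger:

    def get_counts(self) -> tuple[int, int]:
        """Get accumulated token counts, waiting briefly for a pending callback."""
        with self._callback_condition:
            if self._pending_callbacks > 0:
                self._callback_condition.wait(timeout=0.1)
                if self._pending_callbacks > 0:
                    # Timed out: clear the expectation so later calls don't block again.
                    logger.warning("TokenTracker: pending callback never arrived; clearing it")
                    self._pending_callbacks = 0
            return self.input_tokens, self.output_tokens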
🧹 Nitpick comments (2)
src/lightspeed_evaluation/core/llm/custom.py (2)

73-84: _callback_registered flag is not protected by _callback_lock, creating a TOCTOU window.

For a class explicitly designed for thread safety, _callback_registered is checked (line 75) and set (line 84) outside the lock that guards the actual registration. If start() is called concurrently from two threads, both can pass the guard and double-append the callback. Moving the flag check and update inside the lock removes this window.

Proposed fix
     def start(self) -> None:
         """Register the token tracking callback."""
-        if self._callback_registered:
-            return
         with TokenTracker._callback_lock:
+            if self._callback_registered:
+                return
             if (
                 not hasattr(litellm, "success_callback")
                 or litellm.success_callback is None
             ):
                 litellm.success_callback = []
             litellm.success_callback.append(self._token_callback)
-        self._callback_registered = True
+            self._callback_registered = True

86-93: Same TOCTOU concern as start() — move flag under the lock.

Proposed fix
     def stop(self) -> None:
         """Unregister the token tracking callback."""
-        if not self._callback_registered:
-            return
         with TokenTracker._callback_lock:
+            if not self._callback_registered:
+                return
             if self._token_callback in litellm.success_callback:
                 litellm.success_callback.remove(self._token_callback)
-        self._callback_registered = False
+            self._callback_registered = False
