[LEADS-208] Fix TokenTracker double-counting in multi-thread evaluation #159
bsatapat-jpg wants to merge 1 commit into lightspeed-core:main from
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
Walkthrough: Adds thread-safe, callback-driven token counting to TokenTracker: introduces a class-level callback lock, a per-instance lock/condition, and a pending-callback counter.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Thread as Thread (caller)
    participant Registry as Token callback registry (litellm.success_callback)
    participant Tracker as TokenTracker instance
    participant LLM as External LLM / emitter

    Thread->>Registry: start() — acquire class _callback_lock and register callback
    Registry-->>Tracker: callback attached
    Thread->>Tracker: reset() — set _pending_callbacks, prepare condition
    LLM->>Registry: emit global callback(payload)
    Registry->>Tracker: invoke instance _token_callback with response
    Tracker->>Tracker: if _pending_callbacks>0 then count tokens, _pending_callbacks--, notify condition
    Thread->>Tracker: get_counts() waits on _callback_condition (timeout) then returns counts
    Thread->>Registry: stop() — acquire class _callback_lock and unregister callback
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
No actionable comments were generated in the recent review. 🎉
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around lines 92-114: The start method sets TokenTracker._global_callback_registered to True but never verifies that the callback still exists on litellm.success_callback, so if that list is cleared externally, trackers won't re-register. Update start so that, under TokenTracker._callback_lock, it checks that litellm.success_callback is a list and contains TokenTracker._global_token_callback before using the _global_callback_registered short-circuit; if the callback is missing (or the attribute was cleared), re-append TokenTracker._global_token_callback and set _global_callback_registered = True (initializing litellm.success_callback to a list if necessary). Keep the existing per-thread registration and the self._callback_registered logic intact.
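A minimal sketch of that guarded re-registration, assuming the class-level names from the prompt (`_callback_lock`, `_global_token_callback`, `_global_callback_registered`) and litellm's standard success-callback signature; the per-thread registration path is omitted:

```python
import threading

import litellm


class TokenTracker:
    """Sketch only: the real class also carries per-instance state."""

    _callback_lock = threading.Lock()
    _global_callback_registered = False

    @staticmethod
    def _global_token_callback(kwargs, completion_response, start_time, end_time):
        """Dispatch token usage to active trackers (details omitted)."""

    def start(self) -> None:
        with TokenTracker._callback_lock:
            callbacks = getattr(litellm, "success_callback", None)
            # Don't trust the cached flag alone: the list may have been
            # cleared or replaced externally since the last registration.
            if not isinstance(callbacks, list):
                litellm.success_callback = []
                callbacks = litellm.success_callback
            if TokenTracker._global_token_callback not in callbacks:
                callbacks.append(TokenTracker._global_token_callback)
            TokenTracker._global_callback_registered = True
```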
In `@tests/unit/core/llm/test_custom.py`:
- Around lines 60-66: Remove the dead `result` list and its use: delete the `result = []` initialization and the `result.append(tracker.get_counts())` call inside `call_from_other_thread()` (which currently invokes `TokenTracker._global_token_callback(...)`), so that only the relevant cross-thread behavior via `TokenTracker._global_token_callback` and `tracker.get_counts()` remains. Alternatively, if the intent was to validate the other thread's view, replace the append with an explicit assertion on `tracker.get_counts()` instead of populating `result`, as in the sketch below.
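A hedged sketch of the assertion route; the test name, fixtures, and expected counts are illustrative, not taken from the actual test file:

```python
import threading

from lightspeed_evaluation.core.llm.custom import TokenTracker


def test_counts_visible_across_threads(tracker, mock_response) -> None:
    def call_from_other_thread() -> None:
        # Drive the class-level callback the way litellm would from a
        # worker thread; no dead `result` list is populated.
        TokenTracker._global_token_callback({}, mock_response, None, None)

    worker = threading.Thread(target=call_from_other_thread)
    worker.start()
    worker.join()
    # Assert on the main thread so a failure actually fails the test.
    assert tracker.get_counts() == (10, 5)  # illustrative counts
```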
🧹 Nitpick comments (4)
src/lightspeed_evaluation/core/llm/custom.py (3)
62-90: Minor race between tracker lookup and stop() — acceptable.

After releasing _callback_lock on Line 80, stop() could remove the tracker before record_callback_tokens on Line 87 executes. This is benign, since the tracker instance remains valid and the tokens belong to an LLM call initiated while tracking was active. No fix needed, but worth a brief inline comment noting this is intentional.
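If such a note is added, it could be as simple as (placement assumed, near the record_callback_tokens call):

```python
# NOTE: stop() may unregister this tracker between the registry lookup and
# record_callback_tokens(); this is intentional. The instance stays valid,
# and the tokens belong to an LLM call started while tracking was active.
```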
131-141: get_counts() uses a single non-looping wait — pending callbacks may still be unprocessed.

The standard Condition pattern is `while predicate: wait()`. Here, if the wait times out or is spuriously woken while _pending_callbacks > 0, the method silently returns incomplete counts. This is likely acceptable given the 0.1s grace period and typical usage patterns (called after the LLM response is received), but consider logging a warning when the wait expires with callbacks still pending, to aid debugging in production.

💡 Suggested improvement
```diff
 def get_counts(self) -> tuple[int, int]:
     """Get accumulated token counts, waiting for pending callbacks if needed.

     Returns:
         Tuple of (input_tokens, output_tokens)
     """
     with self._callback_condition:
         # Wait for pending callbacks with a timeout to handle race conditions
         if self._pending_callbacks > 0:
             self._callback_condition.wait(timeout=0.1)
+        if self._pending_callbacks > 0:
+            logger.debug(
+                "TokenTracker: %d pending callback(s) still unprocessed after wait",
+                self._pending_callbacks,
+            )
         return self.input_tokens, self.output_tokens
```
143-149: reset() hardcodes _pending_callbacks = 1 — limits usage to a single-call-then-check pattern.

This only supports expecting one callback. If multiple LLM calls happen between reset() and get_counts(), the wait mechanism won't account for all of them. This is fine if the intended usage is single-call reset/check cycles, but worth documenting in the docstring.
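A hedged sketch of such a docstring; the method body here is inferred from the review comments rather than copied from source:

```python
def reset(self) -> None:
    """Reset counters and arm the tracker for exactly one callback.

    Intended for single-call reset/check cycles: reset(), one LLM call,
    then get_counts(). Because _pending_callbacks is set to 1,
    get_counts() only waits for one callback; multiple LLM calls between
    reset() and get_counts() will not all be awaited.
    """
    with self._callback_condition:
        self.input_tokens = 0
        self.output_tokens = 0
        self._pending_callbacks = 1
```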
tests/unit/core/llm/test_custom.py (1)

17-22: Fixture resets class state before tests — consider adding post-test cleanup too.

The fixture only runs setup (before each test). Adding a `yield` with post-test cleanup of _active_trackers and _global_callback_registered would make the fixture more robust if tests are ever run in isolation or reordered. A minor point, since the pre-test reset handles it for sequential runs.

💡 Suggested improvement

```diff
 @pytest.fixture(autouse=True)
 def reset_tracker_state(self) -> None:
     """Reset TokenTracker class-level state before each test."""
     TokenTracker._active_trackers.clear()
     TokenTracker._global_callback_registered = False
+    yield
+    TokenTracker._active_trackers.clear()
+    TokenTracker._global_callback_registered = False
```
Force-pushed from 7479f4b to 5cc6b2e.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around lines 59-69: The handler under the `with self._callback_condition` block fails to decrement self._pending_callbacks and notify when completion_response.usage is missing, leaving a stale pending counter. Update the logic in the method handling completion_response so that, after acquiring self._callback_condition, you always decrement self._pending_callbacks = max(0, self._pending_callbacks - 1) and call self._callback_condition.notify_all(), but only add to self.input_tokens and self.output_tokens when hasattr(completion_response, "usage") and usage is truthy (i.e., move the decrement/notify outside the usage branch). Ensure this change interacts correctly with reset() and get_counts() semantics.
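A minimal sketch of the suggested shape; the OpenAI-style usage fields (prompt_tokens, completion_tokens) are an assumption about the response object, not confirmed from source:

```python
def _token_callback(self, kwargs, completion_response, start_time, end_time):
    """Count usage if present; always release one pending-callback slot."""
    with self._callback_condition:
        usage = getattr(completion_response, "usage", None)
        if usage:
            self.input_tokens += usage.prompt_tokens
            self.output_tokens += usage.completion_tokens
        # Decrement/notify unconditionally so a usage-less response can
        # never leave get_counts() waiting on a stale pending counter.
        self._pending_callbacks = max(0, self._pending_callbacks - 1)
        self._callback_condition.notify_all()
```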
🧹 Nitpick comments (4)
src/lightspeed_evaluation/core/llm/custom.py (4)
71-82: _callback_registered flag is read/written outside _callback_lock — minor thread-safety gap.

self._callback_registered is checked on line 73 and set on line 82 without holding _callback_lock. If two threads call start() on the same TokenTracker instance concurrently, both could pass the guard and append the callback twice. Consider moving the flag check and set inside the lock scope, or document that start()/stop() must be called from a single thread.

Proposed fix
```diff
 def start(self) -> None:
     """Register the token tracking callback."""
-    if self._callback_registered:
-        return
     with TokenTracker._callback_lock:
+        if self._callback_registered:
+            return
         if (
             not hasattr(litellm, "success_callback")
             or litellm.success_callback is None
         ):
             litellm.success_callback = []
         litellm.success_callback.append(self._token_callback)
-    self._callback_registered = True
+        self._callback_registered = True
```
84-91: Same flag-outside-lock pattern as start() — apply the same fix here.

For consistency, move self._callback_registered = False inside _callback_lock (same rationale as the comment on start()).

Proposed fix
```diff
 def stop(self) -> None:
     """Unregister the token tracking callback."""
-    if not self._callback_registered:
-        return
     with TokenTracker._callback_lock:
+        if not self._callback_registered:
+            return
         if self._token_callback in litellm.success_callback:
             litellm.success_callback.remove(self._token_callback)
-    self._callback_registered = False
+        self._callback_registered = False
```
99-103: A single wait(timeout=0.1) may be insufficient if the callback fires late.

The 100 ms timeout works when litellm invokes success_callback synchronously (before completion() returns), but if the callback is delayed (async dispatch, thread scheduling under load), get_counts() silently returns stale zeros. Consider using a loop that re-checks the predicate, and/or making the timeout configurable:

Illustrative example
```diff
 def get_counts(self, timeout: float = 0.5) -> tuple[int, int]:
     with self._callback_condition:
-        if self._pending_callbacks > 0:
-            self._callback_condition.wait(timeout=0.1)
+        # requires `import time` at module level
+        end = time.monotonic() + timeout
+        while self._pending_callbacks > 0:
+            remaining = end - time.monotonic()
+            if remaining <= 0:
+                break
+            self._callback_condition.wait(timeout=remaining)
         return self.input_tokens, self.output_tokens
```
105-111: Consider adding a log warning when get_counts() times out with pending callbacks.

reset() sets _pending_callbacks = 1, and if no callback arrives, get_counts() silently returns zeros after the timeout. A structured log at WARNING level when the wait expires with _pending_callbacks > 0 would help diagnose token-tracking mismatches in production. As per the coding guidelines, structured logging with appropriate log levels should be used in this module.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/lightspeed_evaluation/core/llm/custom.py`:
- Around lines 95-105: get_counts currently waits on self._callback_condition for 0.1 s if self._pending_callbacks > 0, but never clears _pending_callbacks on timeout, so every subsequent get_counts call pays the timeout again. Fix this by, while holding the same self._callback_condition lock in get_counts, calling wait(timeout=0.1) and then checking whether the wait returned due to notification or timeout (e.g., re-check self._pending_callbacks, or use the timed wait's return value); if the counter is still > 0 after the timeout, set self._pending_callbacks = 0 (and optionally log a warning) before returning self.input_tokens, self.output_tokens, so future get_counts calls don't repeatedly block. A sketch follows this item.
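Folding in the earlier logging nitpick, a sketch of a get_counts() that warns and clears the stale counter on timeout (logger name assumed):

```python
import logging

logger = logging.getLogger(__name__)


def get_counts(self) -> tuple[int, int]:
    """Return (input_tokens, output_tokens), waiting briefly for callbacks."""
    with self._callback_condition:
        if self._pending_callbacks > 0:
            self._callback_condition.wait(timeout=0.1)
        if self._pending_callbacks > 0:
            # Timed out (or spurious wakeup): warn, then clear the counter
            # so later get_counts() calls don't pay the timeout again.
            logger.warning(
                "TokenTracker: %d callback(s) still pending after wait; "
                "returned counts may be incomplete",
                self._pending_callbacks,
            )
            self._pending_callbacks = 0
        return self.input_tokens, self.output_tokens
```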
🧹 Nitpick comments (2)
src/lightspeed_evaluation/core/llm/custom.py (2)
73-84: _callback_registered flag is not protected by _callback_lock, creating a TOCTOU window.

For a class explicitly designed for thread safety, _callback_registered is checked (line 75) and set (line 84) outside the lock that guards the actual registration. If start() is called concurrently from two threads, both can pass the guard and double-append the callback. Moving the flag check and update inside the lock removes this window.

Proposed fix
```diff
 def start(self) -> None:
     """Register the token tracking callback."""
-    if self._callback_registered:
-        return
     with TokenTracker._callback_lock:
+        if self._callback_registered:
+            return
         if (
             not hasattr(litellm, "success_callback")
             or litellm.success_callback is None
         ):
             litellm.success_callback = []
         litellm.success_callback.append(self._token_callback)
-    self._callback_registered = True
+        self._callback_registered = True
```
86-93: Same TOCTOU concern as start() — move the flag under the lock.

Proposed fix
```diff
 def stop(self) -> None:
     """Unregister the token tracking callback."""
-    if not self._callback_registered:
-        return
     with TokenTracker._callback_lock:
+        if not self._callback_registered:
+            return
         if self._token_callback in litellm.success_callback:
             litellm.success_callback.remove(self._token_callback)
-    self._callback_registered = False
+        self._callback_registered = False
```
Description
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit