Summary
The global _LAST_CALLBACK_DATA dictionary in pdd/llm_invoke.py is shared across concurrent jobs in server mode, causing cost data to be corrupted when multiple jobs run simultaneously. This breaks budget enforcement and cost reporting features.
Affected Components
- `pdd/llm_invoke.py:678-684` - Global `_LAST_CALLBACK_DATA` dictionary
- `pdd/llm_invoke.py:686-759` - `_litellm_success_callback()` function
- `pdd/sync_orchestration.py:1020-1026` - Budget enforcement logic
- `pdd/sync_orchestration.py:1532,1555` - Cost accumulation and reporting
- `pdd/server/app.py:54` - JobManager with `max_concurrent=3`
Environment
- PDD Version: v0.0.122
- Operating System: macOS (tested), likely affects all platforms
Bug Description
Root Cause
The LiteLLM callback stores cost/token data in a module-level global dictionary:
```python
# pdd/llm_invoke.py:678-684
_LAST_CALLBACK_DATA = {
    "input_tokens": 0,
    "output_tokens": 0,
    "finish_reason": None,
    "cost": 0.0,
}
```

When the PDD server runs multiple jobs concurrently (up to 3 by design), all threads share this same dictionary:
- Thread 1 writes its cost ($0.10) to `_LAST_CALLBACK_DATA`
- Thread 2 immediately overwrites with its cost ($0.30)
- Thread 3 overwrites again with its cost ($0.05)
- Thread 1 reads back the cost → gets $0.05 (Thread 3's value!) ❌
- Thread 2 reads back the cost → gets $0.05 (Thread 3's value!) ❌
The code even has a warning comment acknowledging this issue:
```python
# Line 678: Module-level storage for last callback data
# (Use with caution in concurrent environments)
```

But no synchronization mechanism is implemented.
When the Bug Occurs
The bug only affects server mode (pdd connect), not CLI mode:
- ✅ Safe: Running `pdd sync` in separate terminals (separate processes)
- ❌ Affected: Using the `pdd connect` web UI with multiple operations
- ❌ Affected: Multiple users on a shared PDD server
- ❌ Affected: Automation/CI scripts submitting concurrent jobs via API
Impact
1. Budget Limits Don't Work
```python
# pdd/sync_orchestration.py:1020-1026
if current_cost_ref[0] >= budget:
    errors.append(f"Budget of ${budget:.2f} exceeded.")
    break  # STOPS execution
```

Scenario:
- Job A: budget=$5, actual cost=$3
- Job B: budget=$3, actual cost=$4
- Job C: budget=$2, actual cost=$1
Due to the race condition (illustrated in the sketch after this list):
- Job A reads Job B's cost ($4) → thinks budget exceeded → stops early ❌
- Job B reads Job C's cost ($1) → thinks under budget → continues past limit → bills $4 ❌
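As a concrete illustration, here is a minimal standalone sketch of this failure mode (hypothetical values mirroring the scenario above; the plain dict stands in for `_LAST_CALLBACK_DATA`, it is not the actual PDD code):

```python
# Illustrative sketch: a job's budget check reads whatever cost the shared
# dict currently holds, not the cost of its own LLM call.
shared = {"cost": 0.0}  # stand-in for the module-level _LAST_CALLBACK_DATA

def record_cost(cost):
    shared["cost"] = cost  # what the success callback effectively does

def read_cost_for_budget_check():
    return shared["cost"]  # what the orchestrator effectively reads back

record_cost(4.00)   # Job B's call finishes (real cost $4)
record_cost(1.00)   # Job C's call finishes and overwrites it (real cost $1)

job_b_budget = 3.00
job_b_cost = read_cost_for_budget_check()   # reads $1.00, not $4.00
print(job_b_cost >= job_b_budget)           # False -> Job B continues past its limit
```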
2. Cost Reports Are Incorrect
All cost tracking is corrupted:
- CSV files from `--output-cost` show wrong amounts
- Operation logs in `.pdd/meta/` have incorrect cost data
- Budget warnings trigger at wrong thresholds
- Cost analytics in PDD Cloud dashboard are inaccurate
Reproduction
Automated Test (Confirms Bug)
We created a test that directly demonstrates the race condition:
```bash
python3 /Users/serhanasad/Desktop/SF/pdd/test_race_condition_simple.py
```

Result:
```
What each thread SHOULD have read (its own cost):
  Thread 1: $0.1000
  Thread 2: $0.3000
  Thread 3: $0.0500

What each thread ACTUALLY read (from shared global):
  Thread 1: $0.0500 ❌ WRONG
  Thread 2: $0.0500 ❌ WRONG
  Thread 3: $0.0500 ✓ Correct by luck

🚨 RACE CONDITION CONFIRMED!
```
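The test script itself is not reproduced here; a minimal sketch of a test exercising the same pattern (standalone code, not the actual `test_race_condition_simple.py`) could look like this:

```python
import threading
import time

# Stand-in for the module-level dict in pdd/llm_invoke.py
_LAST_CALLBACK_DATA = {"cost": 0.0}
results = {}

def job(thread_id, cost):
    _LAST_CALLBACK_DATA["cost"] = cost                 # the "callback" writes to the shared global
    time.sleep(0.1)                                    # give the other threads time to overwrite it
    results[thread_id] = _LAST_CALLBACK_DATA["cost"]   # then read "our" cost back

threads = [threading.Thread(target=job, args=(i, c))
           for i, c in [(1, 0.10), (2, 0.30), (3, 0.05)]]
for t in threads:
    t.start()
for t in threads:
    t.join()

for thread_id, read_back in sorted(results.items()):
    print(f"Thread {thread_id}: ${read_back:.4f}")
# Typically all three threads print $0.0500 -- the last write wins.
```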
Manual Reproduction Steps
- Start PDD server:

```bash
cd /tmp/pdd_test
mkdir prompts
echo "# Add Function\nWrite a function that adds two numbers." > prompts/task1_python.prompt
echo "# Multiply Function\nWrite a function that multiplies two numbers." > prompts/task2_python.prompt
echo "# Subtract Function\nWrite a function that subtracts two numbers." > prompts/task3_python.prompt
pdd connect --no-browser
```

- Submit 3 jobs via API:

```bash
# Terminal 2
curl -X POST http://localhost:9876/api/v1/commands/execute \
  -H "Content-Type: application/json" \
  -d '{"command":"sync","args":{"basename":"task1"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'

# Terminal 3 (immediately)
curl -X POST http://localhost:9876/api/v1/commands/execute \
  -H "Content-Type: application/json" \
  -d '{"command":"sync","args":{"basename":"task2"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'

# Terminal 4 (immediately)
curl -X POST http://localhost:9876/api/v1/commands/execute \
  -H "Content-Type: application/json" \
  -d '{"command":"sync","args":{"basename":"task3"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'
```

- Check job costs:

```bash
curl http://localhost:9876/api/v1/commands/history | jq '.[] | {command: .command, cost: .cost}'
```

- Expected behavior: Each job reports its actual cost
- Actual behavior: Jobs report mixed/incorrect costs due to the race condition
Suggested Fixes
Option 1: Thread-Local Storage (Recommended)
Replace the global dictionary with thread-local storage:
```python
import threading

# Replace module-level dict with thread-local
_CALLBACK_DATA = threading.local()

def _litellm_success_callback(...):
    """LiteLLM success callback - thread-safe version."""
    # Initialize if not exists
    if not hasattr(_CALLBACK_DATA, 'cost'):
        _CALLBACK_DATA.cost = 0.0
        _CALLBACK_DATA.input_tokens = 0
        _CALLBACK_DATA.output_tokens = 0
        _CALLBACK_DATA.finish_reason = None

    # Write to thread-local storage
    _CALLBACK_DATA.cost = calculated_cost
    _CALLBACK_DATA.input_tokens = input_tokens
    _CALLBACK_DATA.output_tokens = output_tokens
    _CALLBACK_DATA.finish_reason = finish_reason

def get_last_callback_data():
    """Retrieve callback data for current thread."""
    if not hasattr(_CALLBACK_DATA, 'cost'):
        return {"cost": 0.0, "input_tokens": 0, "output_tokens": 0, "finish_reason": None}
    return {
        "cost": _CALLBACK_DATA.cost,
        "input_tokens": _CALLBACK_DATA.input_tokens,
        "output_tokens": _CALLBACK_DATA.output_tokens,
        "finish_reason": _CALLBACK_DATA.finish_reason,
    }
```
Option 2: Return Values Instead of Global State
Modify the callback to return values and have LiteLLM pass them through:
```python
def _litellm_success_callback(...):
    # Calculate cost and tokens
    return {
        "cost": calculated_cost,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "finish_reason": finish_reason,
    }

# Then retrieve from completion response
response = litellm.completion(...)
callback_data = getattr(response, '_callback_data', None)
```

Option 3: Add Thread Synchronization (Less Elegant)
```python
import threading

_CALLBACK_LOCK = threading.Lock()
_LAST_CALLBACK_DATA = {...}

def _litellm_success_callback(...):
    with _CALLBACK_LOCK:
        _LAST_CALLBACK_DATA["cost"] = calculated_cost
        # ... other writes

# And when reading:
with _CALLBACK_LOCK:
    cost = _LAST_CALLBACK_DATA.get("cost", 0.0)
```

Recommendation: Option 1 (thread-local storage) is the cleanest solution and aligns with the concurrent design of the server.
Additional Context
- The server intentionally supports concurrent jobs with `max_concurrent=3` (see `pdd/server/app.py:54`)
- The comment "Use with caution in concurrent environments" suggests this was a known concern
- `pdd connect` is the recommended entry point per README, making this bug affect the primary use case
- Budget and cost tracking features are prominently documented in the README
Proposed Changes
- Replace `_LAST_CALLBACK_DATA` with thread-local storage
- Update all code that reads from this dictionary to use the thread-safe accessor
- Add tests for concurrent LLM callbacks to prevent regression (see the sketch after this list)
- Update documentation to clarify thread-safety guarantees
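For the regression test, one possible shape (a sketch only; it assumes the thread-local `_CALLBACK_DATA` and the `get_last_callback_data()` accessor from the Option 1 sketch are importable, which may differ from the final implementation):

```python
import threading

def test_callback_data_is_isolated_per_thread():
    # Hypothetical imports based on the Option 1 sketch above.
    from pdd.llm_invoke import _CALLBACK_DATA, get_last_callback_data

    costs = {1: 0.10, 2: 0.30, 3: 0.05}
    observed = {}
    barrier = threading.Barrier(len(costs))

    def worker(thread_id, cost):
        # Simulate what the success callback would record for this call.
        _CALLBACK_DATA.cost = cost
        _CALLBACK_DATA.input_tokens = 100
        _CALLBACK_DATA.output_tokens = 50
        _CALLBACK_DATA.finish_reason = "stop"
        barrier.wait()  # ensure every thread has written before anyone reads
        observed[thread_id] = get_last_callback_data()["cost"]

    threads = [threading.Thread(target=worker, args=(i, c)) for i, c in costs.items()]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Each thread must read back exactly the cost it wrote.
    assert observed == costs
```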