
Race Condition in LLM Cost Tracking Causes Data Corruption #375

@Serhan-Asad

Summary

The global _LAST_CALLBACK_DATA dictionary in pdd/llm_invoke.py is shared across concurrent jobs in server mode, causing cost data to be corrupted when multiple jobs run simultaneously. This breaks budget enforcement and cost reporting features.

Affected Components

  • pdd/llm_invoke.py:678-684 - Global _LAST_CALLBACK_DATA dictionary
  • pdd/llm_invoke.py:686-759 - _litellm_success_callback() function
  • pdd/sync_orchestration.py:1020-1026 - Budget enforcement logic
  • pdd/sync_orchestration.py:1532,1555 - Cost accumulation and reporting
  • pdd/server/app.py:54 - JobManager with max_concurrent=3

Environment

  • PDD Version: v0.0.122
  • Operating System: macOS (tested), likely affects all platforms

Bug Description

Root Cause

The LiteLLM callback stores cost/token data in a module-level global dictionary:

# pdd/llm_invoke.py:678-684
_LAST_CALLBACK_DATA = {
    "input_tokens": 0,
    "output_tokens": 0,
    "finish_reason": None,
    "cost": 0.0,
}

When the PDD server runs multiple jobs concurrently (up to 3 by design), all threads share this same dictionary:

  1. Thread 1 writes its cost ($0.10) to _LAST_CALLBACK_DATA
  2. Thread 2 immediately overwrites with its cost ($0.30)
  3. Thread 3 overwrites again with its cost ($0.05)
  4. Thread 1 reads back the cost → gets $0.05 (Thread 3's value!) ❌
  5. Thread 2 reads back the cost → gets $0.05 (Thread 3's value!) ❌
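The interleaving above can be reproduced deterministically in a standalone sketch (this is illustrative code, not PDD's actual callback; a `Barrier` forces every write to land before any thread reads back, which is exactly the overlap concurrent jobs can hit):

```python
import threading

# Shared module-level dict, mirroring _LAST_CALLBACK_DATA
_LAST_CALLBACK_DATA = {"cost": 0.0}
results = {}

def job(name, cost, barrier):
    _LAST_CALLBACK_DATA["cost"] = cost            # callback writes to shared global
    barrier.wait()                                # other jobs' callbacks fire meanwhile
    results[name] = _LAST_CALLBACK_DATA["cost"]   # read back: last writer wins

barrier = threading.Barrier(3)
costs = {"job1": 0.10, "job2": 0.30, "job3": 0.05}
threads = [threading.Thread(target=job, args=(n, c, barrier))
           for n, c in costs.items()]
for t in threads: t.start()
for t in threads: t.join()

wrong = [n for n, c in costs.items() if results[n] != c]
print(f"{len(wrong)} of 3 threads read another job's cost")  # → 2 of 3
```

Because every thread reads the same final value, at most one of the three can see its own cost; the other two always get a wrong number.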

The code even has a warning comment acknowledging this issue:

# Line 678: Module-level storage for last callback data
# (Use with caution in concurrent environments)

But no synchronization mechanism is implemented.

When the Bug Occurs

The bug only affects server mode (pdd connect), not CLI mode:

  • Safe: Running pdd sync in separate terminals (separate processes)
  • Affected: Using pdd connect web UI with multiple operations
  • Affected: Multiple users on a shared PDD server
  • Affected: Automation/CI scripts submitting concurrent jobs via API

Impact

1. Budget Limits Don't Work

# pdd/sync_orchestration.py:1020-1026
if current_cost_ref[0] >= budget:
    errors.append(f"Budget of ${budget:.2f} exceeded.")
    break  # STOPS execution

Scenario:

  • Job A: budget=$5, actual cost=$3
  • Job B: budget=$3, actual cost=$4
  • Job C: budget=$2, actual cost=$1

Due to race condition:

  • Job A reads Job B's cost ($4) → thinks budget exceeded → stops early
  • Job B reads Job C's cost ($1) → thinks under budget → continues past limit → bills $4 ❌

2. Cost Reports Are Incorrect

All cost tracking is corrupted:

  • CSV files from --output-cost show wrong amounts
  • Operation logs in .pdd/meta/ have incorrect cost data
  • Budget warnings trigger at wrong thresholds
  • Cost analytics in PDD Cloud dashboard are inaccurate

Reproduction

Automated Test (Confirms Bug)

We created a test that directly demonstrates the race condition:

python3 /Users/serhanasad/Desktop/SF/pdd/test_race_condition_simple.py

Result:

What each thread SHOULD have read (its own cost):
  Thread 1: $0.1000
  Thread 2: $0.3000
  Thread 3: $0.0500

What each thread ACTUALLY read (from shared global):
  Thread 1: $0.0500  ❌ WRONG
  Thread 2: $0.0500  ❌ WRONG
  Thread 3: $0.0500  ✓ Correct by luck

🚨 RACE CONDITION CONFIRMED!

Manual Reproduction Steps

  1. Start PDD server:

    mkdir -p /tmp/pdd_test && cd /tmp/pdd_test
    mkdir -p prompts
    printf '# Add Function\nWrite a function that adds two numbers.\n' > prompts/task1_python.prompt
    printf '# Multiply Function\nWrite a function that multiplies two numbers.\n' > prompts/task2_python.prompt
    printf '# Subtract Function\nWrite a function that subtracts two numbers.\n' > prompts/task3_python.prompt
    
    pdd connect --no-browser
  2. Submit 3 jobs via API:

    # Terminal 2
    curl -X POST http://localhost:9876/api/v1/commands/execute \
      -H "Content-Type: application/json" \
      -d '{"command":"sync","args":{"basename":"task1"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'
    
    # Terminal 3 (immediately)
    curl -X POST http://localhost:9876/api/v1/commands/execute \
      -H "Content-Type: application/json" \
      -d '{"command":"sync","args":{"basename":"task2"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'
    
    # Terminal 4 (immediately)
    curl -X POST http://localhost:9876/api/v1/commands/execute \
      -H "Content-Type: application/json" \
      -d '{"command":"sync","args":{"basename":"task3"},"options":{"skip_tests":true,"skip_verify":true,"budget":5.0}}'
  3. Check job costs:

    curl http://localhost:9876/api/v1/commands/history | jq '.[] | {command: .command, cost: .cost}'
  4. Expected behavior: Each job reports its actual cost
    Actual behavior: Jobs report mixed/incorrect costs due to race condition

Suggested Fixes

Option 1: Thread-Local Storage (Recommended)

Replace the global dictionary with thread-local storage:

import threading

# Replace module-level dict with thread-local
_CALLBACK_DATA = threading.local()

def _litellm_success_callback(...):
    """LiteLLM success callback - thread-safe version."""
    # Each thread writes to its own independent copy of these attributes,
    # so no initialization or locking is needed on the write path
    _CALLBACK_DATA.cost = calculated_cost
    _CALLBACK_DATA.input_tokens = input_tokens
    _CALLBACK_DATA.output_tokens = output_tokens
    _CALLBACK_DATA.finish_reason = finish_reason

def get_last_callback_data():
    """Retrieve callback data for current thread."""
    if not hasattr(_CALLBACK_DATA, 'cost'):
        return {"cost": 0.0, "input_tokens": 0, "output_tokens": 0, "finish_reason": None}
    return {
        "cost": _CALLBACK_DATA.cost,
        "input_tokens": _CALLBACK_DATA.input_tokens,
        "output_tokens": _CALLBACK_DATA.output_tokens,
        "finish_reason": _CALLBACK_DATA.finish_reason,
    }

Option 2: Return Values Instead of Global State

Modify the callback to return values and have LiteLLM pass them through:

def _litellm_success_callback(...):
    # Calculate cost and tokens
    return {
        "cost": calculated_cost,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "finish_reason": finish_reason,
    }

# Then retrieve from the completion response. Note: LiteLLM does not
# currently attach callback return values to the response, so the
# `_callback_data` attribute below is hypothetical and would require
# extra plumbing on our side.
response = litellm.completion(...)
callback_data = getattr(response, '_callback_data', None)

Option 3: Add Thread Synchronization (Less Elegant)

import threading

_CALLBACK_LOCK = threading.Lock()
_LAST_CALLBACK_DATA = {...}

def _litellm_success_callback(...):
    with _CALLBACK_LOCK:
        _LAST_CALLBACK_DATA["cost"] = calculated_cost
        # ... other writes

# And when reading:
with _CALLBACK_LOCK:
    cost = _LAST_CALLBACK_DATA.get("cost", 0.0)

Caveat: locking individual reads and writes only prevents torn updates. It does not stop one job from overwriting the value before another job reads it back; to guarantee that, the lock would have to span the entire LLM call plus the read-back, effectively serializing concurrent jobs.

Recommendation: Option 1 (thread-local storage) is the cleanest solution and aligns with the concurrent design of the server.

Additional Context

  • The server intentionally supports concurrent jobs with max_concurrent=3 (see pdd/server/app.py:54)
  • The comment "Use with caution in concurrent environments" suggests this was a known concern
  • pdd connect is the recommended entry point per README, making this bug affect the primary use case
  • Budget and cost tracking features are prominently documented in README

Proposed Changes

  1. Replace _LAST_CALLBACK_DATA with thread-local storage
  2. Update all code that reads from this dictionary to use the thread-safe accessor
  3. Add tests for concurrent LLM callbacks to prevent regression
  4. Update documentation to clarify thread-safety guarantees
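For item 3, a regression test could assert that every thread reads back exactly what it wrote. A minimal sketch, assuming the thread-local design from Option 1 (`record_cost`/`read_cost` are hypothetical stand-ins for the callback and accessor, not existing PDD API):

```python
import threading

_CALLBACK_DATA = threading.local()

def record_cost(cost):
    # Stand-in for the success callback's write path
    _CALLBACK_DATA.cost = cost

def read_cost():
    # Stand-in for get_last_callback_data()
    return getattr(_CALLBACK_DATA, "cost", 0.0)

def worker(cost, barrier, out, idx):
    record_cost(cost)
    barrier.wait()            # all writes complete before any read
    out[idx] = read_cost()    # thread-local: unaffected by other threads

costs = [0.10, 0.30, 0.05]
out = [None] * 3
barrier = threading.Barrier(3)
threads = [threading.Thread(target=worker, args=(c, barrier, out, i))
           for i, c in enumerate(costs)]
for t in threads: t.start()
for t in threads: t.join()

assert out == costs  # every thread sees its own cost, never a sibling's
```

The same barrier pattern that exposes the bug against the shared dict passes cleanly here, which makes it a good guard against regression.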
