Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
# SQLite lock wait timeout in milliseconds.
#SQLITE_BUSY_TIMEOUT_MS=5000

# Usage event retention window (days). Older events are pruned on startup and
# can be pruned manually via POST /api/admin/usage/prune.
#USAGE_RETENTION_DAYS=30


# ------------------------------------------------------------------------------
# | [API KEYS] Provider API Keys |
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,10 @@ print(response.content[0].text)
| `GET /v1/models` | List all available models with pricing & capabilities |
| `GET /v1/models/{model_id}` | Get details for a specific model |
| `GET /v1/providers` | List configured providers |
| `GET/POST /v1/quota-stats` | Provider quota stats (admin-only actor) |
| `POST /v1/token-count` | Calculate token count for a payload |
| `POST /v1/cost-estimate` | Estimate cost based on token counts |
| `POST /api/admin/usage/prune` | Prune usage events older than `USAGE_RETENTION_DAYS` |

> **Tip:** The `/v1/models` endpoint is useful for discovering available models in your client. Many apps can fetch this list automatically. Add `?enriched=false` for a minimal response without pricing data.

Expand Down Expand Up @@ -508,6 +510,7 @@ The proxy includes a powerful text-based UI for configuration and management.
| `CORS_ALLOW_ORIGINS` | Comma-separated browser origins | empty |
| `CORS_ALLOW_CREDENTIALS` | Allow credentialed CORS requests | false (unless origins configured) |
| `SQLITE_BUSY_TIMEOUT_MS` | SQLite lock timeout in milliseconds | `5000` |
| `USAGE_RETENTION_DAYS` | Usage event retention window in days | `30` |
| `OAUTH_REFRESH_INTERVAL` | Token refresh check interval (seconds) | `600` |
| `SKIP_OAUTH_INIT_CHECK` | Skip interactive OAuth setup on startup | `false` |

Expand Down
17 changes: 17 additions & 0 deletions src/proxy_app/anthropic_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from fastapi.responses import JSONResponse


def anthropic_error_response(
*,
status_code: int,
error_type: str,
message: str,
) -> JSONResponse:
payload = {
"type": "error",
"error": {
"type": error_type,
"message": message,
},
}
return JSONResponse(status_code=status_code, content=payload)
119 changes: 53 additions & 66 deletions src/proxy_app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,15 @@
from proxy_app.request_logger import log_request_to_console, redact_sensitive_data
from proxy_app.security_config import get_cors_settings, validate_secret_settings
from proxy_app.stream_usage import StreamUsageTracker
from proxy_app.anthropic_errors import anthropic_error_response
from proxy_app.batch_manager import EmbeddingBatcher
from proxy_app.api_token_auth import ApiActor, get_api_actor, require_admin_api_actor
from proxy_app.detailed_logger import RawIOLogger
from proxy_app.db import init_db_runtime
from proxy_app.routers import admin_router, auth_router, ui_router, user_router
from proxy_app.usage_recorder import (
get_usage_retention_days,
prune_usage_events,
record_usage_event as record_usage_event_async,
start_usage_recorder,
stop_usage_recorder,
Expand Down Expand Up @@ -440,6 +443,11 @@ async def lifespan(app: FastAPI):
db_engine, db_session_maker = await init_db_runtime(_root_dir)
app.state.db_engine = db_engine
app.state.db_session_maker = db_session_maker
retention_days = get_usage_retention_days()
await prune_usage_events(
app.state.db_session_maker,
retention_days=retention_days,
)
app.state.usage_recorder = await start_usage_recorder(app.state.db_session_maker)

# [MODIFIED] Perform skippable OAuth initialization at startup
Expand Down Expand Up @@ -1199,14 +1207,11 @@ async def anthropic_messages(
usage=None,
error=e,
)
error_response = {
"type": "error",
"error": {
"type": "invalid_request_error",
"message": _safe_error_message(e, default="Invalid request"),
},
}
raise HTTPException(status_code=400, detail=error_response)
return anthropic_error_response(
status_code=400,
error_type="invalid_request_error",
message=_safe_error_message(e, default="Invalid request"),
)
except litellm.AuthenticationError as e:
await _record_usage(
actor=actor,
Expand All @@ -1217,14 +1222,11 @@ async def anthropic_messages(
usage=None,
error=e,
)
error_response = {
"type": "error",
"error": {
"type": "authentication_error",
"message": _safe_error_message(e, default="Authentication failed"),
},
}
raise HTTPException(status_code=401, detail=error_response)
return anthropic_error_response(
status_code=401,
error_type="authentication_error",
message=_safe_error_message(e, default="Authentication failed"),
)
except litellm.RateLimitError as e:
await _record_usage(
actor=actor,
Expand All @@ -1235,14 +1237,11 @@ async def anthropic_messages(
usage=None,
error=e,
)
error_response = {
"type": "error",
"error": {
"type": "rate_limit_error",
"message": _safe_error_message(e, default="Rate limit exceeded"),
},
}
raise HTTPException(status_code=429, detail=error_response)
return anthropic_error_response(
status_code=429,
error_type="rate_limit_error",
message=_safe_error_message(e, default="Rate limit exceeded"),
)
except (litellm.ServiceUnavailableError, litellm.APIConnectionError) as e:
await _record_usage(
actor=actor,
Expand All @@ -1253,14 +1252,11 @@ async def anthropic_messages(
usage=None,
error=e,
)
error_response = {
"type": "error",
"error": {
"type": "api_error",
"message": _safe_error_message(e, default="Service unavailable"),
},
}
raise HTTPException(status_code=503, detail=error_response)
return anthropic_error_response(
status_code=503,
error_type="api_error",
message=_safe_error_message(e, default="Service unavailable"),
)
except litellm.Timeout as e:
await _record_usage(
actor=actor,
Expand All @@ -1271,14 +1267,11 @@ async def anthropic_messages(
usage=None,
error=e,
)
error_response = {
"type": "error",
"error": {
"type": "api_error",
"message": f"Request timed out: {_safe_error_message(e, default='Gateway timeout')}",
},
}
raise HTTPException(status_code=504, detail=error_response)
return anthropic_error_response(
status_code=504,
error_type="api_error",
message=f"Request timed out: {_safe_error_message(e, default='Gateway timeout')}",
)
except Exception as e:
safe_message = _safe_error_message(e, default="Internal server error")
logging.error("Anthropic messages endpoint error: %s", safe_message)
Expand All @@ -1297,11 +1290,11 @@ async def anthropic_messages(
headers=None,
body={"error": safe_message},
)
error_response = {
"type": "error",
"error": {"type": "api_error", "message": safe_message},
}
raise HTTPException(status_code=500, detail=error_response)
return anthropic_error_response(
status_code=500,
error_type="api_error",
message=safe_message,
)


# --- Anthropic Count Tokens Endpoint ---
Expand Down Expand Up @@ -1330,31 +1323,25 @@ async def anthropic_count_tokens(
ValueError,
litellm.ContextWindowExceededError,
) as e:
error_response = {
"type": "error",
"error": {
"type": "invalid_request_error",
"message": _safe_error_message(e, default="Invalid request"),
},
}
raise HTTPException(status_code=400, detail=error_response)
return anthropic_error_response(
status_code=400,
error_type="invalid_request_error",
message=_safe_error_message(e, default="Invalid request"),
)
except litellm.AuthenticationError as e:
error_response = {
"type": "error",
"error": {
"type": "authentication_error",
"message": _safe_error_message(e, default="Authentication failed"),
},
}
raise HTTPException(status_code=401, detail=error_response)
return anthropic_error_response(
status_code=401,
error_type="authentication_error",
message=_safe_error_message(e, default="Authentication failed"),
)
except Exception as e:
safe_message = _safe_error_message(e, default="Internal server error")
logging.error("Anthropic count_tokens endpoint error: %s", safe_message)
error_response = {
"type": "error",
"error": {"type": "api_error", "message": safe_message},
}
raise HTTPException(status_code=500, detail=error_response)
return anthropic_error_response(
status_code=500,
error_type="api_error",
message=safe_message,
)


@app.post("/v1/embeddings")
Expand Down
23 changes: 22 additions & 1 deletion src/proxy_app/routers/admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from datetime import datetime
from typing import Literal

from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from proxy_app.auth import SessionUser, get_db_session, require_admin
from proxy_app.db import hash_password
from proxy_app.db_models import User
from proxy_app.usage_recorder import get_usage_retention_days, prune_usage_events
from proxy_app.usage_queries import (
fetch_usage_by_day,
fetch_usage_by_model,
Expand Down Expand Up @@ -75,6 +76,12 @@ class UsageByModelResponse(BaseModel):
rows: list[UsageByModelItem]


class UsagePruneResponse(BaseModel):
ok: bool
deleted: int
retention_days: int


def _serialize_user(user: User) -> AdminUserItem:
return AdminUserItem(
id=user.id,
Expand Down Expand Up @@ -243,3 +250,17 @@ async def admin_user_usage_by_model(
days=days,
rows=[UsageByModelItem(**row) for row in rows],
)


@router.post("/usage/prune", response_model=UsagePruneResponse)
async def admin_prune_usage(
request: Request,
_: SessionUser = Depends(require_admin),
) -> UsagePruneResponse:
retention_days = get_usage_retention_days()
session_maker = request.app.state.db_session_maker
deleted = await prune_usage_events(
session_maker,
retention_days=retention_days,
)
return UsagePruneResponse(ok=True, deleted=deleted, retention_days=retention_days)
35 changes: 35 additions & 0 deletions src/proxy_app/usage_recorder.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,50 @@
import asyncio
import logging
import os
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Any

from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from proxy_app.db_models import UsageEvent

logger = logging.getLogger(__name__)


def get_usage_retention_days() -> int:
raw = os.getenv("USAGE_RETENTION_DAYS", "30")
try:
days = int(raw)
except ValueError:
days = 30
return max(1, days)


async def prune_usage_events(
session_maker: async_sessionmaker[AsyncSession],
*,
retention_days: int | None = None,
) -> int:
active_retention_days = retention_days or get_usage_retention_days()
cutoff = datetime.utcnow() - timedelta(days=active_retention_days)
async with session_maker() as session:
result = await session.execute(
delete(UsageEvent).where(UsageEvent.timestamp < cutoff)
)
await session.commit()
deleted = result.rowcount if result.rowcount is not None else 0

if deleted > 0:
logger.info(
"Pruned %d usage events older than %d days",
deleted,
active_retention_days,
)
return deleted


@dataclass(slots=True)
class UsageEventPayload:
user_id: int | None
Expand Down
22 changes: 22 additions & 0 deletions tests/test_api_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import json

from proxy_app.anthropic_errors import anthropic_error_response


def test_anthropic_error_response_shape_is_not_fastapi_wrapped() -> None:
response = anthropic_error_response(
status_code=429,
error_type="rate_limit_error",
message="Too many requests",
)

payload = json.loads(response.body.decode("utf-8"))
assert response.status_code == 429
assert payload == {
"type": "error",
"error": {
"type": "rate_limit_error",
"message": "Too many requests",
},
}
assert "detail" not in payload
Loading
Loading