From 68d01636ed43f068127ce377d8473e21b2dbe615 Mon Sep 17 00:00:00 2001 From: mojo-opencode Date: Sat, 14 Feb 2026 03:12:53 +0000 Subject: [PATCH] fix(compat): add anthropic error responses and usage retention pruning --- .env.example | 4 + README.md | 3 + src/proxy_app/anthropic_errors.py | 17 +++++ src/proxy_app/main.py | 119 +++++++++++++---------------- src/proxy_app/routers/admin_api.py | 23 +++++- src/proxy_app/usage_recorder.py | 35 +++++++++ tests/test_api_compat.py | 22 ++++++ tests/test_auth.py | 18 +++++ tests/test_usage_retention.py | 49 ++++++++++++ 9 files changed, 223 insertions(+), 67 deletions(-) create mode 100644 src/proxy_app/anthropic_errors.py create mode 100644 tests/test_api_compat.py create mode 100644 tests/test_usage_retention.py diff --git a/.env.example b/.env.example index 43d3647e..6ad67ae7 100644 --- a/.env.example +++ b/.env.example @@ -71,6 +71,10 @@ # SQLite lock wait timeout in milliseconds. #SQLITE_BUSY_TIMEOUT_MS=5000 +# Usage event retention window (days). Older events are pruned on startup and +# can be pruned manually via POST /api/admin/usage/prune. +#USAGE_RETENTION_DAYS=30 + # ------------------------------------------------------------------------------ # | [API KEYS] Provider API Keys | diff --git a/README.md b/README.md index 3b04870f..666a0057 100644 --- a/README.md +++ b/README.md @@ -243,8 +243,10 @@ print(response.content[0].text) | `GET /v1/models` | List all available models with pricing & capabilities | | `GET /v1/models/{model_id}` | Get details for a specific model | | `GET /v1/providers` | List configured providers | +| `GET/POST /v1/quota-stats` | Provider quota stats (admin-only actor) | | `POST /v1/token-count` | Calculate token count for a payload | | `POST /v1/cost-estimate` | Estimate cost based on token counts | +| `POST /api/admin/usage/prune` | Prune usage events older than `USAGE_RETENTION_DAYS` | > **Tip:** The `/v1/models` endpoint is useful for discovering available models in your client. Many apps can fetch this list automatically. Add `?enriched=false` for a minimal response without pricing data. @@ -508,6 +510,7 @@ The proxy includes a powerful text-based UI for configuration and management. | `CORS_ALLOW_ORIGINS` | Comma-separated browser origins | empty | | `CORS_ALLOW_CREDENTIALS` | Allow credentialed CORS requests | false (unless origins configured) | | `SQLITE_BUSY_TIMEOUT_MS` | SQLite lock timeout in milliseconds | `5000` | +| `USAGE_RETENTION_DAYS` | Usage event retention window in days | `30` | | `OAUTH_REFRESH_INTERVAL` | Token refresh check interval (seconds) | `600` | | `SKIP_OAUTH_INIT_CHECK` | Skip interactive OAuth setup on startup | `false` | diff --git a/src/proxy_app/anthropic_errors.py b/src/proxy_app/anthropic_errors.py new file mode 100644 index 00000000..564fe21f --- /dev/null +++ b/src/proxy_app/anthropic_errors.py @@ -0,0 +1,17 @@ +from fastapi.responses import JSONResponse + + +def anthropic_error_response( + *, + status_code: int, + error_type: str, + message: str, +) -> JSONResponse: + payload = { + "type": "error", + "error": { + "type": error_type, + "message": message, + }, + } + return JSONResponse(status_code=status_code, content=payload) diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 46aae2c4..9f22b30f 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -138,12 +138,15 @@ from proxy_app.request_logger import log_request_to_console, redact_sensitive_data from proxy_app.security_config import get_cors_settings, validate_secret_settings from proxy_app.stream_usage import StreamUsageTracker + from proxy_app.anthropic_errors import anthropic_error_response from proxy_app.batch_manager import EmbeddingBatcher from proxy_app.api_token_auth import ApiActor, get_api_actor, require_admin_api_actor from proxy_app.detailed_logger import RawIOLogger from proxy_app.db import init_db_runtime from proxy_app.routers import admin_router, auth_router, ui_router, user_router from proxy_app.usage_recorder import ( + get_usage_retention_days, + prune_usage_events, record_usage_event as record_usage_event_async, start_usage_recorder, stop_usage_recorder, @@ -440,6 +443,11 @@ async def lifespan(app: FastAPI): db_engine, db_session_maker = await init_db_runtime(_root_dir) app.state.db_engine = db_engine app.state.db_session_maker = db_session_maker + retention_days = get_usage_retention_days() + await prune_usage_events( + app.state.db_session_maker, + retention_days=retention_days, + ) app.state.usage_recorder = await start_usage_recorder(app.state.db_session_maker) # [MODIFIED] Perform skippable OAuth initialization at startup @@ -1199,14 +1207,11 @@ async def anthropic_messages( usage=None, error=e, ) - error_response = { - "type": "error", - "error": { - "type": "invalid_request_error", - "message": _safe_error_message(e, default="Invalid request"), - }, - } - raise HTTPException(status_code=400, detail=error_response) + return anthropic_error_response( + status_code=400, + error_type="invalid_request_error", + message=_safe_error_message(e, default="Invalid request"), + ) except litellm.AuthenticationError as e: await _record_usage( actor=actor, @@ -1217,14 +1222,11 @@ async def anthropic_messages( usage=None, error=e, ) - error_response = { - "type": "error", - "error": { - "type": "authentication_error", - "message": _safe_error_message(e, default="Authentication failed"), - }, - } - raise HTTPException(status_code=401, detail=error_response) + return anthropic_error_response( + status_code=401, + error_type="authentication_error", + message=_safe_error_message(e, default="Authentication failed"), + ) except litellm.RateLimitError as e: await _record_usage( actor=actor, @@ -1235,14 +1237,11 @@ async def anthropic_messages( usage=None, error=e, ) - error_response = { - "type": "error", - "error": { - "type": "rate_limit_error", - "message": _safe_error_message(e, default="Rate limit exceeded"), - }, - } - raise HTTPException(status_code=429, detail=error_response) + return anthropic_error_response( + status_code=429, + error_type="rate_limit_error", + message=_safe_error_message(e, default="Rate limit exceeded"), + ) except (litellm.ServiceUnavailableError, litellm.APIConnectionError) as e: await _record_usage( actor=actor, @@ -1253,14 +1252,11 @@ async def anthropic_messages( usage=None, error=e, ) - error_response = { - "type": "error", - "error": { - "type": "api_error", - "message": _safe_error_message(e, default="Service unavailable"), - }, - } - raise HTTPException(status_code=503, detail=error_response) + return anthropic_error_response( + status_code=503, + error_type="api_error", + message=_safe_error_message(e, default="Service unavailable"), + ) except litellm.Timeout as e: await _record_usage( actor=actor, @@ -1271,14 +1267,11 @@ async def anthropic_messages( usage=None, error=e, ) - error_response = { - "type": "error", - "error": { - "type": "api_error", - "message": f"Request timed out: {_safe_error_message(e, default='Gateway timeout')}", - }, - } - raise HTTPException(status_code=504, detail=error_response) + return anthropic_error_response( + status_code=504, + error_type="api_error", + message=f"Request timed out: {_safe_error_message(e, default='Gateway timeout')}", + ) except Exception as e: safe_message = _safe_error_message(e, default="Internal server error") logging.error("Anthropic messages endpoint error: %s", safe_message) @@ -1297,11 +1290,11 @@ async def anthropic_messages( headers=None, body={"error": safe_message}, ) - error_response = { - "type": "error", - "error": {"type": "api_error", "message": safe_message}, - } - raise HTTPException(status_code=500, detail=error_response) + return anthropic_error_response( + status_code=500, + error_type="api_error", + message=safe_message, + ) # --- Anthropic Count Tokens Endpoint --- @@ -1330,31 +1323,25 @@ async def anthropic_count_tokens( ValueError, litellm.ContextWindowExceededError, ) as e: - error_response = { - "type": "error", - "error": { - "type": "invalid_request_error", - "message": _safe_error_message(e, default="Invalid request"), - }, - } - raise HTTPException(status_code=400, detail=error_response) + return anthropic_error_response( + status_code=400, + error_type="invalid_request_error", + message=_safe_error_message(e, default="Invalid request"), + ) except litellm.AuthenticationError as e: - error_response = { - "type": "error", - "error": { - "type": "authentication_error", - "message": _safe_error_message(e, default="Authentication failed"), - }, - } - raise HTTPException(status_code=401, detail=error_response) + return anthropic_error_response( + status_code=401, + error_type="authentication_error", + message=_safe_error_message(e, default="Authentication failed"), + ) except Exception as e: safe_message = _safe_error_message(e, default="Internal server error") logging.error("Anthropic count_tokens endpoint error: %s", safe_message) - error_response = { - "type": "error", - "error": {"type": "api_error", "message": safe_message}, - } - raise HTTPException(status_code=500, detail=error_response) + return anthropic_error_response( + status_code=500, + error_type="api_error", + message=safe_message, + ) @app.post("/v1/embeddings") diff --git a/src/proxy_app/routers/admin_api.py b/src/proxy_app/routers/admin_api.py index 35188e02..9dd0d661 100644 --- a/src/proxy_app/routers/admin_api.py +++ b/src/proxy_app/routers/admin_api.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import Literal -from fastapi import APIRouter, Depends, HTTPException, Query, status +from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -10,6 +10,7 @@ from proxy_app.auth import SessionUser, get_db_session, require_admin from proxy_app.db import hash_password from proxy_app.db_models import User +from proxy_app.usage_recorder import get_usage_retention_days, prune_usage_events from proxy_app.usage_queries import ( fetch_usage_by_day, fetch_usage_by_model, @@ -75,6 +76,12 @@ class UsageByModelResponse(BaseModel): rows: list[UsageByModelItem] +class UsagePruneResponse(BaseModel): + ok: bool + deleted: int + retention_days: int + + def _serialize_user(user: User) -> AdminUserItem: return AdminUserItem( id=user.id, @@ -243,3 +250,17 @@ async def admin_user_usage_by_model( days=days, rows=[UsageByModelItem(**row) for row in rows], ) + + +@router.post("/usage/prune", response_model=UsagePruneResponse) +async def admin_prune_usage( + request: Request, + _: SessionUser = Depends(require_admin), +) -> UsagePruneResponse: + retention_days = get_usage_retention_days() + session_maker = request.app.state.db_session_maker + deleted = await prune_usage_events( + session_maker, + retention_days=retention_days, + ) + return UsagePruneResponse(ok=True, deleted=deleted, retention_days=retention_days) diff --git a/src/proxy_app/usage_recorder.py b/src/proxy_app/usage_recorder.py index 77d86153..e4ada1c3 100644 --- a/src/proxy_app/usage_recorder.py +++ b/src/proxy_app/usage_recorder.py @@ -1,8 +1,11 @@ import asyncio import logging +import os +from datetime import datetime, timedelta from dataclasses import dataclass from typing import Any +from sqlalchemy import delete from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from proxy_app.db_models import UsageEvent @@ -10,6 +13,38 @@ logger = logging.getLogger(__name__) +def get_usage_retention_days() -> int: + raw = os.getenv("USAGE_RETENTION_DAYS", "30") + try: + days = int(raw) + except ValueError: + days = 30 + return max(1, days) + + +async def prune_usage_events( + session_maker: async_sessionmaker[AsyncSession], + *, + retention_days: int | None = None, +) -> int: + active_retention_days = retention_days or get_usage_retention_days() + cutoff = datetime.utcnow() - timedelta(days=active_retention_days) + async with session_maker() as session: + result = await session.execute( + delete(UsageEvent).where(UsageEvent.timestamp < cutoff) + ) + await session.commit() + deleted = result.rowcount if result.rowcount is not None else 0 + + if deleted > 0: + logger.info( + "Pruned %d usage events older than %d days", + deleted, + active_retention_days, + ) + return deleted + + @dataclass(slots=True) class UsageEventPayload: user_id: int | None diff --git a/tests/test_api_compat.py b/tests/test_api_compat.py new file mode 100644 index 00000000..5db93fdb --- /dev/null +++ b/tests/test_api_compat.py @@ -0,0 +1,22 @@ +import json + +from proxy_app.anthropic_errors import anthropic_error_response + + +def test_anthropic_error_response_shape_is_not_fastapi_wrapped() -> None: + response = anthropic_error_response( + status_code=429, + error_type="rate_limit_error", + message="Too many requests", + ) + + payload = json.loads(response.body.decode("utf-8")) + assert response.status_code == 429 + assert payload == { + "type": "error", + "error": { + "type": "rate_limit_error", + "message": "Too many requests", + }, + } + assert "detail" not in payload diff --git a/tests/test_auth.py b/tests/test_auth.py index 6ab71b93..18d28828 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta import pytest +from fastapi import HTTPException from proxy_app.api_token_auth import ( AUTH_MODE_BOTH, @@ -8,9 +9,11 @@ AUTH_MODE_USERS, AUTH_SOURCE_LEGACY_MASTER, AUTH_SOURCE_USER_API_KEY, + ApiActor, extract_api_token_from_headers, hash_api_token, hash_api_token_legacy, + require_admin_api_actor, resolve_api_actor_from_token, ) from proxy_app.auth import create_session_token, decode_session_token, verify_password @@ -112,6 +115,21 @@ def test_x_api_key_header_precedence_over_authorization() -> None: assert fallback == "bearer-token" +@pytest.mark.asyncio +async def test_require_admin_api_actor_blocks_non_admin() -> None: + with pytest.raises(HTTPException) as exc: + await require_admin_api_actor( + actor=ApiActor( + user_id=1, + api_key_id=2, + role="user", + auth_source="user_api_key", + ) + ) + + assert exc.value.status_code == 403 + + @pytest.mark.asyncio async def test_legacy_token_hash_opportunistic_migration( session_maker, diff --git a/tests/test_usage_retention.py b/tests/test_usage_retention.py new file mode 100644 index 00000000..264f1a2d --- /dev/null +++ b/tests/test_usage_retention.py @@ -0,0 +1,49 @@ +from datetime import datetime, timedelta + +import pytest +from sqlalchemy import func, select + +from proxy_app.db_models import UsageEvent +from proxy_app.usage_recorder import prune_usage_events + + +@pytest.mark.asyncio +async def test_prune_usage_events_removes_old_rows(session_maker) -> None: + now = datetime.utcnow() + old_time = now - timedelta(days=60) + recent_time = now - timedelta(days=5) + + async with session_maker() as session: + session.add_all( + [ + UsageEvent( + timestamp=old_time, + user_id=1, + api_key_id=1, + endpoint="/v1/chat/completions", + provider="openai", + model="openai/gpt-4o-mini", + request_id="req-old", + status_code=200, + ), + UsageEvent( + timestamp=recent_time, + user_id=1, + api_key_id=1, + endpoint="/v1/chat/completions", + provider="openai", + model="openai/gpt-4o-mini", + request_id="req-recent", + status_code=200, + ), + ] + ) + await session.commit() + + deleted = await prune_usage_events(session_maker, retention_days=30) + assert deleted == 1 + + async with session_maker() as session: + count = await session.scalar(select(func.count(UsageEvent.id))) + + assert count == 1