From 9c663d4623f66fc89854b121c3cefb153b0fcf6e Mon Sep 17 00:00:00 2001 From: Jasmin Le Roux Date: Sun, 15 Feb 2026 11:42:33 +0200 Subject: [PATCH 1/5] feat(providers): add Anthropic OAuth provider (Claude Pro/Max) Implements direct Anthropic Messages API integration using OAuth PKCE flow, impersonating Claude Code for Pro/Max subscription access. Includes full OpenAI<->Anthropic format conversion, SSE streaming, mcp_ tool prefixing, token refresh, and live model list fetching from /v1/models. Co-Authored-By: Claude Opus 4.6 --- scripts/setup_anthropic_cred.py | 68 ++ src/rotator_library/credential_manager.py | 3 +- src/rotator_library/provider_factory.py | 2 + .../providers/anthropic_auth_base.py | 571 ++++++++++++ .../providers/anthropic_provider.py | 816 ++++++++++++++++++ 5 files changed, 1459 insertions(+), 1 deletion(-) create mode 100644 scripts/setup_anthropic_cred.py create mode 100644 src/rotator_library/providers/anthropic_auth_base.py create mode 100644 src/rotator_library/providers/anthropic_provider.py diff --git a/scripts/setup_anthropic_cred.py b/scripts/setup_anthropic_cred.py new file mode 100644 index 00000000..de6ad465 --- /dev/null +++ b/scripts/setup_anthropic_cred.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +""" +Two-step Anthropic OAuth credential setup. + +Step 1 (no args): Generate auth URL + save verifier + python scripts/setup_anthropic_cred.py + +Step 2 (with code): Exchange code for tokens + python scripts/setup_anthropic_cred.py "CODE_FROM_BROWSER" +""" +import sys +import os +import json +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +import asyncio +from pathlib import Path +from rotator_library.providers.anthropic_auth_base import ( + _generate_pkce, _build_authorize_url, AnthropicAuthBase +) + +STATE_FILE = Path(__file__).parent / ".anthropic_pkce_state.json" +OAUTH_DIR = Path(__file__).parent / ".." 
async def exchange_code(auth_code: str):
    """
    Step 2: exchange the pasted authorization code for tokens and save them.

    Reads the PKCE verifier that step 1 stored in STATE_FILE, trades the code
    for tokens via ``AnthropicAuthBase._exchange_code``, writes the result to
    a new ``anthropic_oauth_<n>.json`` file under OAUTH_DIR and finally
    removes the state file.

    Args:
        auth_code: Authorization code copied from the browser. Surrounding
            whitespace is stripped before the exchange.

    Raises:
        SystemExit: If STATE_FILE does not exist (step 1 has not been run).
    """
    import time

    try:
        state = json.loads(STATE_FILE.read_text())
    except FileNotFoundError:
        raise SystemExit(
            f"No PKCE state found at {STATE_FILE}. "
            "Run this script without arguments first to generate the auth URL."
        ) from None
    verifier = state["verifier"]

    auth = AnthropicAuthBase()
    tokens = await auth._exchange_code(auth_code.strip(), verifier)

    creds = {
        **tokens,
        "email": "anthropic-oauth-user",
        "_proxy_metadata": {
            "email": "anthropic-oauth-user",
            "last_check_timestamp": time.time(),
            "credential_type": "oauth",
        },
    }
    serialized = json.dumps(creds, indent=2)

    oauth_dir = OAUTH_DIR.resolve()
    oauth_dir.mkdir(parents=True, exist_ok=True)

    # Start from the historical "count + 1" guess, but never reuse a number
    # that is already taken: with gaps in the numbering (e.g. files 1 and 3)
    # the plain count would point at an existing credential and clobber it.
    next_num = len(list(oauth_dir.glob("anthropic_oauth_*.json"))) + 1
    while True:
        file_path = oauth_dir / f"anthropic_oauth_{next_num}.json"
        try:
            # O_EXCL makes creation fail instead of overwriting; mode 0o600
            # keeps the tokens private from the moment the file exists.
            fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        except FileExistsError:
            next_num += 1
        else:
            break

    with os.fdopen(fd, "w") as handle:
        handle.write(serialized)
    os.chmod(file_path, 0o600)
    STATE_FILE.unlink(missing_ok=True)

    print(f"Credential saved to: {file_path}")
    print(f"Access token prefix: {tokens['access_token'][:20]}...")
"qwen_code": "QWEN_CODE", "iflow": "IFLOW", + "anthropic": "ANTHROPIC_OAUTH", } diff --git a/src/rotator_library/provider_factory.py b/src/rotator_library/provider_factory.py index dcc40bc9..c8efe3f0 100644 --- a/src/rotator_library/provider_factory.py +++ b/src/rotator_library/provider_factory.py @@ -7,12 +7,14 @@ from .providers.qwen_auth_base import QwenAuthBase from .providers.iflow_auth_base import IFlowAuthBase from .providers.antigravity_auth_base import AntigravityAuthBase +from .providers.anthropic_auth_base import AnthropicAuthBase PROVIDER_MAP = { "gemini_cli": GeminiAuthBase, "qwen_code": QwenAuthBase, "iflow": IFlowAuthBase, "antigravity": AntigravityAuthBase, + "anthropic": AnthropicAuthBase, } def get_provider_auth_class(provider_name: str): diff --git a/src/rotator_library/providers/anthropic_auth_base.py b/src/rotator_library/providers/anthropic_auth_base.py new file mode 100644 index 00000000..0ce88d56 --- /dev/null +++ b/src/rotator_library/providers/anthropic_auth_base.py @@ -0,0 +1,571 @@ +# SPDX-License-Identifier: LGPL-3.0-only +# Copyright (c) 2026 Mirrowel + +# src/rotator_library/providers/anthropic_auth_base.py + +import base64 +import hashlib +import json +import os +import secrets +import time +import asyncio +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, Any, Tuple, Optional, Union + +import httpx +from rich.console import Console +from rich.panel import Panel +from rich.prompt import Prompt + +from ..utils.resilient_io import safe_write_json +from ..error_handler import CredentialNeedsReauthError + +lib_logger = logging.getLogger("rotator_library") + +# Anthropic OAuth constants — matches Claude Code's flow exactly +ANTHROPIC_CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" +ANTHROPIC_TOKEN_ENDPOINT = "https://console.anthropic.com/v1/oauth/token" +ANTHROPIC_REDIRECT_URI = "https://console.anthropic.com/oauth/code/callback" +ANTHROPIC_SCOPES = "org:create_api_key 
user:profile user:inference" + +# Refresh 5 minutes before expiry +REFRESH_EXPIRY_BUFFER_SECONDS = 5 * 60 + +console = Console() + + +@dataclass +class AnthropicCredentialSetupResult: + success: bool + file_path: Optional[str] = None + email: Optional[str] = None + is_update: bool = False + error: Optional[str] = None + credentials: Optional[Dict[str, Any]] = field(default=None, repr=False) + + +def _generate_pkce() -> Tuple[str, str]: + """Generate PKCE code_verifier and code_challenge (S256).""" + verifier = secrets.token_urlsafe(32) + digest = hashlib.sha256(verifier.encode("ascii")).digest() + challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + return verifier, challenge + + +def _build_authorize_url(verifier: str, challenge: str) -> str: + """Build the authorization URL for Claude Pro/Max OAuth.""" + params = { + "code": "true", + "client_id": ANTHROPIC_CLIENT_ID, + "response_type": "code", + "redirect_uri": ANTHROPIC_REDIRECT_URI, + "scope": ANTHROPIC_SCOPES, + "code_challenge": challenge, + "code_challenge_method": "S256", + "state": verifier, + } + qs = "&".join(f"{k}={v}" for k, v in params.items()) + return f"https://claude.ai/oauth/authorize?{qs}" + + +class AnthropicAuthBase: + """ + Anthropic OAuth authentication base class. + Implements PKCE authorization code flow matching Claude Code's OAuth exactly. + Uses manual code-paste (no local callback server). 
+ """ + + def __init__(self): + self._credentials_cache: Dict[str, Dict[str, Any]] = {} + self._refresh_locks: Dict[str, asyncio.Lock] = {} + self._locks_lock = asyncio.Lock() + self._refresh_failures: Dict[str, int] = {} + self._next_refresh_after: Dict[str, float] = {} + self._permanently_expired_credentials: set = set() + + # ========================================================================= + # CREDENTIAL LOADING / SAVING + # ========================================================================= + + def _parse_env_credential_path(self, path: str) -> Optional[str]: + if not path.startswith("env://"): + return None + parts = path[6:].split("/") + if len(parts) >= 2: + return parts[1] + return "0" + + def _load_from_env( + self, credential_index: Optional[str] = None + ) -> Optional[Dict[str, Any]]: + if credential_index and credential_index != "0": + prefix = f"ANTHROPIC_OAUTH_{credential_index}" + default_email = f"env-user-{credential_index}" + else: + prefix = "ANTHROPIC_OAUTH" + default_email = "env-user" + + access_token = os.getenv(f"{prefix}_ACCESS_TOKEN") + refresh_token = os.getenv(f"{prefix}_REFRESH_TOKEN") + + if not (access_token and refresh_token): + return None + + lib_logger.debug( + f"Loading Anthropic OAuth credentials from env vars (prefix: {prefix})" + ) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "expiry_date": os.getenv(f"{prefix}_EXPIRY_DATE", ""), + "email": os.getenv(f"{prefix}_EMAIL", default_email), + "token_type": "Bearer", + "_proxy_metadata": { + "email": os.getenv(f"{prefix}_EMAIL", default_email), + "last_check_timestamp": time.time(), + "loaded_from_env": True, + "env_credential_index": credential_index or "0", + "credential_type": "oauth", + }, + } + + async def _read_creds_from_file(self, path: str) -> Dict[str, Any]: + try: + with open(path, "r") as f: + creds = json.load(f) + self._credentials_cache[path] = creds + return creds + except FileNotFoundError: + raise 
IOError(f"Anthropic OAuth credential file not found at '{path}'") + except Exception as e: + raise IOError( + f"Failed to load Anthropic OAuth credentials from '{path}': {e}" + ) + + async def _load_credentials(self, path: str) -> Dict[str, Any]: + if path in self._credentials_cache: + return self._credentials_cache[path] + + async with await self._get_lock(path): + if path in self._credentials_cache: + return self._credentials_cache[path] + + credential_index = self._parse_env_credential_path(path) + if credential_index is not None: + env_creds = self._load_from_env(credential_index) + if env_creds: + self._credentials_cache[path] = env_creds + return env_creds + else: + raise IOError( + f"Env vars for Anthropic credential index {credential_index} not found" + ) + + try: + return await self._read_creds_from_file(path) + except IOError: + env_creds = self._load_from_env() + if env_creds: + self._credentials_cache[path] = env_creds + return env_creds + raise + + async def _save_credentials(self, path: str, creds: Dict[str, Any]) -> bool: + if creds.get("_proxy_metadata", {}).get("loaded_from_env"): + self._credentials_cache[path] = creds + return True + + if not safe_write_json( + path, creds, lib_logger, secure_permissions=True, buffer_on_failure=False + ): + lib_logger.error( + f"Failed to write Anthropic credentials to disk for '{Path(path).name}'." 
+ ) + return False + + self._credentials_cache[path] = creds + return True + + def _is_token_expired(self, creds: Dict[str, Any]) -> bool: + expiry_str = creds.get("expiry_date") + if not expiry_str: + return True + try: + from datetime import datetime + + expiry_dt = datetime.fromisoformat(expiry_str.replace("Z", "+00:00")) + expiry_timestamp = expiry_dt.timestamp() + except (ValueError, AttributeError): + try: + expiry_timestamp = float(expiry_str) + except (ValueError, TypeError): + return True + + return expiry_timestamp < time.time() + REFRESH_EXPIRY_BUFFER_SECONDS + + async def _get_lock(self, path: str) -> asyncio.Lock: + async with self._locks_lock: + if path not in self._refresh_locks: + self._refresh_locks[path] = asyncio.Lock() + return self._refresh_locks[path] + + def _mark_credential_expired(self, path: str, reason: str) -> None: + self._permanently_expired_credentials.add(path) + + display_name = path if path.startswith("env://") else Path(path).name + + console.print( + Panel( + f"[bold red]Credential:[/bold red] {display_name}\n" + f"[bold red]Reason:[/bold red] {reason}\n\n" + f"[yellow]This credential has been removed from rotation.[/yellow]\n" + f"[yellow]To fix: Run 'python credential_tool.py' to re-authenticate,[/yellow]\n" + f"[yellow]then restart the proxy.[/yellow]", + title="[bold red]CREDENTIAL EXPIRED - REMOVED FROM ROTATION[/bold red]", + border_style="red", + ) + ) + lib_logger.error( + f"CREDENTIAL EXPIRED | Credential: {display_name} | Reason: {reason}" + ) + + # ========================================================================= + # TOKEN EXCHANGE & REFRESH + # ========================================================================= + + async def _exchange_code(self, auth_code: str, verifier: str) -> Dict[str, Any]: + """ + Exchange authorization code for tokens. + The code from Anthropic is formatted as `code#state`. 
async def _refresh_token(self, path: str, force: bool = False) -> Dict[str, Any]:
    """
    Refresh the OAuth access token for one credential and persist the result.

    The whole operation runs under the per-path lock. Up to three attempts
    are made: HTTP 429 waits for ``Retry-After`` (60 s when the header is
    missing or not an integer), HTTP 5xx and transport errors back off
    exponentially, and HTTP 400/401/403 mark the credential as permanently
    expired.

    Args:
        path: Credential file path or ``env://`` identifier.
        force: Refresh even when the cached token is not yet expired.

    Returns:
        The updated credential dict as stored in the cache.

    Raises:
        CredentialNeedsReauthError: The refresh token was rejected (400/401/403).
        ValueError: No refresh token is stored, or the response lacks an
            access token.
        httpx.HTTPStatusError, httpx.RequestError: Retries were exhausted or
            the status is not retryable.
        IOError: The refreshed credentials could not be written.
    """
    from datetime import datetime, timedelta, timezone

    async with await self._get_lock(path):
        cached_creds = self._credentials_cache.get(path)
        if not force and cached_creds and not self._is_token_expired(cached_creds):
            # Cached token is still valid, so there is nothing to refresh.
            return cached_creds

        is_env = path.startswith("env://")
        if not is_env:
            # Re-read the file; this also updates the cache entry used below.
            await self._read_creds_from_file(path)
        creds = self._credentials_cache[path]

        refresh_token = creds.get("refresh_token")
        if not refresh_token:
            raise ValueError("No refresh_token in Anthropic credentials.")

        display_name = path if is_env else Path(path).name
        lib_logger.debug(
            f"Refreshing Anthropic OAuth token for '{display_name}'..."
        )

        max_retries = 3
        new_token_data = None

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                for attempt in range(max_retries):
                    retries_left = attempt < max_retries - 1
                    try:
                        response = await client.post(
                            ANTHROPIC_TOKEN_ENDPOINT,
                            headers={"Content-Type": "application/json"},
                            json={
                                "grant_type": "refresh_token",
                                "refresh_token": refresh_token,
                                "client_id": ANTHROPIC_CLIENT_ID,
                            },
                        )
                        response.raise_for_status()
                        new_token_data = response.json()
                        break

                    except httpx.HTTPStatusError as e:
                        status_code = e.response.status_code

                        if status_code in (400, 401, 403):
                            error_body = e.response.text
                            self._mark_credential_expired(
                                path,
                                f"Refresh token invalid (HTTP {status_code}: {error_body})",
                            )
                            raise CredentialNeedsReauthError(
                                credential_path=path,
                                message="Anthropic refresh token invalid. Credential removed from rotation.",
                            )

                        retryable = status_code == 429 or 500 <= status_code < 600
                        if not retryable or not retries_left:
                            raise

                        if status_code == 429:
                            # Retry-After may be an HTTP-date rather than a
                            # number of seconds; fall back to the default
                            # instead of failing inside the error handler.
                            try:
                                delay = int(e.response.headers.get("Retry-After", 60))
                            except (TypeError, ValueError):
                                delay = 60
                        else:
                            delay = 2**attempt
                        await asyncio.sleep(delay)

                    except (httpx.RequestError, httpx.TimeoutException):
                        if not retries_left:
                            raise
                        await asyncio.sleep(2**attempt)
        except CredentialNeedsReauthError:
            # Already recorded as permanently expired; not a transient failure.
            raise
        except Exception:
            # Count every failed refresh before propagating it.
            self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
            raise

        if new_token_data is None:
            # Only reachable when the endpoint answered with a JSON null body.
            self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
            raise Exception("Anthropic token refresh failed after all retries")

        access_token = new_token_data.get("access_token")
        if not access_token:
            raise ValueError("Missing access_token in Anthropic refresh response")

        creds["access_token"] = access_token
        # Keep the old refresh token when the response does not supply one.
        creds["refresh_token"] = new_token_data.get(
            "refresh_token", creds["refresh_token"]
        )

        expires_in = new_token_data.get("expires_in", 3600)
        # Same "<naive UTC ISO>Z" text as before, without the deprecated
        # datetime.utcnow().
        expiry_utc = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
        creds["expiry_date"] = expiry_utc.replace(tzinfo=None).isoformat() + "Z"

        if "_proxy_metadata" not in creds:
            creds["_proxy_metadata"] = {}
        creds["_proxy_metadata"]["last_check_timestamp"] = time.time()

        self._refresh_failures.pop(path, None)
        self._next_refresh_after.pop(path, None)

        if not await self._save_credentials(path, creds):
            raise IOError(
                f"Failed to persist refreshed Anthropic credentials for '{Path(path).name}'."
            )

        lib_logger.debug("Successfully refreshed Anthropic OAuth token.")
        return self._credentials_cache[path]
async def setup_credential(self, base_dir: str) -> AnthropicCredentialSetupResult:
    """
    Interactive OAuth setup: prints the URL, then the user pastes the code.

    Generates a PKCE pair, shows (and tries to open) the authorization URL,
    prompts for the authorization code, exchanges it for tokens and writes
    them to ``anthropic_oauth_<n>.json`` in the credential directory.

    Args:
        base_dir: Directory for credential files; when empty, the directory
            returned by ``get_oauth_dir()`` is used.

    Returns:
        An AnthropicCredentialSetupResult. ``success`` is False, with
        ``error`` set, when no code is entered, the exchange fails, or the
        file cannot be written.
    """
    from ..utils.paths import get_oauth_dir

    oauth_dir = Path(base_dir) if base_dir else get_oauth_dir()
    oauth_dir.mkdir(parents=True, exist_ok=True)

    verifier, challenge = _generate_pkce()
    auth_url = _build_authorize_url(verifier, challenge)

    console.print()
    console.print(
        Panel(
            "[bold cyan]Anthropic OAuth Setup (Claude Pro/Max)[/bold cyan]\n\n"
            "1. Open the URL below in your browser\n"
            "2. Authorize the application\n"
            "3. Copy the authorization code shown\n"
            "4. Paste it here\n\n"
            f"[link={auth_url}]{auth_url}[/link]",
            title="[bold]Anthropic OAuth[/bold]",
            border_style="cyan",
        )
    )

    # webbrowser.open() reports failure by returning False rather than by
    # raising, so the return value decides which message is shown.
    try:
        import webbrowser

        opened = webbrowser.open(auth_url)
    except Exception:
        opened = False
    if opened:
        console.print("[dim]Browser opened automatically.[/dim]")
    else:
        console.print(
            "[dim]Could not open browser. Please copy the URL above.[/dim]"
        )

    auth_code = Prompt.ask("\nPaste authorization code")
    if not auth_code or not auth_code.strip():
        return AnthropicCredentialSetupResult(
            success=False, error="No authorization code provided"
        )

    try:
        tokens = await self._exchange_code(auth_code.strip(), verifier)
    except Exception as e:
        return AnthropicCredentialSetupResult(
            success=False, error=f"Token exchange failed: {e}"
        )

    creds = {
        **tokens,
        "email": "anthropic-oauth-user",
        "_proxy_metadata": {
            "email": "anthropic-oauth-user",
            "last_check_timestamp": time.time(),
            "credential_type": "oauth",
        },
    }

    existing = sorted(oauth_dir.glob("anthropic_oauth_*.json"))

    # Check for duplicate by access token prefix
    is_update = False
    file_path = None
    new_prefix = tokens["access_token"][:20]
    for existing_file in existing:
        try:
            with open(existing_file) as f:
                existing_creds = json.load(f)
        except Exception:
            # Unreadable or malformed files cannot be duplicates; skip them.
            continue
        if existing_creds.get("access_token", "")[:20] == new_prefix:
            file_path = str(existing_file)
            is_update = True
            break

    if not file_path:
        # Start at "count + 1" but skip numbers already in use: with gaps in
        # the numbering the plain count would overwrite another credential.
        next_num = len(existing) + 1
        candidate = oauth_dir / f"anthropic_oauth_{next_num}.json"
        while candidate.exists():
            next_num += 1
            candidate = oauth_dir / f"anthropic_oauth_{next_num}.json"
        file_path = str(candidate)

    if not safe_write_json(file_path, creds, lib_logger, secure_permissions=True):
        return AnthropicCredentialSetupResult(
            success=False, error="Failed to save credentials"
        )

    action = "Updated" if is_update else "Created"
    console.print(f"\n[green]{action} credential at {Path(file_path).name}[/green]")

    return AnthropicCredentialSetupResult(
        success=True,
        file_path=file_path,
        email="anthropic-oauth-user",
        is_update=is_update,
        credentials=creds,
    )
async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]:
    """
    List available models, preferring the live ``/v1/models`` endpoint.

    Any exception during the live fetch is logged at debug level. A non-200
    response, an empty listing, or an exception all result in the
    FALLBACK_MODELS list being returned. Every entry is prefixed with
    ``anthropic/``.
    """
    try:
        token = await self.get_access_token(api_key)
        request_headers = self._build_anthropic_headers(token)
        response = await client.get(
            f"{ANTHROPIC_API_BASE}/v1/models",
            headers=request_headers,
            timeout=10.0,
        )
        if response.status_code == 200:
            listing = response.json()
            live_models = []
            for m in listing.get("data", []):
                if m.get("id"):
                    live_models.append(f"anthropic/{m['id']}")
            if live_models:
                return live_models
    except Exception as e:
        lib_logger.debug(f"Failed to fetch Anthropic models live: {e}")

    fallback = []
    for model_name in FALLBACK_MODELS:
        fallback.append(f"anthropic/{model_name}")
    return fallback
+ """ + system_blocks = [] + anthropic_messages = [] + + for msg in messages: + role = msg.get("role", "user") + content = msg.get("content", "") + + if role == "system": + if isinstance(content, str): + system_blocks.append({"type": "text", "text": content}) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + system_blocks.append( + {"type": "text", "text": block.get("text", "")} + ) + continue + + if role == "tool": + tool_result = { + "type": "tool_result", + "tool_use_id": msg.get("tool_call_id", ""), + "content": content if isinstance(content, str) else str(content), + } + if anthropic_messages and anthropic_messages[-1]["role"] == "user": + if isinstance(anthropic_messages[-1]["content"], list): + anthropic_messages[-1]["content"].append(tool_result) + else: + anthropic_messages[-1]["content"] = [ + { + "type": "text", + "text": anthropic_messages[-1]["content"], + }, + tool_result, + ] + else: + anthropic_messages.append( + {"role": "user", "content": [tool_result]} + ) + continue + + if role == "assistant": + blocks = [] + + reasoning = msg.get("reasoning_content") + thinking_sig = msg.get("thinking_signature") + if reasoning: + thinking_block = { + "type": "thinking", + "thinking": reasoning, + } + if thinking_sig and len(thinking_sig) >= 100: + thinking_block["signature"] = thinking_sig + blocks.append(thinking_block) + + if isinstance(content, str) and content.strip(): + blocks.append({"type": "text", "text": content}) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict): + if ( + block.get("type") == "text" + and block.get("text", "").strip() + ): + blocks.append({"type": "text", "text": block["text"]}) + elif block.get("type") == "image_url": + url = block.get("image_url", {}).get("url", "") + if url.startswith("data:"): + parts = url.split(",", 1) + media_type = ( + parts[0] + .replace("data:", "") + .replace(";base64", "") + ) + blocks.append( + { 
+ "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": parts[1] + if len(parts) > 1 + else "", + }, + } + ) + + tool_calls = msg.get("tool_calls") or [] + for tc in tool_calls: + func = tc.get("function", {}) + try: + input_data = json.loads(func.get("arguments", "{}")) + except json.JSONDecodeError: + input_data = {} + blocks.append( + { + "type": "tool_use", + "id": tc.get("id", f"toolu_{uuid.uuid4().hex[:12]}"), + "name": func.get("name", ""), + "input": input_data, + } + ) + + if blocks: + anthropic_messages.append({"role": "assistant", "content": blocks}) + continue + + # User messages + if isinstance(content, str): + if content.strip(): + anthropic_messages.append({"role": "user", "content": content}) + elif isinstance(content, list): + blocks = [] + for block in content: + if isinstance(block, dict): + if block.get("type") == "text": + blocks.append( + {"type": "text", "text": block.get("text", "")} + ) + elif block.get("type") == "image_url": + url = block.get("image_url", {}).get("url", "") + if url.startswith("data:"): + parts = url.split(",", 1) + media_type = ( + parts[0].replace("data:", "").replace(";base64", "") + ) + blocks.append( + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": parts[1] if len(parts) > 1 else "", + }, + } + ) + if blocks: + anthropic_messages.append({"role": "user", "content": blocks}) + + return system_blocks, anthropic_messages + + def _openai_tools_to_anthropic( + self, tools: Optional[List[Dict[str, Any]]] + ) -> Optional[List[Dict[str, Any]]]: + if not tools: + return None + result = [] + for tool in tools: + func = tool.get("function", {}) + result.append( + { + "name": func.get("name", ""), + "description": func.get("description", ""), + "input_schema": func.get("parameters", {"type": "object"}), + } + ) + return result + + # ========================================================================= + # MCP_ TOOL NAME PREFIXING / STRIPPING + # 
========================================================================= + + def _prefix_tool_names(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """Add mcp_ prefix to tool names in definitions and messages.""" + payload = copy.deepcopy(payload) + + if payload.get("tools"): + for tool in payload["tools"]: + if tool.get("name"): + tool["name"] = f"{TOOL_PREFIX}{tool['name']}" + + if payload.get("messages"): + for msg in payload["messages"]: + content = msg.get("content") + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "tool_use": + if block.get("name"): + block["name"] = f"{TOOL_PREFIX}{block['name']}" + + return payload + + def _strip_tool_prefix(self, name: str) -> str: + """Remove mcp_ prefix from a tool name.""" + if name and name.startswith(TOOL_PREFIX): + return name[len(TOOL_PREFIX) :] + return name + + # ========================================================================= + # SYSTEM PROMPT HANDLING + # ========================================================================= + + def _inject_system_prompt( + self, system_blocks: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Prepend Claude Code identity to system prompt.""" + result = [{"type": "text", "text": CLAUDE_CODE_SYSTEM_PREFIX}] + if system_blocks: + for block in system_blocks: + result.append(block) + if len(result) >= 2: + result[1]["text"] = ( + CLAUDE_CODE_SYSTEM_PREFIX + "\n\n" + result[1]["text"] + ) + result.pop(0) + return result + + # ========================================================================= + # ANTHROPIC SSE -> OPENAI CHUNK CONVERSION + # ========================================================================= + + def _anthropic_event_to_openai_chunks( + self, + event_type: str, + data: Dict[str, Any], + model_id: str, + stream_state: Dict[str, Any], + ): + """ + Convert a single Anthropic SSE event to OpenAI-format chunk(s). + Yields litellm.ModelResponse-compatible dicts. 
+ """ + if event_type == "message_start": + message = data.get("message", {}) + usage = message.get("usage", {}) + stream_state["input_tokens"] = usage.get("input_tokens", 0) + stream_state["message_id"] = message.get( + "id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ) + return + + if event_type == "content_block_start": + block = data.get("content_block", {}) + block_type = block.get("type") + index = data.get("index", 0) + stream_state["current_block_type"] = block_type + stream_state["current_block_index"] = index + + if block_type == "tool_use": + tool_id = block.get("id", f"toolu_{uuid.uuid4().hex[:12]}") + raw_name = block.get("name", "") + name = self._strip_tool_prefix(raw_name) + stream_state.setdefault("tool_calls", {}) + stream_state["tool_calls"][index] = { + "id": tool_id, + "name": name, + "arguments": "", + "tc_index": len(stream_state["tool_calls"]), + } + stream_state["has_tool_calls"] = True + yield { + "choices": [ + { + "index": 0, + "delta": { + "tool_calls": [ + { + "index": stream_state["tool_calls"][index][ + "tc_index" + ], + "id": tool_id, + "type": "function", + "function": { + "name": name, + "arguments": "", + }, + } + ] + }, + "finish_reason": None, + } + ], + "model": model_id, + "object": "chat.completion.chunk", + "id": stream_state.get( + "message_id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ), + "created": int(time.time()), + } + return + + if event_type == "content_block_delta": + delta = data.get("delta", {}) + delta_type = delta.get("type") + + if delta_type == "text_delta": + text = delta.get("text", "") + if text: + stream_state["accumulated_text"] = ( + stream_state.get("accumulated_text", "") + text + ) + yield { + "choices": [ + { + "index": 0, + "delta": {"content": text}, + "finish_reason": None, + } + ], + "model": model_id, + "object": "chat.completion.chunk", + "id": stream_state.get( + "message_id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ), + "created": int(time.time()), + } + + elif delta_type == "thinking_delta": + thinking 
= delta.get("thinking", "") + if thinking: + stream_state["accumulated_thinking"] = ( + stream_state.get("accumulated_thinking", "") + thinking + ) + yield { + "choices": [ + { + "index": 0, + "delta": {"reasoning_content": thinking}, + "finish_reason": None, + } + ], + "model": model_id, + "object": "chat.completion.chunk", + "id": stream_state.get( + "message_id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ), + "created": int(time.time()), + } + + elif delta_type == "input_json_delta": + partial = delta.get("partial_json", "") + block_index = data.get("index", 0) + tc_info = stream_state.get("tool_calls", {}).get(block_index) + if tc_info and partial: + tc_info["arguments"] += partial + yield { + "choices": [ + { + "index": 0, + "delta": { + "tool_calls": [ + { + "index": tc_info["tc_index"], + "function": {"arguments": partial}, + } + ] + }, + "finish_reason": None, + } + ], + "model": model_id, + "object": "chat.completion.chunk", + "id": stream_state.get( + "message_id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ), + "created": int(time.time()), + } + + elif delta_type == "signature_delta": + sig = delta.get("signature", "") + stream_state["thinking_signature"] = ( + stream_state.get("thinking_signature", "") + sig + ) + + return + + if event_type == "content_block_stop": + return + + if event_type == "message_delta": + delta = data.get("delta", {}) + usage = data.get("usage", {}) + stop_reason = delta.get("stop_reason", "end_turn") + finish_reason = STOP_REASON_MAP.get(stop_reason, "stop") + stream_state["finish_reason"] = finish_reason + + output_tokens = usage.get("output_tokens", 0) + input_tokens = stream_state.get("input_tokens", 0) + + yield { + "choices": [ + { + "index": 0, + "delta": {}, + "finish_reason": finish_reason, + } + ], + "model": model_id, + "object": "chat.completion.chunk", + "id": stream_state.get( + "message_id", f"chatcmpl-{uuid.uuid4().hex[:8]}" + ), + "created": int(time.time()), + "usage": { + "prompt_tokens": input_tokens, + 
"completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + } + return + + # ========================================================================= + # MAIN API CALL + # ========================================================================= + + def _build_anthropic_headers(self, access_token: str) -> Dict[str, str]: + return { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + "Accept": "text/event-stream", + "anthropic-version": "2023-06-01", + "anthropic-beta": ANTHROPIC_BETA_FEATURES, + "user-agent": f"claude-cli/{CLAUDE_CODE_VERSION} (external, cli)", + "x-app": "cli", + "anthropic-dangerous-direct-browser-access": "true", + } + + def _build_anthropic_payload(self, kwargs: Dict[str, Any]) -> Dict[str, Any]: + """Build the Anthropic Messages API payload from OpenAI-format kwargs.""" + messages = kwargs.get("messages", []) + model = kwargs.get("model", "") + + if "/" in model: + model = model.split("/", 1)[1] + + system_blocks, anthropic_messages = self._openai_messages_to_anthropic(messages) + system_blocks = self._inject_system_prompt(system_blocks) + + tools = self._openai_tools_to_anthropic(kwargs.get("tools")) + + payload = { + "model": model, + "messages": anthropic_messages, + "max_tokens": kwargs.get("max_tokens", 16384), + "stream": True, + } + + if system_blocks: + payload["system"] = system_blocks + + if tools: + payload["tools"] = tools + + tool_choice = kwargs.get("tool_choice") + if tool_choice: + if tool_choice == "auto": + payload["tool_choice"] = {"type": "auto"} + elif tool_choice == "required": + payload["tool_choice"] = {"type": "any"} + elif tool_choice == "none": + payload["tool_choice"] = {"type": "none"} + elif isinstance(tool_choice, dict): + func_name = tool_choice.get("function", {}).get("name", "") + if func_name: + payload["tool_choice"] = {"type": "tool", "name": func_name} + + if kwargs.get("temperature") is not None: + payload["temperature"] = 
kwargs["temperature"] + + reasoning_effort = kwargs.get("reasoning_effort") + if reasoning_effort and str(reasoning_effort).lower() not in ( + "none", + "disabled", + "off", + "false", + "disable", + ): + payload["thinking"] = { + "type": "enabled", + "budget_tokens": self._reasoning_effort_to_budget( + reasoning_effort, kwargs.get("max_tokens", 16384) + ), + } + + payload = self._prefix_tool_names(payload) + return payload + + def _reasoning_effort_to_budget(self, effort: Any, max_tokens: int) -> int: + effort_str = str(effort).lower().strip() + budget_map = { + "low": 4096, + "medium": 8192, + "high": 16384, + } + budget = budget_map.get(effort_str) + if budget is not None: + return min(budget, max(max_tokens - 1000, 4096)) + try: + return int(effort_str) + except (ValueError, TypeError): + return min(8192, max(max_tokens - 1000, 4096)) + + async def acompletion( + self, client: httpx.AsyncClient, **kwargs + ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]: + credential_path = kwargs.pop("credential_identifier") + transaction_context = kwargs.pop("transaction_context", None) + model = kwargs.get("model", "") + file_logger = ProviderLogger(transaction_context) + + async def make_request(): + access_token = await self.get_access_token(credential_path) + headers = self._build_anthropic_headers(access_token) + payload = self._build_anthropic_payload(kwargs) + + file_logger.log_request(payload) + + url = f"{ANTHROPIC_API_BASE}/v1/messages?beta=true" + return client.stream( + "POST", + url, + headers=headers, + json=payload, + timeout=TimeoutConfig.streaming(), + ) + + async def stream_handler(response_stream, attempt=1): + stream_state: Dict[str, Any] = {} + try: + async with response_stream as response: + if response.status_code >= 400: + error_text = await response.aread() + error_text = ( + error_text.decode("utf-8") + if isinstance(error_text, bytes) + else error_text + ) + + if response.status_code == 401 and attempt == 1: + 
lib_logger.warning( + "Anthropic returned 401. Forcing token refresh and retrying." + ) + await self._refresh_token(credential_path, force=True) + retry_stream = await make_request() + async for chunk in stream_handler(retry_stream, attempt=2): + yield chunk + return + + if response.status_code == 429: + raise RateLimitError( + f"Anthropic rate limit: {error_text}", + llm_provider="anthropic", + model=model, + response=response, + ) + + error_msg = ( + f"Anthropic HTTP {response.status_code}: {error_text}" + ) + file_logger.log_error(error_msg) + raise httpx.HTTPStatusError( + error_msg, + request=response.request, + response=response, + ) + + current_event = None + async for line in response.aiter_lines(): + file_logger.log_response_chunk(line) + + if line.startswith("event:"): + current_event = line[6:].strip() + continue + + if line.startswith("data:"): + data_str = ( + line[5:].strip() + if line.startswith("data: ") + else line[5:] + ) + if not data_str or data_str == "[DONE]": + continue + try: + data = json.loads(data_str) + except json.JSONDecodeError: + continue + + if current_event: + for chunk in self._anthropic_event_to_openai_chunks( + current_event, data, model, stream_state + ): + yield litellm.ModelResponse(**chunk) + + except httpx.HTTPStatusError: + raise + except Exception as e: + file_logger.log_error(f"Error during Anthropic stream: {e}") + lib_logger.error(f"Anthropic stream error: {e}", exc_info=True) + raise + + async def logging_stream_wrapper(): + chunks = [] + try: + async for chunk in stream_handler(await make_request()): + chunks.append(chunk) + yield chunk + finally: + if chunks: + final = self._stream_to_completion_response(chunks) + file_logger.log_final_response(final.dict()) + + if kwargs.get("stream"): + return logging_stream_wrapper() + else: + + async def non_stream(): + all_chunks = [c async for c in logging_stream_wrapper()] + return self._stream_to_completion_response(all_chunks) + + return await non_stream() + + def 
_stream_to_completion_response( + self, chunks: List[litellm.ModelResponse] + ) -> litellm.ModelResponse: + if not chunks: + raise ValueError("No chunks to reassemble") + + final_message = {"role": "assistant"} + aggregated_tool_calls = {} + usage_data = None + finish_reason = "stop" + first_chunk = chunks[0] + + for chunk in chunks: + if not hasattr(chunk, "choices") or not chunk.choices: + continue + + choice = chunk.choices[0] + if hasattr(choice, "get"): + delta = choice.get("delta", {}) + choice_finish = choice.get("finish_reason") + elif hasattr(choice, "delta"): + delta = choice.delta if choice.delta else {} + if hasattr(delta, "model_dump"): + delta = delta.model_dump(exclude_none=True) + elif hasattr(delta, "__dict__") and not isinstance(delta, dict): + delta = { + k: v + for k, v in delta.__dict__.items() + if not k.startswith("_") and v is not None + } + choice_finish = getattr(choice, "finish_reason", None) + else: + delta = {} + choice_finish = None + + if delta.get("content"): + final_message.setdefault("content", "") + final_message["content"] += delta["content"] + + if delta.get("reasoning_content"): + final_message.setdefault("reasoning_content", "") + final_message["reasoning_content"] += delta["reasoning_content"] + + tool_calls = delta.get("tool_calls") or [] + for tc in tool_calls: + idx = tc.get("index", 0) + if idx not in aggregated_tool_calls: + aggregated_tool_calls[idx] = { + "type": "function", + "function": {"name": "", "arguments": ""}, + } + if tc.get("id"): + aggregated_tool_calls[idx]["id"] = tc["id"] + func = tc.get("function", {}) + if func.get("name"): + aggregated_tool_calls[idx]["function"]["name"] += func["name"] + if func.get("arguments"): + aggregated_tool_calls[idx]["function"]["arguments"] += func[ + "arguments" + ] + + if choice_finish: + finish_reason = choice_finish + + for chunk in reversed(chunks): + if hasattr(chunk, "usage") and chunk.usage: + usage_data = chunk.usage + break + + if aggregated_tool_calls: + 
final_message["tool_calls"] = list(aggregated_tool_calls.values()) + finish_reason = "tool_calls" + + for f in ["content", "tool_calls", "function_call"]: + if f not in final_message: + final_message[f] = None + + return litellm.ModelResponse( + **{ + "id": first_chunk.id, + "object": "chat.completion", + "created": first_chunk.created, + "model": first_chunk.model, + "choices": [ + { + "index": 0, + "message": final_message, + "finish_reason": finish_reason, + } + ], + "usage": usage_data, + } + ) From 64ffb0725f069980b5c236585ef5b9fca812f725 Mon Sep 17 00:00:00 2001 From: Jasmin Le Roux Date: Sun, 15 Feb 2026 11:54:10 +0200 Subject: [PATCH 2/5] fix(anthropic): URL-encode OAuth authorize parameters The scope parameter contains spaces that need to be percent-encoded for a valid URL. Co-Authored-By: Claude Opus 4.6 --- src/rotator_library/providers/anthropic_auth_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rotator_library/providers/anthropic_auth_base.py b/src/rotator_library/providers/anthropic_auth_base.py index 0ce88d56..3781ce1e 100644 --- a/src/rotator_library/providers/anthropic_auth_base.py +++ b/src/rotator_library/providers/anthropic_auth_base.py @@ -67,8 +67,8 @@ def _build_authorize_url(verifier: str, challenge: str) -> str: "code_challenge_method": "S256", "state": verifier, } - qs = "&".join(f"{k}={v}" for k, v in params.items()) - return f"https://claude.ai/oauth/authorize?{qs}" + from urllib.parse import urlencode + return f"https://claude.ai/oauth/authorize?{urlencode(params)}" class AnthropicAuthBase: From 7aa20d4abb293fb796f3f638d81e58dee967a4f0 Mon Sep 17 00:00:00 2001 From: Jasmin Le Roux Date: Sun, 15 Feb 2026 12:19:09 +0200 Subject: [PATCH 3/5] fix(anthropic): address review feedback - Add state file existence check in setup script before Step 2 - Replace deprecated datetime.utcnow() with datetime.now(timezone.utc) - Use json.dumps() instead of str() for tool result content serialization Co-Authored-By: 
Claude Opus 4.6 --- scripts/setup_anthropic_cred.py | 3 +++ src/rotator_library/providers/anthropic_auth_base.py | 8 ++++---- src/rotator_library/providers/anthropic_provider.py | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/scripts/setup_anthropic_cred.py b/scripts/setup_anthropic_cred.py index de6ad465..17e14591 100644 --- a/scripts/setup_anthropic_cred.py +++ b/scripts/setup_anthropic_cred.py @@ -23,6 +23,9 @@ OAUTH_DIR = Path(__file__).parent / ".." / "oauth_creds" async def exchange_code(auth_code: str): + if not STATE_FILE.exists(): + print("Error: PKCE state file not found. Please run Step 1 first.") + sys.exit(1) state = json.loads(STATE_FILE.read_text()) verifier = state["verifier"] diff --git a/src/rotator_library/providers/anthropic_auth_base.py b/src/rotator_library/providers/anthropic_auth_base.py index 3781ce1e..3da56ead 100644 --- a/src/rotator_library/providers/anthropic_auth_base.py +++ b/src/rotator_library/providers/anthropic_auth_base.py @@ -274,10 +274,10 @@ async def _exchange_code(self, auth_code: str, verifier: str) -> Dict[str, Any]: refresh_token = data.get("refresh_token", "") expires_in = data.get("expires_in", 3600) - from datetime import datetime, timedelta + from datetime import datetime, timedelta, timezone expiry_date = ( - datetime.utcnow() + timedelta(seconds=expires_in) + datetime.now(timezone.utc) + timedelta(seconds=expires_in) ).isoformat() + "Z" return { @@ -378,10 +378,10 @@ async def _refresh_token(self, path: str, force: bool = False) -> Dict[str, Any] ) expires_in = new_token_data.get("expires_in", 3600) - from datetime import datetime, timedelta + from datetime import datetime, timedelta, timezone creds["expiry_date"] = ( - datetime.utcnow() + timedelta(seconds=expires_in) + datetime.now(timezone.utc) + timedelta(seconds=expires_in) ).isoformat() + "Z" if "_proxy_metadata" not in creds: diff --git a/src/rotator_library/providers/anthropic_provider.py 
b/src/rotator_library/providers/anthropic_provider.py index ef549e48..f25cdec7 100644 --- a/src/rotator_library/providers/anthropic_provider.py +++ b/src/rotator_library/providers/anthropic_provider.py @@ -132,7 +132,7 @@ def _openai_messages_to_anthropic(self, messages: List[Dict[str, Any]]) -> tuple tool_result = { "type": "tool_result", "tool_use_id": msg.get("tool_call_id", ""), - "content": content if isinstance(content, str) else str(content), + "content": content if isinstance(content, str) else json.dumps(content), } if anthropic_messages and anthropic_messages[-1]["role"] == "user": if isinstance(anthropic_messages[-1]["content"], list): From a35b586231359d3474b68e96f5178c8fdad9c1fd Mon Sep 17 00:00:00 2001 From: Jasmin Le Roux Date: Sun, 15 Feb 2026 12:48:23 +0200 Subject: [PATCH 4/5] fix(anthropic): cache thinking signatures server-side for multi-turn Anthropic now requires a `signature` field on all thinking blocks. OpenAI-format clients (JS SDK etc.) can't preserve signatures across turns, causing 400 errors on multi-turn conversations with thinking. Cache thinking block signatures server-side (keyed by SHA-256 of the thinking content) and re-attach them when converting messages back to Anthropic format. Falls back to dropping thinking blocks gracefully when no cached signature is available. 
Co-Authored-By: Claude Opus 4.6 --- .../providers/anthropic_provider.py | 91 +++++++++++++++++-- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/src/rotator_library/providers/anthropic_provider.py b/src/rotator_library/providers/anthropic_provider.py index f25cdec7..79978805 100644 --- a/src/rotator_library/providers/anthropic_provider.py +++ b/src/rotator_library/providers/anthropic_provider.py @@ -4,6 +4,7 @@ # src/rotator_library/providers/anthropic_provider.py import copy +import hashlib import json import os import time @@ -18,6 +19,7 @@ from .provider_interface import ProviderInterface from .anthropic_auth_base import AnthropicAuthBase +from .provider_cache import create_provider_cache from ..timeout_config import TimeoutConfig from ..transaction_logger import ProviderLogger @@ -62,6 +64,22 @@ "pause_turn": "stop", } +# Lazy-initialised server-side cache for thinking block signatures. +# Allows us to re-attach signatures when OpenAI-format clients send back +# reasoning_content without the signature (which they can't preserve). 
+_thinking_sig_cache = None + + +def _get_thinking_cache(): + global _thinking_sig_cache + if _thinking_sig_cache is None: + _thinking_sig_cache = create_provider_cache( + "anthropic_thinking_signatures", + memory_ttl_seconds=7200, # 2 hours in memory + disk_ttl_seconds=172800, # 48 hours on disk + ) + return _thinking_sig_cache + class AnthropicProvider(AnthropicAuthBase, ProviderInterface): """ @@ -155,15 +173,23 @@ def _openai_messages_to_anthropic(self, messages: List[Dict[str, Any]]) -> tuple blocks = [] reasoning = msg.get("reasoning_content") - thinking_sig = msg.get("thinking_signature") if reasoning: - thinking_block = { - "type": "thinking", - "thinking": reasoning, - } - if thinking_sig and len(thinking_sig) >= 100: - thinking_block["signature"] = thinking_sig - blocks.append(thinking_block) + # Try server-side cache first (signature preserved from + # the original Anthropic response) + cached = self._retrieve_thinking_blocks(reasoning) + if cached: + blocks.extend(cached) + else: + # Fallback: inline signature from client (custom clients) + thinking_sig = msg.get("thinking_signature") + if thinking_sig and len(thinking_sig) >= 100: + blocks.append({ + "type": "thinking", + "thinking": reasoning, + "signature": thinking_sig, + }) + # else: no signature → drop thinking block, + # model generates fresh thinking (cache miss on prefix) if isinstance(content, str) and content.strip(): blocks.append({"type": "text", "text": content}) @@ -251,6 +277,29 @@ def _openai_messages_to_anthropic(self, messages: List[Dict[str, Any]]) -> tuple return system_blocks, anthropic_messages + def _retrieve_thinking_blocks( + self, reasoning_content: str + ) -> Optional[List[Dict[str, Any]]]: + """Look up cached thinking blocks with signatures for given thinking content.""" + cache_key = hashlib.sha256(reasoning_content.encode()).hexdigest() + cached = _get_thinking_cache().retrieve(cache_key) + if not cached: + return None + try: + blocks_data = json.loads(cached) + result = [ 
+ { + "type": "thinking", + "thinking": b["thinking"], + "signature": b["signature"], + } + for b in blocks_data + if b.get("signature") + ] + return result if result else None + except (json.JSONDecodeError, KeyError, TypeError): + return None + def _openai_tools_to_anthropic( self, tools: Optional[List[Dict[str, Any]]] ) -> Optional[List[Dict[str, Any]]]: @@ -348,6 +397,10 @@ def _anthropic_event_to_openai_chunks( stream_state["current_block_type"] = block_type stream_state["current_block_index"] = index + if block_type == "thinking": + stream_state["_block_thinking"] = "" + stream_state["_block_signature"] = "" + if block_type == "tool_use": tool_id = block.get("id", f"toolu_{uuid.uuid4().hex[:12]}") raw_name = block.get("name", "") @@ -423,6 +476,9 @@ def _anthropic_event_to_openai_chunks( stream_state["accumulated_thinking"] = ( stream_state.get("accumulated_thinking", "") + thinking ) + stream_state["_block_thinking"] = ( + stream_state.get("_block_thinking", "") + thinking + ) yield { "choices": [ { @@ -473,10 +529,21 @@ def _anthropic_event_to_openai_chunks( stream_state["thinking_signature"] = ( stream_state.get("thinking_signature", "") + sig ) + stream_state["_block_signature"] = ( + stream_state.get("_block_signature", "") + sig + ) return if event_type == "content_block_stop": + if stream_state.get("current_block_type") == "thinking": + block_thinking = stream_state.pop("_block_thinking", "") + block_sig = stream_state.pop("_block_signature", "") + if block_thinking and block_sig: + stream_state.setdefault("_thinking_blocks", []).append({ + "thinking": block_thinking, + "signature": block_sig, + }) return if event_type == "message_delta": @@ -509,6 +576,14 @@ def _anthropic_event_to_openai_chunks( "total_tokens": input_tokens + output_tokens, }, } + + # Cache thinking blocks with signatures for multi-turn preservation + thinking_blocks = stream_state.get("_thinking_blocks") + if thinking_blocks: + full_thinking = "".join(b["thinking"] for b in 
thinking_blocks) + cache_key = hashlib.sha256(full_thinking.encode()).hexdigest() + _get_thinking_cache().store(cache_key, json.dumps(thinking_blocks)) + return # ========================================================================= From 93f16ccb7e058c53a448ded21f2778377e5d7be1 Mon Sep 17 00:00:00 2001 From: Jasmin Le Roux Date: Sun, 15 Feb 2026 19:19:12 +0200 Subject: [PATCH 5/5] feat(anthropic): inject prompt cache breakpoints for cost savings Add cache_control ephemeral markers on system prompt, tools, and conversation history so Anthropic caches the prefix across turns (~90% savings on cached input tokens). Co-Authored-By: Claude Opus 4.6 --- .../providers/anthropic_provider.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/rotator_library/providers/anthropic_provider.py b/src/rotator_library/providers/anthropic_provider.py index 79978805..25769318 100644 --- a/src/rotator_library/providers/anthropic_provider.py +++ b/src/rotator_library/providers/anthropic_provider.py @@ -366,6 +366,50 @@ def _inject_system_prompt( result.pop(0) return result + # ========================================================================= + # PROMPT CACHING + # ========================================================================= + + def _inject_cache_control(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """ + Inject cache_control breakpoints into the Anthropic payload to enable + prompt caching. Marks the last system block, last tool, and the last + content block of the final message for caching. + + Anthropic caches the full prefix up to each breakpoint. This saves + ~90% on cached input token costs and reduces latency. + """ + cache_marker = {"type": "ephemeral"} + + # 1. Cache the last system block (system prompt rarely changes) + system = payload.get("system") + if system and isinstance(system, list) and len(system) > 0: + system[-1]["cache_control"] = cache_marker + + # 2. 
Cache the last tool definition (tools rarely change) + tools = payload.get("tools") + if tools and isinstance(tools, list) and len(tools) > 0: + tools[-1]["cache_control"] = cache_marker + + # 3. Cache the end of conversation history for multi-turn caching. + # Mark the last content block of the second-to-last message + # (the turn before the new user message) so the growing + # conversation prefix gets cached across turns. + messages = payload.get("messages") + if messages and len(messages) >= 2: + # The last message is the new user turn; cache up to the one before it + prev_msg = messages[-2] + content = prev_msg.get("content") + if isinstance(content, list) and len(content) > 0: + content[-1]["cache_control"] = cache_marker + elif isinstance(content, str) and content: + # Convert string content to block format so we can attach cache_control + prev_msg["content"] = [ + {"type": "text", "text": content, "cache_control": cache_marker} + ] + + return payload + # ========================================================================= # ANTHROPIC SSE -> OPENAI CHUNK CONVERSION # ========================================================================= @@ -660,6 +704,7 @@ def _build_anthropic_payload(self, kwargs: Dict[str, Any]) -> Dict[str, Any]: } payload = self._prefix_tool_names(payload) + payload = self._inject_cache_control(payload) return payload def _reasoning_effort_to_budget(self, effort: Any, max_tokens: int) -> int: