diff --git a/monitoring_lab/docker-compose.yml b/monitoring_lab/docker-compose.yml new file mode 100644 index 0000000..22e799b --- /dev/null +++ b/monitoring_lab/docker-compose.yml @@ -0,0 +1,20 @@ +services: + prometheus: + image: prom/prometheus:latest + container_name: prometheus + ports: + - "9090:9090" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + extra_hosts: + - "host.docker.internal:host-gateway" + + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + depends_on: + - prometheus diff --git a/monitoring_lab/prometheus.yml b/monitoring_lab/prometheus.yml new file mode 100644 index 0000000..c1a4626 --- /dev/null +++ b/monitoring_lab/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 5s # How often to fetch metrics + +scrape_configs: + - job_name: 'MLPA_local' + metrics_path: '/metrics' + static_configs: + - targets: ['host.docker.internal:8080'] diff --git a/src/mlpa/core/auth/authorize.py b/src/mlpa/core/auth/authorize.py index ff13306..821231d 100644 --- a/src/mlpa/core/auth/authorize.py +++ b/src/mlpa/core/auth/authorize.py @@ -4,6 +4,8 @@ from mlpa.core.classes import AuthorizedChatRequest, ChatRequest, ServiceType from mlpa.core.config import env +from mlpa.core.logger import logger +from mlpa.core.prometheus_metrics import metrics from mlpa.core.routers.appattest import app_attest_auth from mlpa.core.routers.fxa import fxa_auth from mlpa.core.utils import parse_app_attest_jwt @@ -16,13 +18,19 @@ async def authorize_request( use_app_attest: Annotated[bool | None, Header()] = None, use_qa_certificates: Annotated[bool | None, Header()] = None, ) -> AuthorizedChatRequest: + metrics.auth_request_count_total.inc() if not authorization: + metrics.auth_error_count_total.labels(error="MalformedRequest").inc() raise HTTPException(status_code=401, detail="Missing authorization header") if use_app_attest: assertionAuth = parse_app_attest_jwt(authorization, "assert") data = await app_attest_auth(assertionAuth, chat_request, use_qa_certificates) if data: if data.get("error"): + metrics.auth_error_count_total.labels(error="AppattestUnauthorized").inc() + logger.error( + "Unauthorized", extra={"error": data["error"], "type": "appattest"} + ) raise HTTPException(status_code=401, detail=data["error"]) return AuthorizedChatRequest( user=f"{assertionAuth.key_id_b64}:{service_type.value}", # "user" is key_id_b64 from app attest @@ -33,11 +41,17 @@ async def authorize_request( fxa_user_id = fxa_auth(authorization) if fxa_user_id: if fxa_user_id.get("error"): + metrics.auth_error_count_total.labels(error="FxAUnauthorized").inc() + logger.error( + "Unauthorized", extra={"error": fxa_user_id["error"], "type": "FxA"} + ) raise HTTPException(status_code=401, detail=fxa_user_id["error"]) return AuthorizedChatRequest( user=f"{fxa_user_id['user']}:{service_type.value}", **chat_request.model_dump(exclude_unset=True), ) + metrics.auth_error_count_total.labels(error="Cancelled").inc() + logger.error("Auth Cancelled", extra={"error": "Cancelled", "type": "None"}) raise HTTPException( status_code=401, detail="Please authenticate with App Attest or FxA."
) diff --git a/src/mlpa/core/completions.py b/src/mlpa/core/completions.py index b3987e5..e74f3c7 100644 --- a/src/mlpa/core/completions.py +++ b/src/mlpa/core/completions.py @@ -36,7 +36,7 @@ def get_default_tokenizer() -> tiktoken.Encoding: return _global_default_tokenizer -def _parse_rate_limit_error(error_text: str, user: str) -> int | None: +def _parse_rate_limit_error(error_text: str, user: str, model_name: str) -> int | None: """ Parse error response to detect budget or rate limit errors. Returns the error code if a rate limit error is detected, None otherwise. @@ -47,26 +47,55 @@ def _parse_rate_limit_error(error_text: str, user: str) -> int | None: try: error_data = json.loads(error_text) if is_rate_limit_error(error_data, ["budget"]): - logger.warning(f"Budget limit exceeded for user {user}: {error_text}") + metrics.ai_error_count_total.labels( + model_name=model_name, error="BudgetExceeded" + ).inc() + logger.warning( + "Budget limit exceeded", + extra={ + "user_id": user, + "model_name": model_name, + "error": "BudgetExceeded", + }, + ) return ERROR_CODE_BUDGET_LIMIT_EXCEEDED elif is_rate_limit_error(error_data, ["rate"]): - logger.warning(f"Rate limit exceeded for user {user}: {error_text}") + metrics.ai_error_count_total.labels( + model_name=model_name, error="RateLimitExceeded" + ).inc() + logger.warning( + "Rate limit exceeded", + extra={ + "user_id": user, + "model_name": model_name, + "error": "RateLimitExceeded", + }, + ) return ERROR_CODE_RATE_LIMIT_EXCEEDED except (json.JSONDecodeError, AttributeError, UnicodeDecodeError): - pass + metrics.ai_error_count_total.labels( + model_name=model_name, error="UserThrottled" + ).inc() + logger.warning( + "User throttled", + extra={"user_id": user, "model_name": model_name, "error": "UserThrottled"}, + ) return None -def _handle_rate_limit_error(error_text: str, user: str) -> None: - error_code = _parse_rate_limit_error(error_text, user) +def _handle_rate_limit_error( + e: httpx.HTTPStatusError, user: str, model_name: str +) -> None: + error_text = e.response.text + error_code = _parse_rate_limit_error(error_text, user, model_name) if error_code == ERROR_CODE_BUDGET_LIMIT_EXCEEDED: raise HTTPException( status_code=429, detail={"error": ERROR_CODE_BUDGET_LIMIT_EXCEEDED}, headers={"Retry-After": "86400"}, ) - if error_code == ERROR_CODE_RATE_LIMIT_EXCEEDED: + elif error_code == ERROR_CODE_RATE_LIMIT_EXCEEDED: raise HTTPException( status_code=429, detail={"error": ERROR_CODE_RATE_LIMIT_EXCEEDED}, @@ -75,13 +104,10 @@ def _handle_rate_limit_error(error_text: str, user: str) -> None: async def stream_completion(authorized_chat_request: AuthorizedChatRequest): - """ - Proxies a streaming request to LiteLLM. - Yields response chunks as they are received and logs metrics. 
- """ start_time = time.time() + model = authorized_chat_request.model body = { - "model": authorized_chat_request.model, + "model": model, "messages": authorized_chat_request.messages, "temperature": authorized_chat_request.temperature, "top_p": authorized_chat_request.top_p, @@ -89,12 +115,30 @@ async def stream_completion(authorized_chat_request: AuthorizedChatRequest): "user": authorized_chat_request.user, "stream": True, } + + metrics.ai_request_count_total.labels(model_name=model).inc() + logger.info( + "Completion request initiated", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + }, + ) + result = PrometheusResult.ERROR is_first_token = True num_completion_tokens = 0 streaming_started = False logger.debug( - f"Starting a stream completion using {authorized_chat_request.model}, for user {authorized_chat_request.user}", + "Stream completion loop started", + extra={ + "user_id": authorized_chat_request.user, + "model": authorized_chat_request.model, + }, ) try: client = get_http_client() @@ -119,7 +163,7 @@ async def stream_completion(authorized_chat_request: AuthorizedChatRequest): if e.response.status_code in {400, 429}: # Check for budget or rate limit errors error_code = _parse_rate_limit_error( - error_text_str, authorized_chat_request.user + error_text_str, authorized_chat_request.user, model ) if error_code is not None: yield f'data: {{"error": {error_code}}}\n\n'.encode() @@ -135,7 +179,22 @@ async def stream_completion(authorized_chat_request: AuthorizedChatRequest): async for chunk in response.aiter_bytes(): num_completion_tokens += 1 if is_first_token: - metrics.chat_completion_ttft.observe(time.time() - start_time) + duration = time.time() - start_time + metrics.ai_time_to_first_token.labels( + model_name=authorized_chat_request.model + ).observe(duration) + logger.info( + "First token generated", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "duration": duration, + }, + ) is_first_token = False streaming_started = True yield chunk @@ -145,20 +204,76 @@ async def stream_completion(authorized_chat_request: AuthorizedChatRequest): message["content"] for message in authorized_chat_request.messages ) prompt_tokens = len(tokenizer.encode(prompt_text)) - metrics.chat_tokens.labels(type="prompt").inc(prompt_tokens) - metrics.chat_tokens.labels(type="completion").inc(num_completion_tokens) + metrics.ai_token_count_total.labels(model_name=model, type="prompt").inc( + prompt_tokens + ) + metrics.ai_token_count_total.labels( + model_name=model, type="completion" + ).inc(num_completion_tokens) + logger.info( + "Token generation summary", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "prompt_tokens": prompt_tokens, + "completion_tokens": num_completion_tokens, + }, + ) result = PrometheusResult.SUCCESS except httpx.HTTPStatusError as e: - logger.error(f"Upstream service returned an error: {e}") + metrics.ai_error_count_total.labels( + model_name=model, 
error=f"HTTP_{e.response.status_code}" + ).inc() + logger.error( + "Upstream HTTP error", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "status_code": e.response.status_code, + }, + ) if not streaming_started: yield f'data: {{"error": "Upstream service returned an error"}}\n\n'.encode() except Exception as e: - logger.error(f"Failed to proxy request to {LITELLM_COMPLETIONS_URL}: {e}") + metrics.ai_error_count_total.labels(error_type=type(e).__name__).inc() + logger.error( + "Stream completion proxy failed", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "error_type": type(e).__name__, + }, + ) if not streaming_started: yield f'data: {{"error": "Failed to proxy request"}}\n\n'.encode() finally: - metrics.chat_completion_latency.labels(result=result).observe( - time.time() - start_time + duration = time.time() - start_time + metrics.ai_request_duration_seconds.labels( + model_name=model, streaming=True + ).observe(duration) + logger.info( + "Stream request finished", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "duration": duration, + }, ) @@ -167,8 +282,9 @@ async def get_completion(authorized_chat_request: AuthorizedChatRequest): Proxies a non-streaming request to LiteLLM. 
""" start_time = time.time() + model = authorized_chat_request.model body = { - "model": authorized_chat_request.model, + "model": model, "messages": authorized_chat_request.messages, "temperature": authorized_chat_request.temperature, "top_p": authorized_chat_request.top_p, @@ -179,8 +295,19 @@ async def get_completion(authorized_chat_request: AuthorizedChatRequest): } result = PrometheusResult.ERROR logger.debug( - f"Starting a non-stream completion using {authorized_chat_request.model}, for user {authorized_chat_request.user}", + "Non-stream completion initiated", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": False, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + }, ) + + metrics.ai_request_count_total.labels(model_name=model).inc() + try: client = get_http_client() response = await client.post( @@ -192,16 +319,21 @@ async def get_completion(authorized_chat_request: AuthorizedChatRequest): response.raise_for_status() except httpx.HTTPStatusError as e: if e.response.status_code in {400, 429}: - _handle_rate_limit_error(e.response.text, authorized_chat_request.user) - logger.error( - f"Upstream service returned an error: {e.response.status_code} - {e.response.text}" - ) - raise HTTPException( - status_code=e.response.status_code, - detail={"error": "Upstream service returned an error"}, - ) + _handle_rate_limit_error(e, authorized_chat_request.user, model) + metrics.ai_error_count_total.labels( + model_name=model, error=f"HTTP_{e.response.status_code}" + ).inc() logger.error( - f"Upstream service returned an error: {e.response.status_code} - {e.response.text}" + "Upstream HTTP error", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": False, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "status_code": e.response.status_code, + }, ) raise HTTPException( status_code=e.response.status_code, @@ -212,20 +344,77 @@ async def get_completion(authorized_chat_request: AuthorizedChatRequest): prompt_tokens = usage.get("prompt_tokens", 0) completion_tokens = usage.get("completion_tokens", 0) - metrics.chat_tokens.labels(type="prompt").inc(prompt_tokens) - metrics.chat_tokens.labels(type="completion").inc(completion_tokens) + metrics.ai_token_count_total.labels(model_name=model, type="prompt").inc( + prompt_tokens + ) + metrics.ai_token_count_total.labels(model_name=model, type="completion").inc( + completion_tokens + ) + logger.info( + "Token generation summary", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": False, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + }, + ) result = PrometheusResult.SUCCESS return data - except HTTPException: + except HTTPException as e: + logger.error( + "Upstream service HTTP exception", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": False, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "status_code": e.status_code, + }, + ) raise except Exception as e: - logger.error(f"Failed to proxy request to 
{LITELLM_COMPLETIONS_URL}: {e}") + metrics.ai_error_count_total.labels( + model_name=model, error=type(e).__name__ + ).inc() + logger.error( + "Proxy request failed", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": False, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "error_message": str(e), + "error_type": type(e).__name__, + }, + ) raise HTTPException( status_code=502, detail={"error": f"Failed to proxy request"}, ) finally: - metrics.chat_completion_latency.labels(result=result).observe( - time.time() - start_time + duration = time.time() - start_time + metrics.ai_request_duration_seconds.labels( + model_name=model, streaming=False + ).observe(duration) + logger.info( + "Request finished", + extra={ + "user_id": authorized_chat_request.user, + "model": model, + "stream": True, + "temperature": authorized_chat_request.temperature, + "top_p": authorized_chat_request.top_p, + "max_tokens": authorized_chat_request.max_completion_tokens, + "duration": duration, + }, ) diff --git a/src/mlpa/core/middleware/instrumentation.py b/src/mlpa/core/middleware/instrumentation.py index 6d28a49..0b9a313 100644 --- a/src/mlpa/core/middleware/instrumentation.py +++ b/src/mlpa/core/middleware/instrumentation.py @@ -19,20 +19,74 @@ async def instrument_requests_middleware(request: Request, call_next): session_id=request.headers.get("session-id", "N/A"), user_agent=request.headers.get("user-agent", "N/A"), use_app_attest=request.headers.get("use-app-attest", "N/A"), + request_source=request.headers.get("x-request-source", "N/A"), ): + path = request.url.path try: + # Capture request size if available + content_length = request.headers.get("content-length") + if content_length and content_length.isdigit(): + metrics.request_size_bytes.labels(method=request.method).observe( + int(content_length) + ) + + logger.info( + "Incoming request size captured", + extra={ + "request_method": request.method, + "content_length": int(content_length) if content_length else 0, + "path": path, + }, + ) + response = await call_next(request) + duration = time.time() - start_time route = request.scope.get("route") endpoint = route.path if route else request.url.path - metrics.request_latency.labels( - method=request.method, endpoint=endpoint - ).observe(time.time() - start_time) - metrics.requests_total.labels( - method=request.method, endpoint=endpoint - ).inc() - metrics.response_status_codes.labels(status_code=response.status_code).inc() + # Capture response size + res_content_length = response.headers.get("content-length") + if res_content_length: + metrics.response_size_bytes.observe(int(res_content_length)) + logger.info( + "Response content length captured", + extra={ + "request_method": request.method, + "endpoint": endpoint, + "path": path, + "response_size_bytes": res_content_length, + "status_code": response.status_code, + "latency_ms": duration, + }, + ) + logger.info( + "Request finished", + extra={ + "request_method": request.method, + "endpoint": endpoint, + "path": path, + "response_size_bytes": res_content_length, + "status_code": response.status_code, + "latency_ms": duration, + }, + ) + return response + except Exception as e: + metrics.request_error_count_total.labels( + method=request.method, error_type=type(e).__name__ + ).inc() + logger.error( + "Request failed with exception", + extra={ + "request_method": request.method, + "path": request.url.path, + 
"latency_ms": (time.time() - start_time) * 1000, + "error_type": type(e).__name__, + }, + exc_info=True, # Provides the stack trace for SRE debugging + ) + raise e finally: metrics.in_progress_requests.dec() diff --git a/src/mlpa/core/prometheus_metrics.py b/src/mlpa/core/prometheus_metrics.py index b2d769f..7e4a084 100644 --- a/src/mlpa/core/prometheus_metrics.py +++ b/src/mlpa/core/prometheus_metrics.py @@ -12,65 +12,92 @@ class PrometheusResult(Enum): @dataclass class PrometheusMetrics: in_progress_requests: Gauge - requests_total: Counter - response_status_codes: Counter - request_latency: Histogram - validate_challenge_latency: Histogram - validate_app_attest_latency: Histogram - validate_app_assert_latency: Histogram - validate_fxa_latency: Histogram - chat_completion_latency: Histogram - chat_completion_ttft: Histogram # time to first token (when stream=True) - chat_tokens: Counter + + # Standard + request_size_bytes: Histogram # method(Get, Put, Update, Delete, ) + response_size_bytes: Histogram + request_error_count_total: Counter # method, error_type + + # Auth + auth_request_count_total: Counter + auth_response_count_total: ( + Counter # labels: method(fxa, appatest, ...), result (allow, deny, error) + ) + auth_duration_seconds: Histogram # labels: method(fxa, appatest, ...) + auth_error_count_total: Counter # labels: method(fxa, appatest, ...), error(rate limited, throttled, ...) + + # # Router Metrics + # router_request_count_total: Counter # labels: model_name + # router_decision_count_total: Counter # labels: decision_model_name, + # router_request_duration_seconds: Histogram # labels: model_name + + # AI Metrics + ai_request_count_total: Counter # labels: model_name + ai_time_to_first_token: ( + Histogram # time to first token (when stream=True) labels: model_name + ) + ai_request_duration_seconds: Histogram # labels: model_name, streaming + ai_error_count_total: ( + Counter # labels: model_name, error(timeout, retry, blocked, ...) + ) + ai_token_count_total: Counter # labels: model_name, type metrics = PrometheusMetrics( in_progress_requests=Gauge( "in_progress_requests", "Number of requests currently in progress." ), - requests_total=Counter( - "requests_total", - "Total number of requests handled by the proxy.", - ["method", "endpoint"], + # Standard + request_size_bytes=Histogram( + "request_size_bytes", "Size of requests in bytes.", ["method"] + ), + response_size_bytes=Histogram( + "response_size_bytes", + "Size of responses in bytes.", + ), + request_error_count_total=Counter( + "request_error_count_total", + "Total number of errors encountered.", + ["method", "error_type"], ), - response_status_codes=Counter( - "response_status_codes_total", - "Total number of response status codes.", - ["status_code"], + # Auth + auth_request_count_total=Counter( + "auth_request_count_total", + "Total authorization requests.", ), - request_latency=Histogram( - "request_latency_seconds", "Request latency in seconds.", ["method", "endpoint"] + auth_response_count_total=Counter( + "auth_response_count_total", + "Total authorization responses.", + ["method", "result"], ), - validate_challenge_latency=Histogram( - "validate_challenge_latency_seconds", "Challenge validation latency in seconds." 
+ auth_duration_seconds=Histogram( + "auth_duration_seconds", + "Latency of authorization requests.", + ["method", "result"], ), - validate_app_attest_latency=Histogram( - "validate_app_attest_latency_seconds", - "App Attest authentication latency in seconds.", - ["result"], + auth_error_count_total=Counter( + "auth_error_count_total", "Total authorization errors.", ["error"] ), - validate_app_assert_latency=Histogram( - "validate_app_assert_latency_seconds", - "App Assert authentication latency in seconds.", - ["result"], + # AI Metrics + ai_request_count_total=Counter( + "ai_request_count_total", "Total requests sent to AI backends.", ["model_name"] ), - validate_fxa_latency=Histogram( - "validate_fxa_latency_seconds", - "FxA authentication latency in seconds.", - ["result"], + ai_time_to_first_token=Histogram( + "ai_time_to_first_token_seconds", + "Time to first token for streaming completions.", + ["model_name"], ), - chat_completion_latency=Histogram( - "chat_completion_latency_seconds", - "Chat completion latency in seconds.", - ["result"], + ai_request_duration_seconds=Histogram( + "ai_request_duration_seconds", + "Latency of AI backend requests.", + ["model_name", "streaming"], ), - chat_completion_ttft=Histogram( - "chat_completion_ttft_seconds", - "Time to first token for streaming chat completions in seconds.", + ai_error_count_total=Counter( + "ai_error_count_total", + "Total errors communicating with AI backends.", + ["model_name", "error"], ), - chat_tokens=Counter( - "chat_tokens", - "Number of tokens for chat completions.", - ["type"], + ai_token_count_total=Counter( + "ai_token_count_total", "Total tokens consumed.", ["model_name", "type"] ), ) diff --git a/src/mlpa/core/routers/appattest/appattest.py b/src/mlpa/core/routers/appattest/appattest.py index 733a275..13447d5 100644 --- a/src/mlpa/core/routers/appattest/appattest.py +++ b/src/mlpa/core/routers/appattest/appattest.py @@ -81,6 +81,7 @@ async def validate_challenge(challenge: str, key_id_b64: str) -> bool: start_time = time.time() stored_challenge = await app_attest_pg.get_challenge(key_id_b64) await app_attest_pg.delete_challenge(key_id_b64) # Remove challenge after one use + success = False try: if ( not stored_challenge @@ -88,9 +89,15 @@ > env.CHALLENGE_EXPIRY_SECONDS ): return False + success = challenge == stored_challenge["challenge"] return challenge == stored_challenge["challenge"] finally: - metrics.validate_challenge_latency.observe(time.time() - start_time) + metrics.auth_response_count_total.labels( + method="appatest_challenge", result="allow" if success else "deny" + ).inc() + metrics.auth_duration_seconds.labels( + method="appatest_challenge", result="allow" if success else "deny" + ).observe(time.time() - start_time) async def verify_attest( @@ -148,9 +155,12 @@ async def verify_attest( logger.error(f"Attestation verification failed: {e}") raise HTTPException(status_code=403, detail="Attestation verification failed") finally: - metrics.validate_app_attest_latency.labels(result=result).observe( - time.time() - start_time - ) + metrics.auth_response_count_total.labels( + method="appatest_attest", result=result + ).inc() + metrics.auth_duration_seconds.labels( + method="appatest_attest", result=result + ).observe(time.time() - start_time) # save public_key in b64 await app_attest_pg.store_key(key_id_b64, public_key_pem, attestation_counter) @@ -212,8 +222,11 @@ async def verify_assert( logger.error(f"Assertion verification failed: {e}") raise
HTTPException(status_code=403, detail=f"Assertion verification failed") finally: - metrics.validate_app_assert_latency.labels(result=result).observe( - time.time() - start_time - ) + metrics.auth_response_count_total.labels( + method="appatest_assert", result=result + ).inc() + metrics.auth_duration_seconds.labels( + method="appatest_assert", result=result + ).observe(time.time() - start_time) return {"status": "success"} diff --git a/src/mlpa/core/routers/fxa/fxa.py b/src/mlpa/core/routers/fxa/fxa.py index 6419f64..4fbdc25 100644 --- a/src/mlpa/core/routers/fxa/fxa.py +++ b/src/mlpa/core/routers/fxa/fxa.py @@ -22,7 +22,8 @@ def fxa_auth(authorization: Annotated[str | None, Header()]): logger.error(f"FxA auth error: {e}") raise HTTPException(status_code=401, detail="Invalid FxA auth") finally: - metrics.validate_fxa_latency.labels(result=result).observe( + metrics.auth_response_count_total.labels(method="fxa", result=result).inc() + metrics.auth_duration_seconds.labels(method="fxa", result=result).observe( time.time() - start_time ) return profile diff --git a/src/mlpa/run.py b/src/mlpa/run.py index b9a6547..fb28f68 100644 --- a/src/mlpa/run.py +++ b/src/mlpa/run.py @@ -107,14 +107,36 @@ async def chat_completion( Optional[AuthorizedChatRequest], Depends(authorize_request) ], ): + start_time = time.time() user_id = authorized_chat_request.user + model = authorized_chat_request.model + + logger.info( + "Chat completion request initiated", + extra={"user_id": user_id, "model": model}, + ) + if not user_id: + metrics.ai_error_count_total.labels( + model_name=model, error=f"UserNotFound" + ).inc() + logger.warning( + "Chat completion failed", + extra={"user_id": user_id, "model": model, "error": "User not found"}, + ) raise HTTPException( status_code=400, detail={"error": "User not found from authorization response."}, ) user, _ = await get_or_create_user(user_id) if user.get("blocked"): + metrics.ai_error_count_total.labels( + model_name=model, error=f"UserBlocked" + ).inc() + logger.warning( + "Chat completion failed", + extra={"user_id": user_id, "model": model, "error": "User blocked"}, + ) raise HTTPException(status_code=403, detail={"error": "User is blocked."}) if authorized_chat_request.stream: @@ -129,9 +151,17 @@ async def chat_completion( @app.exception_handler(HTTPException) async def log_and_handle_http_exception(request: Request, exc: HTTPException): """Logs HTTPExceptions""" + metrics.request_error_count_total.labels( + method=request.method, error_type=f"HTTP_{exc.status_code}" + ).inc() if exc.status_code != 429: logger.error( - f"HTTPException for {request.method} {request.url.path} -> status={exc.status_code} detail={exc.detail}", + "HTTPException occurred", + extra={ + "path": request.url.path, + "method": request.method, + "status_code": exc.status_code, + }, ) return await http_exception_handler(request, exc) diff --git a/src/tests/integration/test_health.py b/src/tests/integration/test_health.py index dfdd114..d1a3494 100644 --- a/src/tests/integration/test_health.py +++ b/src/tests/integration/test_health.py @@ -46,4 +46,4 @@ def test_health_readiness(mocked_client_integration, httpx_mock): def test_metrics_endpoint(mocked_client_integration): response = mocked_client_integration.get("/metrics") assert response.status_code == 200 - assert "requests_total" in response.text + assert "in_progress_requests" in response.text diff --git a/src/tests/unit/test_completions.py b/src/tests/unit/test_completions.py index 600fbf0..7660731 100644 --- 
a/src/tests/unit/test_completions.py +++ b/src/tests/unit/test_completions.py @@ -54,21 +54,25 @@ async def test_get_completion_success(mocker): assert sent_json["stream"] == SAMPLE_REQUEST.stream # 2. Check that the token metrics were incremented correctly - mock_metrics.chat_tokens.labels.assert_any_call(type="prompt") - mock_metrics.chat_tokens.labels().inc.assert_any_call( + mock_metrics.ai_token_count_total.labels.assert_any_call( + model_name=SAMPLE_REQUEST.model, type="prompt" + ) + mock_metrics.ai_token_count_total.labels().inc.assert_any_call( SUCCESSFUL_CHAT_RESPONSE["usage"]["prompt_tokens"] ) - mock_metrics.chat_tokens.labels.assert_any_call(type="completion") - mock_metrics.chat_tokens.labels().inc.assert_any_call( + mock_metrics.ai_token_count_total.labels.assert_any_call( + model_name=SAMPLE_REQUEST.model, type="completion" + ) + mock_metrics.ai_token_count_total.labels().inc.assert_any_call( SUCCESSFUL_CHAT_RESPONSE["usage"]["completion_tokens"] ) # 3. Check that the latency metric was observed with SUCCESS - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.SUCCESS + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=False ) - mock_metrics.chat_completion_latency.labels().observe.assert_called_once() + mock_metrics.ai_request_duration_seconds.labels().observe.assert_called_once() # 4. Check that the function returned the correct data assert result_data == SUCCESSFUL_CHAT_RESPONSE @@ -104,13 +108,13 @@ async def test_get_completion_http_error(mocker): assert exc_info.value.detail == {"error": "Upstream service returned an error"} # 2. Verify latency metric was observed with ERROR - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=False ) - mock_metrics.chat_completion_latency.labels().observe.assert_called_once() + mock_metrics.ai_request_duration_seconds.labels().observe.assert_called_once() # 3. Verify token metrics were NOT called - mock_metrics.chat_tokens.labels.assert_not_called() + mock_metrics.ai_token_count_total.labels.assert_not_called() async def test_get_completion_network_error(mocker): @@ -134,10 +138,10 @@ async def test_get_completion_network_error(mocker): assert "Failed to proxy request" in exc_info.value.detail["error"] # 2. Verify latency metric was observed with ERROR - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=False ) - mock_metrics.chat_completion_latency.labels().observe.assert_called_once() + mock_metrics.ai_request_duration_seconds.labels().observe.assert_called_once() async def test_stream_completion_success(httpx_mock: HTTPXMock, mocker): @@ -182,19 +186,25 @@ async def test_stream_completion_success(httpx_mock: HTTPXMock, mocker): assert request_body["model"] == "test-model" # 3. Verify TTFT metric was observed - mock_metrics.chat_completion_ttft.observe.assert_called_once() + mock_metrics.ai_time_to_first_token.labels( + model_name=request_body["model"] + ).observe.assert_called_once() # 4. 
Verify token counts - mock_metrics.chat_tokens.labels.assert_any_call(type="prompt") - mock_metrics.chat_tokens.labels().inc.assert_any_call(2) - mock_metrics.chat_tokens.labels.assert_any_call(type="completion") - mock_metrics.chat_tokens.labels().inc.assert_any_call(len(mock_chunks)) + mock_metrics.ai_token_count_total.labels.assert_any_call( + model_name=request_body["model"], type="prompt" + ) + mock_metrics.ai_token_count_total.labels().inc.assert_any_call(2) + mock_metrics.ai_token_count_total.labels.assert_any_call( + model_name=request_body["model"], type="completion" + ) + mock_metrics.ai_token_count_total.labels().inc.assert_any_call(len(mock_chunks)) # 5. Verify final latency metric - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.SUCCESS + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) - mock_metrics.chat_completion_latency.labels().observe.assert_called_once() + mock_metrics.ai_request_duration_seconds.labels().observe.assert_called_once() async def test_get_completion_budget_limit_exceeded_429(mocker): @@ -235,8 +245,8 @@ async def test_get_completion_budget_limit_exceeded_429(mocker): assert exc_info.value.headers == {"Retry-After": "86400"} # Verify latency metric was observed with ERROR - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=False ) @@ -447,8 +457,8 @@ async def test_stream_completion_budget_limit_exceeded_429( ) mock_logger.warning.assert_called_once() assert "Budget limit exceeded" in str(mock_logger.warning.call_args) - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -486,8 +496,8 @@ async def test_stream_completion_budget_limit_exceeded_400( ) mock_logger.warning.assert_called_once() assert "Budget limit exceeded" in str(mock_logger.warning.call_args) - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -523,8 +533,8 @@ async def test_stream_completion_rate_limit_exceeded(httpx_mock: HTTPXMock, mock ) mock_logger.warning.assert_called_once() assert "Rate limit exceeded" in str(mock_logger.warning.call_args) - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -561,8 +571,8 @@ async def test_stream_completion_400_non_rate_limit_error( == b'data: {"error": "Upstream service returned an error"}\n\n' ) mock_logger.error.assert_called_once() - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -593,8 +603,8 @@ async def test_stream_completion_429_non_rate_limit_error( == b'data: {"error": "Upstream service returned an error"}\n\n' ) mock_logger.error.assert_called_once() - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - 
result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -620,8 +630,8 @@ async def test_stream_completion_429_invalid_json(httpx_mock: HTTPXMock, mocker) == b'data: {"error": "Upstream service returned an error"}\n\n' ) mock_logger.error.assert_called_once() - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) @@ -660,7 +670,7 @@ async def test_stream_completion_exception_after_streaming_started( assert len(received_chunks) == len(mock_chunks) assert received_chunks == mock_chunks mock_logger.error.assert_called() - mock_metrics.chat_completion_latency.labels.assert_called_once_with( - result=PrometheusResult.ERROR + mock_metrics.ai_request_duration_seconds.labels.assert_called_once_with( + model_name=SAMPLE_REQUEST.model, streaming=True ) - mock_metrics.chat_completion_latency.labels().observe.assert_called_once() + mock_metrics.ai_request_duration_seconds.labels().observe.assert_called_once()
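Note on the counter changes in authorize.py and completions.py above: with prometheus_client, `.labels(...)` on its own only creates and returns the labeled child, it does not record anything, and passing a label name that was not declared raises ValueError at call time. Below is a minimal standalone sketch of both pitfalls; it uses a throwaway registry and simply mirrors the `auth_error_count_total` declaration from prometheus_metrics.py, it is not project code.

```python
# Standalone sketch (not MLPA code): the two labels() pitfalls fixed in this patch.
from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()  # throwaway registry so nothing leaks into the global REGISTRY
errors = Counter(
    "auth_error_count_total",   # mirrors the counter declared in prometheus_metrics.py
    "Total authorization errors.",
    ["error"],
    registry=registry,
)

# Pitfall 1: labels() alone only creates/returns the child; it never counts the event.
errors.labels(error="MalformedRequest")
assert registry.get_sample_value(
    "auth_error_count_total", {"error": "MalformedRequest"}
) == 0.0

# The fixed pattern: always chain .inc() (or .observe() on histograms).
errors.labels(error="MalformedRequest").inc()
assert registry.get_sample_value(
    "auth_error_count_total", {"error": "MalformedRequest"}
) == 1.0

# Pitfall 2: label names must match the declaration exactly, otherwise labels()
# raises ValueError at request time (e.g. error_type=... on a counter declared
# with ["model_name", "error"], as fixed in stream_completion).
try:
    errors.labels(wrong_label="x")
except ValueError as exc:
    print(f"rejected: {exc}")
```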
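For anyone running the monitoring_lab stack without the full service: the prometheus.yml above expects something on `host.docker.internal:8080` serving `/metrics`. The real MLPA app already exposes that endpoint (the integration test hits it), so the following is only a hypothetical stand-in target for local experiments; the app, counter, and port wiring here are illustrative assumptions, not the project's actual setup.

```python
# Toy scrape target for the monitoring lab (assumes fastapi, uvicorn and
# prometheus_client are installed; the real MLPA service wires this differently).
from fastapi import FastAPI
from prometheus_client import Counter, make_asgi_app

app = FastAPI()
demo_counter = Counter("demo_requests_total", "Requests seen by the toy target.")

@app.get("/ping")
async def ping():
    # Increment a counter so the Prometheus graphs have something to show.
    demo_counter.inc()
    return {"ok": True}

# Expose the default registry at the path the scrape config expects.
app.mount("/metrics", make_asgi_app())

if __name__ == "__main__":
    import uvicorn
    # Port 8080 matches the 'host.docker.internal:8080' target in prometheus.yml.
    uvicorn.run(app, host="0.0.0.0", port=8080)
```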
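Once the compose stack is up, a quick smoke check against Prometheus's standard `/api/v1/query` HTTP API can confirm the `MLPA_local` job is actually being scraped. This is a hedged helper for manual verification only, not part of the patch; it assumes Prometheus is reachable on localhost:9090 as published by the docker-compose file, and uses httpx since the project already depends on it.

```python
# Smoke check: ask Prometheus whether the 'MLPA_local' target's last scrape succeeded.
import httpx

PROM_URL = "http://localhost:9090/api/v1/query"  # Prometheus port from docker-compose.yml

def target_is_up(job: str = "MLPA_local") -> bool:
    resp = httpx.get(PROM_URL, params={"query": f'up{{job="{job}"}}'}, timeout=5.0)
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    # The built-in 'up' metric is 1 when the last scrape of the target succeeded.
    return any(sample["value"][1] == "1" for sample in results)

if __name__ == "__main__":
    print("MLPA_local scrape OK" if target_is_up() else "MLPA_local not up yet")
```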