-
Notifications
You must be signed in to change notification settings - Fork 29
feat: Minimal agent for AuthBridge OTEL (Approach A, zero custom observability) #122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e9642b2
c4a0bb2
d2abda7
8600ecc
d5d7e08
5ef5ef5
bc4ca24
0f34c82
ffeb093
58760aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| """Weather Service - OpenTelemetry Observability Setup""" | ||
| """Weather Service - A2A weather agent with zero observability code. | ||
|
|
||
| from weather_service.observability import setup_observability | ||
|
|
||
| # Initialize observability before importing agent | ||
| setup_observability() | ||
| All tracing and observability is handled externally by the AuthBridge | ||
| ext_proc sidecar which creates root spans and nested child spans from | ||
| the A2A SSE event stream. No OTEL dependencies needed in the agent. | ||
| """ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,14 @@ | ||
| import asyncio | ||
| import json | ||
| import logging | ||
| import os | ||
| import uvicorn | ||
| from textwrap import dedent | ||
|
|
||
| # Initialize OTEL before importing instrumented libraries | ||
| from weather_service.observability import setup_observability, wrap_asgi_app | ||
| setup_observability() | ||
|
|
||
| from a2a.server.agent_execution import AgentExecutor, RequestContext | ||
| from a2a.server.apps import A2AStarletteApplication | ||
| from a2a.server.events.event_queue import EventQueue | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 PR description should reflect all changes in this PR Beyond the observability simplification, this PR also introduces:
These are all fine to include here, but the PR summary should list them so reviewers (and future maintainers) can see the full scope of the change. |
||
|
|
@@ -13,15 +19,29 @@ | |
| from a2a.utils import new_agent_text_message, new_task | ||
| from langchain_core.messages import HumanMessage | ||
|
|
||
| from starlette.middleware.base import BaseHTTPMiddleware | ||
|
|
||
| from weather_service.graph import get_graph, get_mcpclient | ||
| from weather_service.observability import create_tracing_middleware, set_span_output, get_root_span | ||
|
|
||
| logging.basicConfig(level=logging.DEBUG) | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _serialize_event(value): | ||
| """Serialize a LangGraph event value to JSON-compatible dict. | ||
|
|
||
| Converts LangChain message objects to dicts via model_dump() so the | ||
| ext_proc can parse structured GenAI attributes (token counts, model, etc). | ||
| """ | ||
| if isinstance(value, dict) and "messages" in value: | ||
| msgs = [] | ||
| for msg in value["messages"]: | ||
| if hasattr(msg, "model_dump"): | ||
| msgs.append(msg.model_dump()) | ||
| else: | ||
| msgs.append(str(msg)) | ||
| return {"messages": msgs} | ||
| return value | ||
|
|
||
|
|
||
| def get_agent_card(host: str, port: int): | ||
| """Returns the Agent Card for the AG2 Agent.""" | ||
| capabilities = AgentCapabilities(streaming=True) | ||
|
|
@@ -91,7 +111,36 @@ class WeatherExecutor(AgentExecutor): | |
| """ | ||
| A class to handle weather assistant execution for A2A Agent. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🔴 Concurrency bug: `self._cancelled` is shared mutable state on a single executor instance, not per-request state.
If two requests are in flight:
This needs to be per-request state, e.g. a cancellation flag scoped to the task/request rather than a single attribute on the shared executor instance. |
||
| """ | ||
| def __init__(self, task_store=None): | ||
| self._cancelled = False | ||
| self._task_store = task_store | ||
|
|
||
| async def execute(self, context: RequestContext, event_queue: EventQueue): | ||
| """ | ||
| Shield the agent execution from SSE client disconnects. | ||
|
|
||
| The A2A SDK cancels execute() when the SSE connection drops. By | ||
| shielding the actual work, the LangGraph execution runs to completion | ||
| and the result is stored in the task store regardless of whether | ||
| anyone is listening to the stream. | ||
|
|
||
| Explicit cancellation via tasks/cancel still works — it sets the | ||
| _cancelled flag which the shielded execution checks. | ||
| """ | ||
| self._cancelled = False | ||
| task = asyncio.ensure_future(self._do_execute(context, event_queue)) | ||
| try: | ||
| await asyncio.shield(task) | ||
| except asyncio.CancelledError: | ||
| if self._cancelled: | ||
| # Explicit cancel via tasks/cancel — propagate to the task | ||
| task.cancel() | ||
| logger.info("Agent execution cancelled via tasks/cancel") | ||
| else: | ||
| # SSE disconnect — let the task continue in the background | ||
| logger.info("Client disconnected, agent execution continues in background") | ||
|
|
||
| async def _do_execute(self, context: RequestContext, event_queue: EventQueue): | ||
| """ | ||
| The agent allows to retrieve weather info through a natural language conversational interface | ||
| """ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Silent `except Exception` swallowing. This pattern appears in several places in this PR (here, line 165, line 184). While the intent — survive SSE client disconnects — is valid, swallowing all exceptions silently (including serialization bugs, type errors, SDK contract violations) will make production debugging very difficult. At minimum, log at `warning` level with the exception details. |
||
|
|
@@ -112,8 +161,6 @@ async def execute(self, context: RequestContext, event_queue: EventQueue): | |
| input = {"messages": messages} | ||
| logger.info(f'Processing messages: {input}') | ||
|
|
||
| # Note: Root span with MLflow attributes is created by tracing middleware | ||
| # Here we just run the agent logic - spans from LangChain are auto-captured | ||
| output = None | ||
|
|
||
| # Test MCP connection first | ||
|
|
@@ -127,48 +174,64 @@ async def execute(self, context: RequestContext, event_queue: EventQueue): | |
| logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}') | ||
| except Exception as tool_error: | ||
| logger.error(f'Failed to connect to MCP server: {tool_error}') | ||
| await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True) | ||
| try: | ||
| await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True) | ||
| except Exception: | ||
| pass | ||
| return | ||
|
|
||
| graph = await get_graph(mcpclient) | ||
| async for event in graph.astream(input, stream_mode="updates"): | ||
| await event_emitter.emit_event( | ||
| "\n".join( | ||
| f"🚶♂️{key}: {str(value)[:256] + '...' if len(str(value)) > 256 else str(value)}" | ||
| for key, value in event.items() | ||
| try: | ||
| await event_emitter.emit_event( | ||
| "\n".join( | ||
| f"🚶♂️{key}: {json.dumps(_serialize_event(value), default=str)}" | ||
| for key, value in event.items() | ||
| ) | ||
| + "\n" | ||
| ) | ||
| + "\n" | ||
| ) | ||
| except Exception: | ||
| # SSE connection dropped — continue processing, skip event emission | ||
| logger.debug("Event emission failed (client likely disconnected), continuing execution") | ||
| output = event | ||
| logger.info(f'event: {event}') | ||
| output = output.get("assistant", {}).get("final_answer") | ||
|
|
||
| # Set span output BEFORE emitting final event (for streaming response capture) | ||
| # This populates mlflow.spanOutputs, output.value, gen_ai.completion | ||
| # Use get_root_span() to get the middleware-created root span, not the | ||
| # current A2A span (trace.get_current_span() would return wrong span) | ||
| if output: | ||
| root_span = get_root_span() | ||
| if root_span and root_span.is_recording(): | ||
| set_span_output(root_span, str(output)) | ||
|
|
||
| await event_emitter.emit_event(str(output), final=True) | ||
| try: | ||
| await event_emitter.emit_event(str(output), final=True) | ||
| except Exception: | ||
| logger.info(f"Final event emission failed (client disconnected)") | ||
|
|
||
| # Always save completed task with artifact to the store. | ||
| # The emit_event may succeed (enqueue) but the SSE consumer may be | ||
| # gone, so the task store never gets the artifact via the normal path. | ||
| # This ensures tasks/get can return the result for trace recovery. | ||
| if self._task_store and task and output: | ||
| try: | ||
| import uuid | ||
| from a2a.types import TaskStatus, TaskState as TS, Artifact, TextPart as TP | ||
| task.status = TaskStatus(state=TS.completed) | ||
| task.artifacts = [Artifact(artifactId=str(uuid.uuid4()), parts=[TP(text=str(output))])] | ||
| await self._task_store.save(task) | ||
| logger.info(f"Task {task.id} saved to store with output ({len(str(output))} chars)") | ||
| except Exception as e: | ||
| logger.error(f"Failed to save task to store: {e}") | ||
|
|
||
| async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: | ||
| """ | ||
| Not implemented | ||
| """ | ||
| raise Exception("cancel not supported") | ||
| """Cancel the agent execution.""" | ||
| self._cancelled = True | ||
| logger.info("Cancel requested for agent execution") | ||
|
|
||
| def run(): | ||
| """ | ||
| Runs the A2A Agent application. | ||
| """ | ||
| agent_card = get_agent_card(host="0.0.0.0", port=8000) | ||
|
|
||
| task_store = InMemoryTaskStore() | ||
| request_handler = DefaultRequestHandler( | ||
| agent_executor=WeatherExecutor(), | ||
| task_store=InMemoryTaskStore(), | ||
| agent_executor=WeatherExecutor(task_store=task_store), | ||
| task_store=task_store, | ||
| ) | ||
|
|
||
| server = A2AStarletteApplication( | ||
|
|
@@ -187,15 +250,7 @@ def run(): | |
| name='agent_card_new', | ||
| )) | ||
|
|
||
| # Add tracing middleware - creates root span with MLflow/GenAI attributes | ||
| app.add_middleware(BaseHTTPMiddleware, dispatch=create_tracing_middleware()) | ||
|
|
||
| # Add logging middleware | ||
| @app.middleware("http") | ||
| async def log_authorization_header(request, call_next): | ||
| auth_header = request.headers.get("authorization", "No Authorization header") | ||
| logger.info(f"🔐 Incoming request to {request.url.path} with Authorization: {auth_header[:80] + '...' if len(auth_header) > 80 else auth_header}") | ||
| response = await call_next(request) | ||
| return response | ||
| # Wrap with OTEL ASGI middleware to extract traceparent from ext_proc | ||
| app = wrap_asgi_app(app) | ||
|
|
||
| uvicorn.run(app, host="0.0.0.0", port=8000) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Docstring contradicts `pyproject.toml`. This says "No OTEL dependencies needed in the agent" but
`pyproject.toml` adds five OTEL packages (`opentelemetry-sdk`, `opentelemetry-exporter-otlp-proto-http`, `openinference-instrumentation-langchain`, `opentelemetry-instrumentation-openai`, `opentelemetry-instrumentation-asgi`) and `observability.py` is still ~110 lines of active setup code.

This is "minimal" instrumentation, not "zero". The docstring (and PR title) should be corrected to avoid confusion when comparing Approach A vs. alternatives.