Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions a2a/weather_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ dependencies = [
"pydantic-settings>=2.8.1",
"langchain-mcp-adapters>=0.1.0",
"python-keycloak>=5.5.1",
"opentelemetry-exporter-otlp",
# OpenTelemetry GenAI semantic convention instrumentation
# Emits spans with gen_ai.* attributes for MLflow compatibility
"opentelemetry-instrumentation-openai>=0.34b0",
# OpenInference for LangChain instrumentation and AGENT span semantics
"openinference-semantic-conventions>=0.1.12",
"openinference-instrumentation-langchain>=0.1.27",
"opentelemetry-sdk>=1.25.0",
"opentelemetry-exporter-otlp-proto-http>=1.25.0",
"openinference-instrumentation-langchain>=0.1.36",
"opentelemetry-instrumentation-openai>=0.42.0",
"opentelemetry-instrumentation-asgi>=0.46b0",
]

[project.scripts]
Expand Down
10 changes: 5 additions & 5 deletions a2a/weather_service/src/weather_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Weather Service - OpenTelemetry Observability Setup"""
"""Weather Service - A2A weather agent with zero observability code.

from weather_service.observability import setup_observability

# Initialize observability before importing agent
setup_observability()
All tracing and observability is handled externally by the AuthBridge
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Docstring contradicts pyproject.toml

This says "No OTEL dependencies needed in the agent" but pyproject.toml adds five OTEL packages (opentelemetry-sdk, opentelemetry-exporter-otlp-proto-http, openinference-instrumentation-langchain, opentelemetry-instrumentation-openai, opentelemetry-instrumentation-asgi) and observability.py is still ~110 lines of active setup code.

This is "minimal" instrumentation, not "zero". The docstring (and PR title) should be corrected to avoid confusion when comparing Approach A vs. alternatives.

ext_proc sidecar which creates root spans and nested child spans from
the A2A SSE event stream. No OTEL dependencies needed in the agent.
"""
131 changes: 93 additions & 38 deletions a2a/weather_service/src/weather_service/agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import asyncio
import json
import logging
import os
import uvicorn
from textwrap import dedent

# Initialize OTEL before importing instrumented libraries
from weather_service.observability import setup_observability, wrap_asgi_app
setup_observability()

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events.event_queue import EventQueue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 PR description should reflect all changes in this PR

Beyond the observability simplification, this PR also introduces:

  • asyncio.shield() to survive SSE client disconnects (new resilience pattern)
  • Direct task-store saving as a fallback when SSE consumer is gone (new data-flow path)
  • _serialize_event() for JSON-serialized LangGraph events (new wire format for ext_proc parsing)
  • A working cancel() implementation (previously raise Exception)

These are all fine to include here, but the PR summary should list them so reviewers (and future git log readers) know what shipped.

Expand All @@ -13,15 +19,29 @@
from a2a.utils import new_agent_text_message, new_task
from langchain_core.messages import HumanMessage

from starlette.middleware.base import BaseHTTPMiddleware

from weather_service.graph import get_graph, get_mcpclient
from weather_service.observability import create_tracing_middleware, set_span_output, get_root_span

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def _serialize_event(value):
"""Serialize a LangGraph event value to JSON-compatible dict.

Converts LangChain message objects to dicts via model_dump() so the
ext_proc can parse structured GenAI attributes (token counts, model, etc).
"""
if isinstance(value, dict) and "messages" in value:
msgs = []
for msg in value["messages"]:
if hasattr(msg, "model_dump"):
msgs.append(msg.model_dump())
else:
msgs.append(str(msg))
return {"messages": msgs}
return value


def get_agent_card(host: str, port: int):
"""Returns the Agent Card for the AG2 Agent."""
capabilities = AgentCapabilities(streaming=True)
Expand Down Expand Up @@ -91,7 +111,36 @@ class WeatherExecutor(AgentExecutor):
"""
A class to handle weather assistant execution for A2A Agent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Concurrency bug: _cancelled is shared across concurrent requests

WeatherExecutor is instantiated once in run() and shared across all concurrent requests. The _cancelled flag is reset to False at the top of every execute() call:

self._cancelled = False
task = asyncio.ensure_future(self._do_execute(context, event_queue))

If two requests are in flight:

  • Request A is executing, user sends tasks/cancel → self._cancelled = True
  • Request B starts execute() → self._cancelled = False (clobbers A's cancel)
  • Request A's SSE disconnects → CancelledError is caught, but _cancelled is now False, so the cancel is treated as an SSE disconnect instead of an explicit cancel

This needs to be per-request state, e.g. a dict[str, bool] keyed by task ID or context ID.

"""
def __init__(self, task_store=None):
self._cancelled = False
self._task_store = task_store

async def execute(self, context: RequestContext, event_queue: EventQueue):
    """
    Shield the agent execution from SSE client disconnects.

    The A2A SDK cancels execute() when the SSE connection drops. By
    shielding the actual work, the LangGraph execution runs to completion
    and the result is stored in the task store regardless of whether
    anyone is listening to the stream.

    Explicit cancellation via tasks/cancel still works — it sets the
    _cancelled flag which the shielded execution checks.
    """
    # NOTE(review): _cancelled is instance state on an executor that is
    # shared across all concurrent requests; resetting it here can clobber
    # another in-flight request's pending cancel, making that cancel look
    # like an SSE disconnect. Consider per-request (task-id keyed) state.
    self._cancelled = False
    # ensure_future keeps the coroutine running as its own task even when
    # the outer await below is cancelled by the SDK on SSE disconnect.
    task = asyncio.ensure_future(self._do_execute(context, event_queue))
    try:
        # shield() makes the CancelledError hit this frame, not the task.
        await asyncio.shield(task)
    except asyncio.CancelledError:
        if self._cancelled:
            # Explicit cancel via tasks/cancel — propagate to the task
            task.cancel()
            logger.info("Agent execution cancelled via tasks/cancel")
        else:
            # SSE disconnect — let the task continue in the background
            logger.info("Client disconnected, agent execution continues in background")

async def _do_execute(self, context: RequestContext, event_queue: EventQueue):
"""
The agent allows to retrieve weather info through a natural language conversational interface
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Silent except Exception: pass hides real bugs

This pattern appears in several places in this PR (here, line 165, line 184). While the intent — survive SSE client disconnects — is valid, swallowing all exceptions silently (including serialization bugs, type errors, SDK contract violations) will make production debugging very difficult.

At minimum, log at warning level instead of pass or debug, so these are visible in production logs without enabling debug verbosity.

Expand All @@ -112,8 +161,6 @@ async def execute(self, context: RequestContext, event_queue: EventQueue):
input = {"messages": messages}
logger.info(f'Processing messages: {input}')

# Note: Root span with MLflow attributes is created by tracing middleware
# Here we just run the agent logic - spans from LangChain are auto-captured
output = None

# Test MCP connection first
Expand All @@ -127,48 +174,64 @@ async def execute(self, context: RequestContext, event_queue: EventQueue):
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
try:
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
except Exception:
pass
return

graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {str(value)[:256] + '...' if len(str(value)) > 256 else str(value)}"
for key, value in event.items()
try:
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {json.dumps(_serialize_event(value), default=str)}"
for key, value in event.items()
)
+ "\n"
)
+ "\n"
)
except Exception:
# SSE connection dropped — continue processing, skip event emission
logger.debug("Event emission failed (client likely disconnected), continuing execution")
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")

# Set span output BEFORE emitting final event (for streaming response capture)
# This populates mlflow.spanOutputs, output.value, gen_ai.completion
# Use get_root_span() to get the middleware-created root span, not the
# current A2A span (trace.get_current_span() would return wrong span)
if output:
root_span = get_root_span()
if root_span and root_span.is_recording():
set_span_output(root_span, str(output))

await event_emitter.emit_event(str(output), final=True)
try:
await event_emitter.emit_event(str(output), final=True)
except Exception:
logger.info(f"Final event emission failed (client disconnected)")

# Always save completed task with artifact to the store.
# The emit_event may succeed (enqueue) but the SSE consumer may be
# gone, so the task store never gets the artifact via the normal path.
# This ensures tasks/get can return the result for trace recovery.
if self._task_store and task and output:
try:
import uuid
from a2a.types import TaskStatus, TaskState as TS, Artifact, TextPart as TP
task.status = TaskStatus(state=TS.completed)
task.artifacts = [Artifact(artifactId=str(uuid.uuid4()), parts=[TP(text=str(output))])]
await self._task_store.save(task)
logger.info(f"Task {task.id} saved to store with output ({len(str(output))} chars)")
except Exception as e:
logger.error(f"Failed to save task to store: {e}")

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""
Not implemented
"""
raise Exception("cancel not supported")
"""Cancel the agent execution."""
self._cancelled = True
logger.info("Cancel requested for agent execution")

def run():
"""
Runs the A2A Agent application.
"""
agent_card = get_agent_card(host="0.0.0.0", port=8000)

task_store = InMemoryTaskStore()
request_handler = DefaultRequestHandler(
agent_executor=WeatherExecutor(),
task_store=InMemoryTaskStore(),
agent_executor=WeatherExecutor(task_store=task_store),
task_store=task_store,
)

server = A2AStarletteApplication(
Expand All @@ -187,15 +250,7 @@ def run():
name='agent_card_new',
))

# Add tracing middleware - creates root span with MLflow/GenAI attributes
app.add_middleware(BaseHTTPMiddleware, dispatch=create_tracing_middleware())

# Add logging middleware
@app.middleware("http")
async def log_authorization_header(request, call_next):
auth_header = request.headers.get("authorization", "No Authorization header")
logger.info(f"🔐 Incoming request to {request.url.path} with Authorization: {auth_header[:80] + '...' if len(auth_header) > 80 else auth_header}")
response = await call_next(request)
return response
# Wrap with OTEL ASGI middleware to extract traceparent from ext_proc
app = wrap_asgi_app(app)

uvicorn.run(app, host="0.0.0.0", port=8000)
Loading