Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agents/k8s_debug_agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ WORKDIR /app
COPY . /app/

# Install the package with dependencies
RUN pip install --no-cache-dir .
# --pre flag is required for pre-release packages like opentelemetry-instrumentation-openai-v2
RUN pip install --no-cache-dir --pre .

# Set proper permissions for OpenShift compatibility
RUN chmod -R g+rwX /app && \
Expand Down
150 changes: 121 additions & 29 deletions agents/k8s_debug_agent/a2a_agent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
"""A2A-compatible Kubernetes debugging agent entrypoint."""

# CRITICAL: Initialize OpenTelemetry instrumentation BEFORE importing any libraries
# that we want to instrument (autogen, openai). This ensures the instrumentors can
# properly patch the library classes before they are used.
# See: https://opentelemetry.io/docs/concepts/instrumentation/libraries/
from k8s_debug_agent.observability import (
setup_observability,
create_agent_span,
trace_context_from_headers,
set_baggage_context,
extract_baggage_from_headers,
)

setup_observability()

# Import OpenInference context manager for adding attributes to all spans
try:
    from openinference.instrumentation import using_attributes
except ImportError:
    # openinference-instrumentation is optional; when it is absent, degrade
    # to a no-op so tracing attributes are simply dropped.
    # contextlib.nullcontext cannot be used as a drop-in replacement by
    # itself because it rejects keyword arguments, so wrap it in a shim
    # that swallows them.
    from contextlib import nullcontext

    def using_attributes(**_ignored_attributes):
        """Return a do-nothing context manager, ignoring all keyword args."""
        return nullcontext()

# Now import remaining dependencies (autogen imports happen AFTER instrumentation)
import logging
import sys
import traceback
Expand Down Expand Up @@ -66,14 +94,28 @@ class A2AEvent:

    def __init__(self, task_updater: TaskUpdater):
        """Wrap *task_updater* for emitting events about a single A2A task.

        Args:
            task_updater: The A2A ``TaskUpdater`` used to publish status
                updates, artifacts, and task completion.
        """
        self.task_updater = task_updater
        # Set to True once the task has been completed (or found to already be
        # in a terminal state) so a second final emit is skipped instead of
        # attempting to complete the task twice.
        self._completed = False  # Track if task was already completed

async def emit_event(self, message: str, final: bool = False) -> None:
logger.info("Emitting event %s", message)

# Prevent double-completion of task
if self._completed:
logger.warning("Task already completed, skipping emit_event")
return

if final:
parts = [TextPart(text=message)]
await self.task_updater.add_artifact(parts)
await self.task_updater.complete()
try:
await self.task_updater.add_artifact(parts)
await self.task_updater.complete()
self._completed = True
except RuntimeError as e:
if "terminal state" in str(e):
logger.warning("Task already in terminal state: %s", e)
self._completed = True
else:
raise
else:
await self.task_updater.update_status(
TaskState.working,
Expand Down Expand Up @@ -121,36 +163,86 @@ async def execute(self, context: RequestContext, event_queue: EventQueue):
}
)

# Extract headers from context for trace propagation
# This enables cross-agent tracing when called by another agent
headers = {}
if hasattr(context, 'headers'):
headers = dict(context.headers)
elif hasattr(context, 'message') and context.message:
if hasattr(context.message, 'headers') and context.message.headers:
headers = dict(context.message.headers)
elif hasattr(context.message, 'metadata') and context.message.metadata:
headers = context.message.metadata.get('headers', {})

# Extract baggage from headers
baggage_data = extract_baggage_from_headers(headers)

# Add task/context IDs from A2A if available
if task:
baggage_data['task_id'] = task.id
baggage_data['context_id'] = task.context_id

# If no user_id in headers, use anonymous
if 'user_id' not in baggage_data:
baggage_data['user_id'] = 'anonymous'

# Prepare OpenInference attributes
oi_session_id = task.context_id if task else baggage_data.get('context_id')
oi_user_id = baggage_data.get('user_id', 'anonymous')
oi_metadata = {
'task_id': task.id if task else baggage_data.get('task_id'),
'request_id': baggage_data.get('request_id'),
}

toolkit: Optional[Toolkit] = None
try:
if settings.MCP_URL:
logging.info("Connecting to MCP server at %s", settings.MCP_URL)
async with (
streamablehttp_client(url=settings.MCP_URL) as (
read_stream,
write_stream,
_,
),
ClientSession(read_stream, write_stream) as session,
# Wrap execution with trace context for cross-agent trace propagation
with trace_context_from_headers(headers):
# Set baggage context
set_baggage_context(baggage_data)

# using_attributes adds session.id, user.id to ALL spans in scope
# create_agent_span creates a root AGENT span for the conversation
with using_attributes(
session_id=oi_session_id,
user_id=oi_user_id,
metadata=oi_metadata,
):
await session.initialize()
toolkit = await create_toolkit(
session=session,
use_mcp_resources=False,
)
await self._run_agent(
messages,
settings,
event_emitter,
toolkit,
)
else:
await self._run_agent(
messages,
settings,
event_emitter,
toolkit,
)
with create_agent_span(
name="k8s_debug_agent",
task_id=task.id if task else None,
context_id=task.context_id if task else None,
user_id=oi_user_id,
input_text=user_input[0] if user_input else None,
):
if settings.MCP_URL:
logging.info("Connecting to MCP server at %s", settings.MCP_URL)
async with (
streamablehttp_client(url=settings.MCP_URL) as (
read_stream,
write_stream,
_,
),
ClientSession(read_stream, write_stream) as session,
):
await session.initialize()
toolkit = await create_toolkit(
session=session,
use_mcp_resources=False,
)
await self._run_agent(
messages,
settings,
event_emitter,
toolkit,
)
else:
await self._run_agent(
messages,
settings,
event_emitter,
toolkit,
)

except Exception as exc: # noqa: BLE001
traceback.print_exc()
Expand Down
40 changes: 31 additions & 9 deletions agents/k8s_debug_agent/k8s_debug_agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, List

from autogen.mcp.mcp_client import Toolkit
from opentelemetry import trace

from k8s_debug_agent.agents import Agents
from k8s_debug_agent.config import Settings
Expand Down Expand Up @@ -140,15 +141,36 @@ async def _invoke_agent(
max_turns: int | None = None,
**kwargs,
):
try:
return await self.agents.user_proxy.a_initiate_chat(
recipient=recipient,
message=message,
max_turns=max_turns,
**kwargs,
)
except Exception as exc: # noqa: BLE001
raise AgentWorkflowError(f"{description} failed: {exc}") from exc
"""
Invoke an AutoGen agent with proper OpenTelemetry context propagation.

We create an explicit child span to serve as parent for any OpenAI
instrumentation spans created by AutoGen internally.
"""
tracer = trace.get_tracer("k8s-debug-agent")

# Create a child span for this agent invocation
# This ensures OpenAI auto-instrumentation spans have a proper parent
with tracer.start_as_current_span(
f"autogen.{recipient.name}",
attributes={
"gen_ai.operation.name": "chain",
"autogen.agent.name": recipient.name,
"autogen.description": description,
},
) as span:
try:
result = await self.agents.user_proxy.a_initiate_chat(
recipient=recipient,
message=message,
max_turns=max_turns,
**kwargs,
)
return result
except Exception as exc: # noqa: BLE001
span.record_exception(exc)
span.set_status(trace.StatusCode.ERROR, str(exc))
raise AgentWorkflowError(f"{description} failed: {exc}") from exc

def _extract_text_response(self, response, description: str) -> str:
chat_history = getattr(response, "chat_history", None)
Expand Down
Loading