diff --git a/agents/k8s_debug_agent/Dockerfile b/agents/k8s_debug_agent/Dockerfile index db25c40..05f8bef 100644 --- a/agents/k8s_debug_agent/Dockerfile +++ b/agents/k8s_debug_agent/Dockerfile @@ -6,7 +6,8 @@ WORKDIR /app COPY . /app/ # Install the package with dependencies -RUN pip install --no-cache-dir . +# --pre flag is required for the pre-release opentelemetry-instrumentation-* packages (0.x betas, e.g. opentelemetry-instrumentation-httpx>=0.49b0) +RUN pip install --no-cache-dir --pre . # Set proper permissions for OpenShift compatibility RUN chmod -R g+rwX /app && \ diff --git a/agents/k8s_debug_agent/a2a_agent.py b/agents/k8s_debug_agent/a2a_agent.py index 9799169..4356740 100644 --- a/agents/k8s_debug_agent/a2a_agent.py +++ b/agents/k8s_debug_agent/a2a_agent.py @@ -1,5 +1,33 @@ """A2A-compatible Kubernetes debugging agent entrypoint.""" +# CRITICAL: Initialize OpenTelemetry instrumentation BEFORE importing any libraries +# that we want to instrument (autogen, openai). This ensures the instrumentors can +# properly patch the library classes before they are used. 
+# See: https://opentelemetry.io/docs/concepts/instrumentation/libraries/ +from k8s_debug_agent.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, + set_baggage_context, + extract_baggage_from_headers, +) + +setup_observability() + +# Import OpenInference context manager for adding attributes to all spans +try: + from openinference.instrumentation import using_attributes +except ImportError: + # Fallback if openinference-instrumentation not installed + # nullcontext doesn't accept keyword arguments, so we need a custom fallback + from contextlib import contextmanager + + @contextmanager + def using_attributes(**kwargs): + """No-op context manager that accepts and ignores keyword arguments.""" + yield + +# Now import remaining dependencies (autogen imports happen AFTER instrumentation) import logging import sys import traceback @@ -66,14 +94,28 @@ class A2AEvent: def __init__(self, task_updater: TaskUpdater): self.task_updater = task_updater + self._completed = False # Track if task was already completed async def emit_event(self, message: str, final: bool = False) -> None: logger.info("Emitting event %s", message) + # Prevent double-completion of task + if self._completed: + logger.warning("Task already completed, skipping emit_event") + return + if final: parts = [TextPart(text=message)] - await self.task_updater.add_artifact(parts) - await self.task_updater.complete() + try: + await self.task_updater.add_artifact(parts) + await self.task_updater.complete() + self._completed = True + except RuntimeError as e: + if "terminal state" in str(e): + logger.warning("Task already in terminal state: %s", e) + self._completed = True + else: + raise else: await self.task_updater.update_status( TaskState.working, @@ -121,36 +163,86 @@ async def execute(self, context: RequestContext, event_queue: EventQueue): } ) + # Extract headers from context for trace propagation + # This enables cross-agent tracing when called by another agent 
+ headers = {} + if hasattr(context, 'headers'): + headers = dict(context.headers) + elif hasattr(context, 'message') and context.message: + if hasattr(context.message, 'headers') and context.message.headers: + headers = dict(context.message.headers) + elif hasattr(context.message, 'metadata') and context.message.metadata: + headers = context.message.metadata.get('headers', {}) + + # Extract baggage from headers + baggage_data = extract_baggage_from_headers(headers) + + # Add task/context IDs from A2A if available + if task: + baggage_data['task_id'] = task.id + baggage_data['context_id'] = task.context_id + + # If no user_id in headers, use anonymous + if 'user_id' not in baggage_data: + baggage_data['user_id'] = 'anonymous' + + # Prepare OpenInference attributes + oi_session_id = task.context_id if task else baggage_data.get('context_id') + oi_user_id = baggage_data.get('user_id', 'anonymous') + oi_metadata = { + 'task_id': task.id if task else baggage_data.get('task_id'), + 'request_id': baggage_data.get('request_id'), + } + toolkit: Optional[Toolkit] = None try: - if settings.MCP_URL: - logging.info("Connecting to MCP server at %s", settings.MCP_URL) - async with ( - streamablehttp_client(url=settings.MCP_URL) as ( - read_stream, - write_stream, - _, - ), - ClientSession(read_stream, write_stream) as session, + # Wrap execution with trace context for cross-agent trace propagation + with trace_context_from_headers(headers): + # Set baggage context + set_baggage_context(baggage_data) + + # using_attributes adds session.id, user.id to ALL spans in scope + # create_agent_span creates a root AGENT span for the conversation + with using_attributes( + session_id=oi_session_id, + user_id=oi_user_id, + metadata=oi_metadata, ): - await session.initialize() - toolkit = await create_toolkit( - session=session, - use_mcp_resources=False, - ) - await self._run_agent( - messages, - settings, - event_emitter, - toolkit, - ) - else: - await self._run_agent( - messages, - 
settings, - event_emitter, - toolkit, - ) + with create_agent_span( + name="k8s_debug_agent", + task_id=task.id if task else None, + context_id=task.context_id if task else None, + user_id=oi_user_id, + input_text=user_input[0] if user_input else None, + ): + if settings.MCP_URL: + logging.info("Connecting to MCP server at %s", settings.MCP_URL) + async with ( + streamablehttp_client(url=settings.MCP_URL) as ( + read_stream, + write_stream, + _, + ), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + toolkit = await create_toolkit( + session=session, + use_mcp_resources=False, + ) + await self._run_agent( + messages, + settings, + event_emitter, + toolkit, + ) + else: + await self._run_agent( + messages, + settings, + event_emitter, + toolkit, + ) except Exception as exc: # noqa: BLE001 traceback.print_exc() diff --git a/agents/k8s_debug_agent/k8s_debug_agent/main.py b/agents/k8s_debug_agent/k8s_debug_agent/main.py index 2d055ed..83a06eb 100644 --- a/agents/k8s_debug_agent/k8s_debug_agent/main.py +++ b/agents/k8s_debug_agent/k8s_debug_agent/main.py @@ -4,6 +4,7 @@ from typing import Any, List from autogen.mcp.mcp_client import Toolkit +from opentelemetry import trace from k8s_debug_agent.agents import Agents from k8s_debug_agent.config import Settings @@ -140,15 +141,36 @@ async def _invoke_agent( max_turns: int | None = None, **kwargs, ): - try: - return await self.agents.user_proxy.a_initiate_chat( - recipient=recipient, - message=message, - max_turns=max_turns, - **kwargs, - ) - except Exception as exc: # noqa: BLE001 - raise AgentWorkflowError(f"{description} failed: {exc}") from exc + """ + Invoke an AutoGen agent with proper OpenTelemetry context propagation. + + We create an explicit child span to serve as parent for any OpenAI + instrumentation spans created by AutoGen internally. 
+ """ + tracer = trace.get_tracer("k8s-debug-agent") + + # Create a child span for this agent invocation + # This ensures OpenAI auto-instrumentation spans have a proper parent + with tracer.start_as_current_span( + f"autogen.{recipient.name}", + attributes={ + "gen_ai.operation.name": "chain", + "autogen.agent.name": recipient.name, + "autogen.description": description, + }, + ) as span: + try: + result = await self.agents.user_proxy.a_initiate_chat( + recipient=recipient, + message=message, + max_turns=max_turns, + **kwargs, + ) + return result + except Exception as exc: # noqa: BLE001 + span.record_exception(exc) + span.set_status(trace.StatusCode.ERROR, str(exc)) + raise AgentWorkflowError(f"{description} failed: {exc}") from exc def _extract_text_response(self, response, description: str) -> str: chat_history = getattr(response, "chat_history", None) diff --git a/agents/k8s_debug_agent/k8s_debug_agent/observability.py b/agents/k8s_debug_agent/k8s_debug_agent/observability.py new file mode 100644 index 0000000..132b974 --- /dev/null +++ b/agents/k8s_debug_agent/k8s_debug_agent/observability.py @@ -0,0 +1,376 @@ +""" +OpenTelemetry observability setup for K8s Debug Agent. + +This module provides: +- Auto-instrumentation with OTEL GenAI semantic conventions (gen_ai.*) +- Cross-agent trace propagation via W3C Trace Context + Baggage + +The OTEL Collector handles conversion from GenAI (gen_ai.*) to OpenInference +(llm.*, openinference.span.kind) for Phoenix compatibility. 
+ +Key Features: +- `setup_observability`: Configure OTEL with GenAI instrumentation +- `create_agent_span`: Create a root AGENT span for the conversation +- `trace_context_from_headers`: Extract/propagate traceparent across agents +- Auto-instrumentation of OpenAI SDK with gen_ai.* attributes + +Usage: + from k8s_debug_agent.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, + ) + + # At agent startup (BEFORE importing openai/autogen) + setup_observability() + + # In request handler - wrap execution with context + with trace_context_from_headers(headers): + with create_agent_span("agent_task", task_id="task-456") as span: + result = await agent.execute(messages) +""" + +import logging +import os +from typing import Dict, Any, Optional +from contextlib import contextmanager +from opentelemetry import trace, baggage, context +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry.trace import Status, StatusCode +from opentelemetry.propagate import set_global_textmap, extract, inject +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.baggage.propagation import W3CBaggagePropagator + +logger = logging.getLogger(__name__) + +# Tracer name for manual spans +TRACER_NAME = "k8s-debug-agent" + + +def _get_otlp_exporter(endpoint: str, protocol: str): + """ + Get the appropriate OTLP exporter based on protocol. 
+ + Args: + endpoint: OTLP endpoint URL + protocol: Protocol to use ('grpc' or 'http/protobuf') + + Returns: + Configured OTLP span exporter + """ + if protocol.lower() == "grpc": + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GrpcExporter, + ) + # For gRPC, endpoint should not have http:// prefix + grpc_endpoint = endpoint.replace("http://", "").replace("https://", "") + return GrpcExporter(endpoint=grpc_endpoint, insecure=True) + else: + # Default to HTTP/protobuf + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HttpExporter, + ) + # Ensure endpoint has /v1/traces path for HTTP + if not endpoint.endswith("/v1/traces"): + endpoint = endpoint.rstrip("/") + "/v1/traces" + return HttpExporter(endpoint=endpoint) + + +class ObservabilityConfig: + """ + Configuration for observability setup. + + Reads from environment variables with sensible defaults. + """ + + def __init__(self): + # Service identification + self.service_name = os.getenv("OTEL_SERVICE_NAME", "k8s-debug-agent") + self.namespace = os.getenv("K8S_NAMESPACE_NAME", "kagenti-agents") + self.deployment_env = os.getenv("DEPLOYMENT_ENVIRONMENT", "kind-local") + + # Phoenix project routing + self.phoenix_project = os.getenv( + "PHOENIX_PROJECT_NAME", + f"{self.namespace}-agents" + ) + + # OTLP endpoint and protocol + self.otlp_endpoint = os.getenv( + "OTEL_EXPORTER_OTLP_ENDPOINT", + "http://otel-collector.kagenti-system.svc.cluster.local:4318" + ) + self.otlp_protocol = os.getenv( + "OTEL_EXPORTER_OTLP_PROTOCOL", + "http/protobuf" + ) + + # Additional resource attributes + self.extra_resource_attrs = self._parse_resource_attrs() + + def _parse_resource_attrs(self) -> Dict[str, str]: + """Parse OTEL_RESOURCE_ATTRIBUTES environment variable.""" + attrs = {} + resource_attrs_str = os.getenv("OTEL_RESOURCE_ATTRIBUTES", "") + + if resource_attrs_str: + for pair in resource_attrs_str.split(","): + if "=" in pair: + key, value = 
pair.split("=", 1) + attrs[key.strip()] = value.strip() + + return attrs + + def get_resource_attributes(self) -> Dict[str, str]: + """Get complete set of resource attributes for OTEL tracer.""" + attrs = { + "service.name": self.service_name, + "service.namespace": self.namespace, + "k8s.namespace.name": self.namespace, + "phoenix.project.name": self.phoenix_project, + "deployment.environment": self.deployment_env, + } + attrs.update(self.extra_resource_attrs) + return attrs + + +def setup_observability(config: Optional[ObservabilityConfig] = None) -> TracerProvider: + """ + Set up OpenTelemetry tracing with OTEL GenAI instrumentation. + + This function: + 1. Creates OTEL tracer provider with proper resource attributes + 2. Configures OTLP exporter (HTTP or gRPC) to send traces to collector + 3. Instruments OpenAI SDK with OTEL GenAI semantic conventions (gen_ai.*) + 4. Instruments asyncio for context propagation across async tasks + + The OTEL Collector handles conversion from GenAI to OpenInference format + for Phoenix compatibility using the transform/genai_to_openinference processor. + + IMPORTANT: Call this BEFORE importing openai or autogen to ensure + proper instrumentation. + + Args: + config: Optional ObservabilityConfig. If not provided, creates default. 
+ + Returns: + Configured TracerProvider (can be passed to AutoGen runtime) + """ + if config is None: + config = ObservabilityConfig() + + logger.info("=" * 70) + logger.info("Setting up OpenTelemetry observability (OTEL GenAI)") + logger.info("-" * 70) + logger.info(f"Service Name: {config.service_name}") + logger.info(f"Namespace: {config.namespace}") + logger.info(f"Phoenix Project: {config.phoenix_project}") + logger.info(f"OTLP Endpoint: {config.otlp_endpoint}") + logger.info(f"OTLP Protocol: {config.otlp_protocol}") + logger.info("=" * 70) + + # Create resource with all attributes + resource = Resource(attributes=config.get_resource_attributes()) + + # Create tracer provider + tracer_provider = TracerProvider(resource=resource) + + # Note: GenAI to OpenInference conversion is handled by the OTEL Collector + # using the transform/genai_to_openinference processor. This keeps agent code + # simple and allows centralized configuration of the conversion logic. + + # Add OTLP exporter + otlp_exporter = _get_otlp_exporter(config.otlp_endpoint, config.otlp_protocol) + tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + + # Set global tracer provider + trace.set_tracer_provider(tracer_provider) + + # NOTE: We intentionally DO NOT use opentelemetry-instrumentation-openai-v2 here. + # The OpenAI SDK instrumentation creates LLM spans that break context propagation + # when AutoGen crosses async/sync boundaries (ThreadPoolExecutor). + # + # Instead, we use httpx instrumentation (below) which: + # 1. Captures HTTP calls to api.openai.com with proper parent context + # 2. Creates spans that are properly nested under the agent span + # + # The OTEL Collector can then enrich these HTTP spans with LLM attributes + # using transform rules if needed for Phoenix visualization. 
+ + # Instrument asyncio to ensure context propagation across async tasks + # This is critical for AutoGen which uses asyncio.create_task internally + try: + from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor + AsyncioInstrumentor().instrument(tracer_provider=tracer_provider) + logger.info("Asyncio instrumented for context propagation") + except ImportError: + logger.warning("opentelemetry-instrumentation-asyncio not installed, context may not propagate in async tasks") + + # Instrument httpx for HTTP-level context propagation + # This is CRITICAL for AutoGen/OpenAI because: + # 1. OpenAI SDK uses httpx as its HTTP client + # 2. AutoGen crosses async/sync boundaries (ThreadPoolExecutor), breaking contextvars + # 3. httpx instrumentation captures context at HTTP layer where it IS preserved + # 4. This ensures LLM spans become children of the agent span + try: + from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor + HTTPXClientInstrumentor().instrument(tracer_provider=tracer_provider) + logger.info("HTTPX instrumented for HTTP-level context propagation") + except ImportError: + logger.warning("opentelemetry-instrumentation-httpx not installed, LLM spans may be disconnected") + + # Configure W3C Trace Context and Baggage propagators for distributed tracing + set_global_textmap(CompositePropagator([ + TraceContextTextMapPropagator(), + W3CBaggagePropagator(), + ])) + + logger.info("W3C Trace Context and Baggage propagators configured") + logger.info("Traces will route to Phoenix project: %s", config.phoenix_project) + + return tracer_provider + + +# Global tracer for creating manual spans +_tracer: Optional[trace.Tracer] = None + + +def get_tracer() -> trace.Tracer: + """Get the global tracer for creating manual spans.""" + global _tracer + if _tracer is None: + _tracer = trace.get_tracer(TRACER_NAME) + return _tracer + + +@contextmanager +def create_agent_span( + name: str = "agent_task", + task_id: Optional[str] = None, + 
context_id: Optional[str] = None, + user_id: Optional[str] = None, + input_text: Optional[str] = None, +): + """ + Create a root AGENT span for the conversation. + + This span serves as the root for all OpenAI auto-instrumented spans, + providing a clear entry point for each agent interaction. + + Args: + name: Span name (default: "agent_task") + task_id: A2A task ID for filtering conversations + context_id: A2A context ID (conversation session) + user_id: User identifier + input_text: The user's input message + + Yields: + The created span + """ + tracer = get_tracer() + + # Build attributes using OTEL GenAI semantic conventions + attributes = { + "gen_ai.operation.name": "agent", + } + + # Add A2A task/context IDs as custom attributes for filtering + if task_id: + attributes["a2a.task_id"] = task_id + if context_id: + attributes["a2a.context_id"] = context_id + if user_id: + attributes["user.id"] = user_id + if input_text: + attributes["gen_ai.prompt"] = input_text + + with tracer.start_as_current_span(name, attributes=attributes) as span: + try: + yield span + span.set_status(Status(StatusCode.OK)) + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + +def extract_trace_context(headers: Dict[str, str]) -> context.Context: + """Extract trace context from HTTP headers.""" + return extract(headers) + + +def inject_trace_context(headers: Dict[str, str]) -> Dict[str, str]: + """Inject current trace context into HTTP headers.""" + inject(headers) + return headers + + +@contextmanager +def trace_context_from_headers(headers: Dict[str, str]): + """ + Context manager that activates trace context from HTTP headers. + + Use this to wrap request handling code so that all spans created + within the context become children of the incoming trace. 
+ """ + ctx = extract(headers) + token = context.attach(ctx) + try: + yield ctx + finally: + context.detach(token) + + +def set_baggage_context(context_data: Dict[str, Any]) -> context.Context: + """ + Set OTEL baggage for context propagation across services. + + Args: + context_data: Dict with keys like user_id, request_id, tenant_id + + Returns: + Updated context with baggage + """ + ctx = context.get_current() + + for key, value in context_data.items(): + if value is not None: + ctx = baggage.set_baggage(key, str(value), context=ctx) + logger.debug(f"Set baggage: {key}={value}") + + context.attach(ctx) + return ctx + + +def extract_baggage_from_headers(headers: Dict[str, str]) -> Dict[str, str]: + """ + Extract baggage context from HTTP headers. + + Common headers to extract: + - user-id, x-user-id + - request-id, x-request-id + - conversation-id, x-conversation-id + """ + baggage_data = {} + headers_lower = {k.lower(): v for k, v in headers.items()} + + header_mappings = { + "user-id": "user_id", + "x-user-id": "user_id", + "request-id": "request_id", + "x-request-id": "request_id", + "conversation-id": "conversation_id", + "x-conversation-id": "conversation_id", + "tenant-id": "tenant_id", + "x-tenant-id": "tenant_id", + } + + for header_name, baggage_key in header_mappings.items(): + if header_name in headers_lower: + baggage_data[baggage_key] = headers_lower[header_name] + + logger.debug(f"Extracted baggage from headers: {baggage_data}") + return baggage_data diff --git a/agents/k8s_debug_agent/pyproject.toml b/agents/k8s_debug_agent/pyproject.toml index 55d10a0..b6a1289 100644 --- a/agents/k8s_debug_agent/pyproject.toml +++ b/agents/k8s_debug_agent/pyproject.toml @@ -7,6 +7,22 @@ requires-python = ">=3.10" dependencies = [ "ag2[openai,mcp]>=0.10.0", "a2a-sdk>=0.3.10", + # OpenTelemetry Core + "opentelemetry-api>=1.20.0", + "opentelemetry-sdk>=1.20.0", + "opentelemetry-exporter-otlp>=1.20.0", + "opentelemetry-exporter-otlp-proto-http>=1.20.0", + 
"opentelemetry-exporter-otlp-proto-grpc>=1.20.0", + # Asyncio instrumentation for context propagation across async tasks + "opentelemetry-instrumentation-asyncio>=0.49b0", + # HTTPX instrumentation - captures HTTP calls to OpenAI with proper trace context + # This is the PRIMARY instrumentation for LLM calls because: + # 1. AutoGen crosses async/sync boundaries (ThreadPoolExecutor), breaking contextvars + # 2. httpx captures context at HTTP layer where it IS preserved + # 3. OpenAI SDK uses httpx internally, so all LLM calls are captured + # Note: We intentionally DO NOT use opentelemetry-instrumentation-openai-v2 because + # it creates orphaned LLM spans due to AutoGen's threading model. + "opentelemetry-instrumentation-httpx>=0.49b0", ] [project.optional-dependencies] diff --git a/agents/orchestrator_agent/Dockerfile b/agents/orchestrator_agent/Dockerfile new file mode 100644 index 0000000..e94dd93 --- /dev/null +++ b/agents/orchestrator_agent/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Copy requirements and install dependencies +# --pre flag is required for the pre-release opentelemetry-instrumentation-* packages (0.x betas, e.g. opentelemetry-instrumentation-httpx>=0.49b0) +COPY requirements.txt . +RUN pip install --no-cache-dir --pre -r requirements.txt + +# Copy application code +COPY orchestrator_agent/ ./orchestrator_agent/ +COPY a2a_agent.py . + +# OpenShift compatibility: set group permissions for arbitrary UID +RUN chmod -R g+rwX /app && \ + chgrp -R 0 /app + +# Run as non-root user +USER 1000 + +# Expose port +EXPOSE 8000 + +# Run the A2A agent +CMD ["python", "a2a_agent.py"] diff --git a/agents/orchestrator_agent/a2a_agent.py b/agents/orchestrator_agent/a2a_agent.py new file mode 100644 index 0000000..56fb826 --- /dev/null +++ b/agents/orchestrator_agent/a2a_agent.py @@ -0,0 +1,253 @@ +"""A2A-compatible Orchestrator Agent entrypoint.""" + +# CRITICAL: Initialize OpenTelemetry instrumentation BEFORE importing any libraries +# that we want to instrument (autogen, openai). 
This ensures the instrumentors can +# properly patch the library classes before they are used. +from orchestrator_agent.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, + set_baggage_context, + extract_baggage_from_headers, +) + +setup_observability() + +# Import OpenInference context manager for adding attributes to all spans +try: + from openinference.instrumentation import using_attributes +except ImportError: + from contextlib import nullcontext; using_attributes = lambda **_kwargs: nullcontext()  # nullcontext itself rejects kwargs + +# Now import remaining dependencies (autogen imports happen AFTER instrumentation) +import logging +import sys +import traceback +from typing import Optional + +import uvicorn +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.apps import A2AStarletteApplication +from a2a.server.events.event_queue import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore, TaskUpdater +from a2a.types import ( + AgentCapabilities, + AgentCard, + AgentSkill, + TaskState, + TextPart, +) +from a2a.utils import new_agent_text_message, new_task +from autogen.mcp.mcp_client import Toolkit, create_toolkit +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + +from orchestrator_agent.config import settings +from orchestrator_agent.event import Event +from orchestrator_agent.main import OrchestratorAgent + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=settings.LOG_LEVEL, + stream=sys.stdout, + format="%(levelname)s: %(message)s", +) + + +def get_agent_card(host: str, port: int): + """Returns the Agent Card for the Orchestrator agent.""" + + capabilities = AgentCapabilities(streaming=True) + skill = AgentSkill( + id="orchestrate", + name="Task Orchestration", + description="Routes tasks to specialized agents based on their capabilities.", + tags=["orchestration", "routing", "coordination", "a2a"], + examples=[ 
"Get the weather in London and check Kubernetes pod status", + "List all available agents and their capabilities", + "Delegate this task to the most appropriate agent", + ], + ) + return AgentCard( + name="Orchestrator Agent", + description="Intelligent task router that discovers and coordinates specialized agents via A2A protocol.", + url=f"http://{host}:{port}/", + version="1.0.0", + default_input_modes=["text"], + default_output_modes=["text"], + capabilities=capabilities, + skills=[skill], + ) + + +class A2AEvent: + """Task event bridge that streams updates back to the A2A control plane.""" + + def __init__(self, task_updater: TaskUpdater): + self.task_updater = task_updater + self._completed = False + + async def emit_event(self, message: str, final: bool = False) -> None: + logger.info("Emitting event %s", message) + + if self._completed: + logger.warning("Task already completed, skipping emit_event") + return + + if final: + parts = [TextPart(text=message)] + try: + await self.task_updater.add_artifact(parts) + await self.task_updater.complete() + self._completed = True + except RuntimeError as e: + if "terminal state" in str(e): + logger.warning("Task already in terminal state: %s", e) + self._completed = True + else: + raise + else: + await self.task_updater.update_status( + TaskState.working, + new_agent_text_message( + message, + self.task_updater.context_id, + self.task_updater.task_id, + ), + ) + + +class OrchestratorExecutor(AgentExecutor): + """Adapter that wires the Orchestrator agent into the A2A runtime.""" + + async def _run_agent( + self, + messages: list[dict[str, str]], + event_emitter: Event, + toolkit: Optional[Toolkit], + ) -> None: + agent = OrchestratorAgent( + eventer=event_emitter, + mcp_toolkit=toolkit, + ) + result = await agent.execute(messages) + await event_emitter.emit_event(result, True) + + async def execute(self, context: RequestContext, event_queue: EventQueue): + """Executes the orchestration task.""" + + user_input = 
[context.get_user_input()] + task = context.current_task + if not task: + task = new_task(context.message) + await event_queue.enqueue_event(task) + task_updater = TaskUpdater(event_queue, task.id, task.context_id) + event_emitter = A2AEvent(task_updater) + messages: list[dict[str, str]] = [] + for message in user_input: + messages.append({"role": "User", "content": message}) + + # Extract headers for trace propagation + headers = {} + if hasattr(context, 'headers'): + headers = dict(context.headers) + elif hasattr(context, 'message') and context.message: + if hasattr(context.message, 'headers') and context.message.headers: + headers = dict(context.message.headers) + elif hasattr(context.message, 'metadata') and context.message.metadata: + headers = context.message.metadata.get('headers', {}) + + # Extract baggage from headers + baggage_data = extract_baggage_from_headers(headers) + if task: + baggage_data['task_id'] = task.id + baggage_data['context_id'] = task.context_id + if 'user_id' not in baggage_data: + baggage_data['user_id'] = 'anonymous' + + # OpenInference attributes + oi_session_id = task.context_id if task else baggage_data.get('context_id') + oi_user_id = baggage_data.get('user_id', 'anonymous') + oi_metadata = { + 'task_id': task.id if task else baggage_data.get('task_id'), + 'request_id': baggage_data.get('request_id'), + } + + toolkit: Optional[Toolkit] = None + try: + with trace_context_from_headers(headers): + set_baggage_context(baggage_data) + + with using_attributes( + session_id=oi_session_id, + user_id=oi_user_id, + metadata=oi_metadata, + ): + with create_agent_span( + name="orchestrator_agent", + task_id=task.id if task else None, + context_id=task.context_id if task else None, + user_id=oi_user_id, + input_text=user_input[0] if user_input else None, + ): + if settings.MCP_URL: + logging.info("Connecting to a2a-bridge at %s", settings.MCP_URL) + async with ( + streamablehttp_client(url=settings.MCP_URL) as ( + read_stream, + write_stream, 
+ _, + ), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + toolkit = await create_toolkit( + session=session, + use_mcp_resources=False, + ) + await self._run_agent(messages, event_emitter, toolkit) + else: + logging.warning("No MCP_URL configured - orchestrator has no tools") + await self._run_agent(messages, event_emitter, toolkit) + + except Exception as exc: + traceback.print_exc() + await event_emitter.emit_event( + f"Orchestration failed: {exc}", + True, + ) + + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + """Not implemented.""" + raise Exception("cancel not supported") + + +def run(): + """Runs the A2A Agent application.""" + + agent_card = get_agent_card(host="0.0.0.0", port=settings.SERVICE_PORT) + + request_handler = DefaultRequestHandler( + agent_executor=OrchestratorExecutor(), + task_store=InMemoryTaskStore(), + ) + + server = A2AStarletteApplication( + agent_card=agent_card, + http_handler=request_handler, + ) + + app = server.build() + + uvicorn.run(app, host="0.0.0.0", port=settings.SERVICE_PORT) + + +def main(): + """Console script entrypoint for packaging compatibility.""" + run() + + +if __name__ == "__main__": + main() diff --git a/agents/orchestrator_agent/orchestrator_agent/__init__.py b/agents/orchestrator_agent/orchestrator_agent/__init__.py new file mode 100644 index 0000000..20e4be7 --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/__init__.py @@ -0,0 +1 @@ +"""Orchestrator Agent - Routes tasks to specialized agents via A2A protocol.""" diff --git a/agents/orchestrator_agent/orchestrator_agent/agents.py b/agents/orchestrator_agent/orchestrator_agent/agents.py new file mode 100644 index 0000000..dca0e63 --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/agents.py @@ -0,0 +1,67 @@ +"""AutoGen agent definitions for the Orchestrator Agent.""" + +import logging +import sys + +from autogen import ConversableAgent +from 
autogen.mcp.mcp_client import Toolkit + +from orchestrator_agent.config import settings +from orchestrator_agent.prompts import ORCHESTRATOR_SYSTEM_PROMPT + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=settings.LOG_LEVEL, stream=sys.stdout, format="%(levelname)s: %(message)s" +) + + +class Agents: + """AutoGen agent definitions for orchestration.""" + + def __init__(self, mcp_toolkit: Toolkit = None): + """Initialize agents with optional MCP toolkit for a2a-bridge.""" + self.llm_config = { + "config_list": [ + { + "model": settings.TASK_MODEL_ID, + "base_url": settings.LLM_API_BASE, + "api_type": "openai", + "api_key": settings.LLM_API_KEY, + } + ], + "temperature": settings.MODEL_TEMPERATURE, + } + + # Main orchestrator agent that routes tasks + self.orchestrator = ConversableAgent( + name="Orchestrator", + system_message=ORCHESTRATOR_SYSTEM_PROMPT, + llm_config=self.llm_config, + code_execution_config=False, + human_input_mode="NEVER", + ) + + # User proxy that executes tools + self.user_proxy = ConversableAgent( + name="User", + human_input_mode="NEVER", + code_execution_config=False, + is_termination_msg=lambda msg: msg + and "content" in msg + and msg["content"] is not None + and ( + "##DONE##" in msg["content"] + or "##TERMINATE##" in msg["content"] + or ("tool_calls" not in msg and msg["content"] == "") + ), + ) + + # Register MCP tools (a2a-bridge) if available + if mcp_toolkit is not None: + logger.info("Registering a2a-bridge MCP tools") + mcp_toolkit.register_for_execution(self.user_proxy) + mcp_toolkit.register_for_llm(self.orchestrator) + for tool in mcp_toolkit.tools: + logger.info(f" - {tool.name}: {tool.description}") + else: + logger.warning("No MCP toolkit provided - orchestrator will have no tools") diff --git a/agents/orchestrator_agent/orchestrator_agent/config.py b/agents/orchestrator_agent/orchestrator_agent/config.py new file mode 100644 index 0000000..bbefed3 --- /dev/null +++ 
b/agents/orchestrator_agent/orchestrator_agent/config.py @@ -0,0 +1,32 @@ +"""Configuration settings for the Orchestrator Agent.""" + +import os +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Orchestrator agent configuration from environment variables.""" + + # LLM Configuration + LLM_API_BASE: str = os.getenv("LLM_API_BASE", "http://localhost:11434/v1") + LLM_API_KEY: str = os.getenv("LLM_API_KEY", "dummy") + TASK_MODEL_ID: str = os.getenv("TASK_MODEL_ID", "gpt-4o-mini") + MODEL_TEMPERATURE: float = float(os.getenv("MODEL_TEMPERATURE", "0.0")) + + # Service Configuration + SERVICE_PORT: int = int(os.getenv("PORT", "8000")) + SERVICE_HOST: str = os.getenv("HOST", "0.0.0.0") + + # MCP Configuration (a2a-bridge) + MCP_URL: str = os.getenv("MCP_URL", "") + MCP_TRANSPORT: str = os.getenv("MCP_TRANSPORT", "http") + + # Logging + LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO") + + class Config: + env_file = ".env" + extra = "ignore" + + +settings = Settings() diff --git a/agents/orchestrator_agent/orchestrator_agent/event.py b/agents/orchestrator_agent/orchestrator_agent/event.py new file mode 100644 index 0000000..20e66ff --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/event.py @@ -0,0 +1,27 @@ +"""Event interface for streaming updates.""" + +import logging +from abc import ABC, abstractmethod + + +class Event(ABC): + """Abstract interface for emitting task events.""" + + @abstractmethod + async def emit_event(self, message: str, final: bool = False) -> None: + """Emit a task event.""" + pass + + +class LoggingEvent(Event): + """Event implementation that logs messages.""" + + def __init__(self, logger: logging.Logger = None): + self.logger = logger or logging.getLogger(__name__) + + async def emit_event(self, message: str, final: bool = False) -> None: + """Log the event message.""" + if final: + self.logger.info(f"[FINAL] {message}") + else: + self.logger.info(message) diff --git 
a/agents/orchestrator_agent/orchestrator_agent/main.py b/agents/orchestrator_agent/orchestrator_agent/main.py new file mode 100644 index 0000000..9651fbf --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/main.py @@ -0,0 +1,87 @@ +"""Main orchestration logic for the Orchestrator Agent.""" + +import logging +from typing import Optional + +from autogen.mcp.mcp_client import Toolkit + +from orchestrator_agent.agents import Agents +from orchestrator_agent.config import settings +from orchestrator_agent.event import Event, LoggingEvent + + +class OrchestratorAgent: + """Orchestrator that routes tasks to specialized agents via A2A protocol.""" + + def __init__( + self, + eventer: Event = None, + mcp_toolkit: Toolkit = None, + logger: Optional[logging.Logger] = None, + ): + self.logger = logger or logging.getLogger(__name__) + self.agents = Agents(mcp_toolkit) + self.eventer = eventer or LoggingEvent(self.logger) + + async def execute(self, messages: list[dict]) -> str: + """Execute the orchestration task. + + Args: + messages: List of message dicts with role and content + + Returns: + Final response string + """ + try: + # Extract user input from messages + user_input = self._extract_user_input(messages) + await self.eventer.emit_event(f"Received request: {user_input[:100]}...") + + # Start orchestration conversation + await self.eventer.emit_event("Analyzing request and discovering agents...") + + response = await self.agents.user_proxy.a_initiate_chat( + recipient=self.agents.orchestrator, + message=user_input, + max_turns=10, # Allow multiple turns for tool use + ) + + # Extract final response + chat_history = getattr(response, "chat_history", []) + if not chat_history: + return "I was unable to process your request." 
+ + # Find the last meaningful response + for msg in reversed(chat_history): + if isinstance(msg, dict) and msg.get("content"): + content = msg["content"] + if isinstance(content, str) and content.strip(): + return content + + return "Task completed but no response was generated." + + except Exception as e: + error_msg = f"Orchestration failed: {str(e)}" + self.logger.error(error_msg, exc_info=True) + return error_msg + + def _extract_user_input(self, messages: list[dict]) -> str: + """Extract user input from message list.""" + if not messages: + return "" + + # Get the last user message + last_msg = messages[-1] + content = last_msg.get("content", "") + + if isinstance(content, str): + return content + elif isinstance(content, list): + # Handle structured content (text parts) + text_parts = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_parts.append(item.get("text", "")) + return " ".join(text_parts) + + return str(content) diff --git a/agents/orchestrator_agent/orchestrator_agent/observability.py b/agents/orchestrator_agent/orchestrator_agent/observability.py new file mode 100644 index 0000000..2bf57f7 --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/observability.py @@ -0,0 +1,352 @@ +""" +OpenTelemetry observability setup for Orchestrator Agent. 
+ +This module provides: +- Auto-instrumentation with OTEL GenAI semantic conventions (gen_ai.*) +- OpenInferenceSpanProcessor for Phoenix compatibility +- Cross-agent trace propagation via W3C Trace Context + Baggage + +Key Features: +- `setup_observability`: Configure OTEL with GenAI instrumentation +- `create_agent_span`: Create a root AGENT span for the conversation +- `trace_context_from_headers`: Extract/propagate traceparent across agents +- Auto-instrumentation of OpenAI SDK with gen_ai.* attributes + +Usage: + from orchestrator_agent.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, + ) + + # At agent startup (BEFORE importing openai/autogen) + setup_observability() + + # In request handler - wrap execution with context + with trace_context_from_headers(headers): + with create_agent_span("agent_task", task_id="task-456") as span: + result = await agent.execute(messages) +""" + +import logging +import os +from typing import Dict, Any, Optional +from contextlib import contextmanager +from opentelemetry import trace, baggage, context +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry.trace import Status, StatusCode +from opentelemetry.propagate import set_global_textmap, extract, inject +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.baggage.propagation import W3CBaggagePropagator + +logger = logging.getLogger(__name__) + +# Tracer name for manual spans +TRACER_NAME = "orchestrator-agent" + + +def _get_otlp_exporter(endpoint: str, protocol: str): + """ + Get the appropriate OTLP exporter based on protocol. 
+ + Args: + endpoint: OTLP endpoint URL + protocol: Protocol to use ('grpc' or 'http/protobuf') + + Returns: + Configured OTLP span exporter + """ + if protocol.lower() == "grpc": + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GrpcExporter, + ) + # For gRPC, endpoint should not have http:// prefix + grpc_endpoint = endpoint.replace("http://", "").replace("https://", "") + return GrpcExporter(endpoint=grpc_endpoint, insecure=True) + else: + # Default to HTTP/protobuf + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HttpExporter, + ) + # Ensure endpoint has /v1/traces path for HTTP + if not endpoint.endswith("/v1/traces"): + endpoint = endpoint.rstrip("/") + "/v1/traces" + return HttpExporter(endpoint=endpoint) + + +class ObservabilityConfig: + """ + Configuration for observability setup. + + Reads from environment variables with sensible defaults. + """ + + def __init__(self): + # Service identification + self.service_name = os.getenv("OTEL_SERVICE_NAME", "orchestrator-agent") + self.namespace = os.getenv("K8S_NAMESPACE_NAME", "kagenti-agents") + self.deployment_env = os.getenv("DEPLOYMENT_ENVIRONMENT", "kind-local") + + # Phoenix project routing + self.phoenix_project = os.getenv( + "PHOENIX_PROJECT_NAME", + f"{self.namespace}-agents" + ) + + # OTLP endpoint and protocol + self.otlp_endpoint = os.getenv( + "OTEL_EXPORTER_OTLP_ENDPOINT", + "http://otel-collector.kagenti-system.svc.cluster.local:4318" + ) + self.otlp_protocol = os.getenv( + "OTEL_EXPORTER_OTLP_PROTOCOL", + "http/protobuf" + ) + + # Additional resource attributes + self.extra_resource_attrs = self._parse_resource_attrs() + + def _parse_resource_attrs(self) -> Dict[str, str]: + """Parse OTEL_RESOURCE_ATTRIBUTES environment variable.""" + attrs = {} + resource_attrs_str = os.getenv("OTEL_RESOURCE_ATTRIBUTES", "") + + if resource_attrs_str: + for pair in resource_attrs_str.split(","): + if "=" in pair: + key, value = 
pair.split("=", 1) + attrs[key.strip()] = value.strip() + + return attrs + + def get_resource_attributes(self) -> Dict[str, str]: + """Get complete set of resource attributes for OTEL tracer.""" + attrs = { + "service.name": self.service_name, + "service.namespace": self.namespace, + "k8s.namespace.name": self.namespace, + "phoenix.project.name": self.phoenix_project, + "deployment.environment": self.deployment_env, + } + attrs.update(self.extra_resource_attrs) + return attrs + + +def setup_observability(config: Optional[ObservabilityConfig] = None) -> TracerProvider: + """ + Set up OpenTelemetry tracing with OTEL GenAI instrumentation. + + This function: + 1. Creates OTEL tracer provider with proper resource attributes + 2. Adds OpenInferenceSpanProcessor for Phoenix compatibility + 3. Configures OTLP exporter (HTTP or gRPC) + 4. Instruments OpenAI SDK with OTEL GenAI semantic conventions + + IMPORTANT: Call this BEFORE importing openai or autogen to ensure + proper instrumentation. + + Args: + config: Optional ObservabilityConfig. If not provided, creates default. 
+ + Returns: + Configured TracerProvider (can be passed to AutoGen runtime) + """ + if config is None: + config = ObservabilityConfig() + + logger.info("=" * 70) + logger.info("Setting up OpenTelemetry observability (OTEL GenAI)") + logger.info("-" * 70) + logger.info(f"Service Name: {config.service_name}") + logger.info(f"Namespace: {config.namespace}") + logger.info(f"Phoenix Project: {config.phoenix_project}") + logger.info(f"OTLP Endpoint: {config.otlp_endpoint}") + logger.info(f"OTLP Protocol: {config.otlp_protocol}") + logger.info("=" * 70) + + # Create resource with all attributes + resource = Resource(attributes=config.get_resource_attributes()) + + # Create tracer provider + tracer_provider = TracerProvider(resource=resource) + + # Add OpenInferenceSpanProcessor FIRST + # This converts gen_ai.* attributes to OpenInference format for Phoenix + try: + from openinference.instrumentation.openllmetry import OpenInferenceSpanProcessor + tracer_provider.add_span_processor(OpenInferenceSpanProcessor()) + logger.info("OpenInferenceSpanProcessor added for Phoenix compatibility") + except ImportError: + logger.warning("openinference-instrumentation-openllmetry not installed, skipping Phoenix conversion") + + # Add OTLP exporter + otlp_exporter = _get_otlp_exporter(config.otlp_endpoint, config.otlp_protocol) + tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + + # Set global tracer provider + trace.set_tracer_provider(tracer_provider) + + # Auto-instrument OpenAI with OTEL GenAI semantic conventions + # This creates spans with gen_ai.* attributes that Phoenix can understand via the SpanProcessor + try: + from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor + OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) + logger.info("OpenAI SDK instrumented with OTEL GenAI semantic conventions") + except ImportError: + logger.warning("opentelemetry-instrumentation-openai-v2 not installed, skipping OpenAI instrumentation") + + 
# Configure W3C Trace Context and Baggage propagators for distributed tracing + set_global_textmap(CompositePropagator([ + TraceContextTextMapPropagator(), + W3CBaggagePropagator(), + ])) + + logger.info("W3C Trace Context and Baggage propagators configured") + logger.info("Traces will route to Phoenix project: %s", config.phoenix_project) + + return tracer_provider + + +# Global tracer for creating manual spans +_tracer: Optional[trace.Tracer] = None + + +def get_tracer() -> trace.Tracer: + """Get the global tracer for creating manual spans.""" + global _tracer + if _tracer is None: + _tracer = trace.get_tracer(TRACER_NAME) + return _tracer + + +@contextmanager +def create_agent_span( + name: str = "agent_task", + task_id: Optional[str] = None, + context_id: Optional[str] = None, + user_id: Optional[str] = None, + input_text: Optional[str] = None, +): + """ + Create a root AGENT span for the conversation. + + This span serves as the root for all OpenAI auto-instrumented spans, + providing a clear entry point for each agent interaction. 
+ + Args: + name: Span name (default: "agent_task") + task_id: A2A task ID for filtering conversations + context_id: A2A context ID (conversation session) + user_id: User identifier + input_text: The user's input message + + Yields: + The created span + """ + tracer = get_tracer() + + # Build attributes using OTEL GenAI semantic conventions + attributes = { + "gen_ai.operation.name": "agent", + } + + # Add A2A task/context IDs as custom attributes for filtering + if task_id: + attributes["a2a.task_id"] = task_id + if context_id: + attributes["a2a.context_id"] = context_id + if user_id: + attributes["user.id"] = user_id + if input_text: + attributes["gen_ai.prompt"] = input_text + + with tracer.start_as_current_span(name, attributes=attributes) as span: + try: + yield span + span.set_status(Status(StatusCode.OK)) + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + +def extract_trace_context(headers: Dict[str, str]) -> context.Context: + """Extract trace context from HTTP headers.""" + return extract(headers) + + +def inject_trace_context(headers: Dict[str, str]) -> Dict[str, str]: + """Inject current trace context into HTTP headers.""" + inject(headers) + return headers + + +@contextmanager +def trace_context_from_headers(headers: Dict[str, str]): + """ + Context manager that activates trace context from HTTP headers. + + Use this to wrap request handling code so that all spans created + within the context become children of the incoming trace. + """ + ctx = extract(headers) + token = context.attach(ctx) + try: + yield ctx + finally: + context.detach(token) + + +def set_baggage_context(context_data: Dict[str, Any]) -> context.Context: + """ + Set OTEL baggage for context propagation across services. 
+ + Args: + context_data: Dict with keys like user_id, request_id, tenant_id + + Returns: + Updated context with baggage + """ + ctx = context.get_current() + + for key, value in context_data.items(): + if value is not None: + ctx = baggage.set_baggage(key, str(value), context=ctx) + logger.debug(f"Set baggage: {key}={value}") + + context.attach(ctx) + return ctx + + +def extract_baggage_from_headers(headers: Dict[str, str]) -> Dict[str, str]: + """ + Extract baggage context from HTTP headers. + + Common headers to extract: + - user-id, x-user-id + - request-id, x-request-id + - conversation-id, x-conversation-id + """ + baggage_data = {} + headers_lower = {k.lower(): v for k, v in headers.items()} + + header_mappings = { + "user-id": "user_id", + "x-user-id": "user_id", + "request-id": "request_id", + "x-request-id": "request_id", + "conversation-id": "conversation_id", + "x-conversation-id": "conversation_id", + "tenant-id": "tenant_id", + "x-tenant-id": "tenant_id", + } + + for header_name, baggage_key in header_mappings.items(): + if header_name in headers_lower: + baggage_data[baggage_key] = headers_lower[header_name] + + logger.debug(f"Extracted baggage from headers: {baggage_data}") + return baggage_data diff --git a/agents/orchestrator_agent/orchestrator_agent/prompts.py b/agents/orchestrator_agent/orchestrator_agent/prompts.py new file mode 100644 index 0000000..3fda524 --- /dev/null +++ b/agents/orchestrator_agent/orchestrator_agent/prompts.py @@ -0,0 +1,48 @@ +"""Prompts for the Orchestrator Agent.""" + +ORCHESTRATOR_SYSTEM_PROMPT = """You are an intelligent task orchestrator that routes user requests to specialized agents. + +Your role is to: +1. Analyze the user's request to understand what type of task it is +2. Use the discover_agents or list_agents tool to find available agents and their capabilities +3. Select the most appropriate agent(s) to handle the request based on their skills +4. Delegate the task using send_message_to_agent tool +5. 
Aggregate and summarize results from multiple agents if needed + +Available tools: +- discover_agents: Get detailed JSON about available agents +- list_agents: Get a summary table of agents with optional filtering +- send_message_to_agent: Send a task to a specific agent and get response +- send_streaming_message_to_agent: Send a task with streaming response + +When selecting an agent: +- Match the user's request to agent skills (e.g., "kubernetes" skill for k8s questions) +- Consider the agent's description to understand its capabilities +- If multiple agents could help, coordinate between them + +Always explain your reasoning and provide a clear summary of results.""" + +TASK_ROUTER_PROMPT = """Analyze this user request and determine which agent(s) should handle it. + +User Request: {user_request} + +Available Agents: +{agent_list} + +Respond with a JSON object: +{{ + "analysis": "Brief analysis of the request type", + "selected_agents": ["agent_url1", "agent_url2"], + "delegation_plan": "How to coordinate if multiple agents are needed", + "message_for_agent": "The message to send to the primary agent" +}}""" + +RESULT_AGGREGATOR_PROMPT = """Summarize the results from the delegated agents. + +Original Request: {original_request} + +Agent Responses: +{agent_responses} + +Provide a clear, concise summary that directly answers the user's original request. 
+Include relevant details from each agent's response.""" diff --git a/agents/orchestrator_agent/pyproject.toml b/agents/orchestrator_agent/pyproject.toml new file mode 100644 index 0000000..eae28ad --- /dev/null +++ b/agents/orchestrator_agent/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "orchestrator-agent" +version = "0.1.0" +description = "Orchestrator agent that routes tasks to specialized agents via A2A protocol" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "a2a-sdk[all,http-server]>=0.2.5", + "ag2[openai,mcp]==0.10.2", + "openai>=1.0.0", + # OTEL GenAI Auto-Instrumentation (uses gen_ai.* semantic conventions) + "opentelemetry-instrumentation-openai-v2>=2.0b0", + # OpenInference SpanProcessor for Phoenix compatibility (converts gen_ai.* to OpenInference) + "openinference-instrumentation-openllmetry>=0.1.0", + # OpenTelemetry Core + "opentelemetry-api>=1.20.0", + "opentelemetry-exporter-otlp>=1.20.0", + "opentelemetry-sdk>=1.20.0", + "pydantic>=2.0.0", + "pydantic-settings>=2.0.0", + "uvicorn>=0.27.0", +] + +[project.scripts] +orchestrator-agent = "orchestrator_agent.a2a_agent:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/agents/orchestrator_agent/requirements.txt b/agents/orchestrator_agent/requirements.txt new file mode 100644 index 0000000..81ee543 --- /dev/null +++ b/agents/orchestrator_agent/requirements.txt @@ -0,0 +1,29 @@ +# A2A SDK +a2a-sdk[all,http-server]>=0.2.5 + +# AutoGen for agent orchestration (ag2 provides 'autogen' module namespace) +ag2[openai,mcp]==0.10.2 + +# MCP client for a2a-bridge +mcp>=1.0.0 + +# OpenAI SDK (used by AutoGen) +openai>=1.0.0 + +# OpenTelemetry +opentelemetry-api>=1.20.0 +opentelemetry-sdk>=1.20.0 +opentelemetry-exporter-otlp>=1.20.0 + +# OpenInference for AI observability +openinference-instrumentation>=0.1.0 +openinference-instrumentation-openai>=0.1.0 +openinference-instrumentation-autogen>=0.1.10 
+openinference-semantic-conventions>=0.1.0 + +# Configuration +pydantic>=2.0.0 +pydantic-settings>=2.0.0 + +# Server +uvicorn>=0.27.0 diff --git a/agents/source_code_analyzer/a2a_agent.py b/agents/source_code_analyzer/a2a_agent.py index d10bd20..1684d9e 100644 --- a/agents/source_code_analyzer/a2a_agent.py +++ b/agents/source_code_analyzer/a2a_agent.py @@ -22,6 +22,11 @@ from source_code_analyzer.config import Settings, settings from source_code_analyzer.event import Event from source_code_analyzer.main import SourceCodeAnalyzer +from source_code_analyzer.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, +) logger = logging.getLogger(__name__) logging.basicConfig( @@ -30,6 +35,9 @@ format="%(levelname)s: %(message)s", ) +# Initialize OpenTelemetry observability with Phoenix integration +setup_observability() + def get_agent_card(host: str, port: int) -> AgentCard: capabilities = AgentCapabilities(streaming=True) @@ -58,13 +66,28 @@ def get_agent_card(host: str, port: int) -> AgentCard: class A2AEvent(Event): def __init__(self, task_updater: TaskUpdater): self.task_updater = task_updater + self._completed = False # Track if task was already completed async def emit_event(self, message: str, final: bool = False) -> None: logger.info("event: %s", message) + + # Prevent double-completion of task + if self._completed: + logger.warning("Task already completed, skipping emit_event") + return + if final: parts = [TextPart(text=message)] - await self.task_updater.add_artifact(parts) - await self.task_updater.complete() + try: + await self.task_updater.add_artifact(parts) + await self.task_updater.complete() + self._completed = True + except RuntimeError as e: + if "terminal state" in str(e): + logger.warning("Task already in terminal state: %s", e) + self._completed = True + else: + raise else: await self.task_updater.update_status( TaskState.working, @@ -111,26 +134,40 @@ async def execute(self, context: RequestContext, 
event_queue: EventQueue): } ) + # Extract headers from context for trace propagation + trace_headers = {} + if hasattr(context, 'message') and context.message: + if hasattr(context.message, 'metadata') and context.message.metadata: + trace_headers = context.message.metadata.get('headers', {}) + toolkit: Optional[Toolkit] = None try: - if settings.MCP_URL: - logger.info("Connecting to MCP server at %s", settings.MCP_URL) - headers = {"X-MCP-Readonly": "true"} - if settings.MCP_TOKEN: - headers["Authorization"] = f"Bearer {settings.MCP_TOKEN}" - async with ( - streamablehttp_client(url=settings.MCP_URL, headers=headers or None) as ( - read_stream, - write_stream, - _, - ), - ClientSession(read_stream, write_stream) as session, + # Wrap execution with trace context for cross-agent trace propagation + with trace_context_from_headers(trace_headers): + with create_agent_span( + name="source_code_analyzer", + task_id=task.id if task else None, + context_id=task.context_id if task else None, + input_text=user_input[0] if user_input else None, ): - await session.initialize() - toolkit = await create_toolkit(session=session, use_mcp_resources=False) - await self._run_agent(messages, settings, event_emitter, toolkit) - else: - await self._run_agent(messages, settings, event_emitter, None) + if settings.MCP_URL: + logger.info("Connecting to MCP server at %s", settings.MCP_URL) + headers = {"X-MCP-Readonly": "true"} + if settings.MCP_TOKEN: + headers["Authorization"] = f"Bearer {settings.MCP_TOKEN}" + async with ( + streamablehttp_client(url=settings.MCP_URL, headers=headers or None) as ( + read_stream, + write_stream, + _, + ), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + toolkit = await create_toolkit(session=session, use_mcp_resources=False) + await self._run_agent(messages, settings, event_emitter, toolkit) + else: + await self._run_agent(messages, settings, event_emitter, None) except Exception as exc: # noqa: BLE001 
traceback.print_exc() diff --git a/agents/source_code_analyzer/pyproject.toml b/agents/source_code_analyzer/pyproject.toml index 43537ac..1f88dd2 100644 --- a/agents/source_code_analyzer/pyproject.toml +++ b/agents/source_code_analyzer/pyproject.toml @@ -7,6 +7,16 @@ requires-python = ">=3.10" dependencies = [ "ag2[openai,mcp]>=0.10.0", "a2a-sdk>=0.3.10", + # OTEL GenAI Auto-Instrumentation (uses gen_ai.* semantic conventions) + "opentelemetry-instrumentation-openai-v2>=2.0b0", + # OpenInference SpanProcessor for Phoenix compatibility (converts gen_ai.* to OpenInference) + "openinference-instrumentation-openllmetry>=0.1.0", + # OpenTelemetry Core + "opentelemetry-api>=1.20.0", + "opentelemetry-sdk>=1.20.0", + "opentelemetry-exporter-otlp>=1.20.0", + "opentelemetry-exporter-otlp-proto-http>=1.20.0", + "opentelemetry-exporter-otlp-proto-grpc>=1.20.0", ] [project.optional-dependencies] @@ -34,3 +44,7 @@ extend-select = ["I"] [tool.black] line-length = 100 target-version = ["py310"] + +[tool.uv] +# Allow pre-release packages (e.g., opentelemetry-instrumentation-openai-v2 beta) +prerelease = "allow" diff --git a/agents/source_code_analyzer/source_code_analyzer/observability.py b/agents/source_code_analyzer/source_code_analyzer/observability.py new file mode 100644 index 0000000..b11fbaf --- /dev/null +++ b/agents/source_code_analyzer/source_code_analyzer/observability.py @@ -0,0 +1,352 @@ +""" +OpenTelemetry observability setup for Source Code Analyzer Agent. 
+ +This module provides: +- Auto-instrumentation with OTEL GenAI semantic conventions (gen_ai.*) +- OpenInferenceSpanProcessor for Phoenix compatibility +- Cross-agent trace propagation via W3C Trace Context + Baggage + +Key Features: +- `setup_observability`: Configure OTEL with GenAI instrumentation +- `create_agent_span`: Create a root AGENT span for the conversation +- `trace_context_from_headers`: Extract/propagate traceparent across agents +- Auto-instrumentation of OpenAI SDK with gen_ai.* attributes + +Usage: + from source_code_analyzer.observability import ( + setup_observability, + create_agent_span, + trace_context_from_headers, + ) + + # At agent startup (BEFORE importing openai/autogen) + setup_observability() + + # In request handler - wrap execution with context + with trace_context_from_headers(headers): + with create_agent_span("agent_task", task_id="task-456") as span: + result = await agent.execute(messages) +""" + +import logging +import os +from typing import Dict, Any, Optional +from contextlib import contextmanager +from opentelemetry import trace, baggage, context +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry.trace import Status, StatusCode +from opentelemetry.propagate import set_global_textmap, extract, inject +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.baggage.propagation import W3CBaggagePropagator + +logger = logging.getLogger(__name__) + +# Tracer name for manual spans +TRACER_NAME = "source-code-analyzer" + + +def _get_otlp_exporter(endpoint: str, protocol: str): + """ + Get the appropriate OTLP exporter based on protocol. 
+ + Args: + endpoint: OTLP endpoint URL + protocol: Protocol to use ('grpc' or 'http/protobuf') + + Returns: + Configured OTLP span exporter + """ + if protocol.lower() == "grpc": + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GrpcExporter, + ) + # For gRPC, endpoint should not have http:// prefix + grpc_endpoint = endpoint.replace("http://", "").replace("https://", "") + return GrpcExporter(endpoint=grpc_endpoint, insecure=True) + else: + # Default to HTTP/protobuf + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HttpExporter, + ) + # Ensure endpoint has /v1/traces path for HTTP + if not endpoint.endswith("/v1/traces"): + endpoint = endpoint.rstrip("/") + "/v1/traces" + return HttpExporter(endpoint=endpoint) + + +class ObservabilityConfig: + """ + Configuration for observability setup. + + Reads from environment variables with sensible defaults. + """ + + def __init__(self): + # Service identification + self.service_name = os.getenv("OTEL_SERVICE_NAME", "source-code-analyzer") + self.namespace = os.getenv("K8S_NAMESPACE_NAME", "kagenti-agents") + self.deployment_env = os.getenv("DEPLOYMENT_ENVIRONMENT", "kind-local") + + # Phoenix project routing + self.phoenix_project = os.getenv( + "PHOENIX_PROJECT_NAME", + f"{self.namespace}-agents" + ) + + # OTLP endpoint and protocol + self.otlp_endpoint = os.getenv( + "OTEL_EXPORTER_OTLP_ENDPOINT", + "http://otel-collector.kagenti-system.svc.cluster.local:4318" + ) + self.otlp_protocol = os.getenv( + "OTEL_EXPORTER_OTLP_PROTOCOL", + "http/protobuf" + ) + + # Additional resource attributes + self.extra_resource_attrs = self._parse_resource_attrs() + + def _parse_resource_attrs(self) -> Dict[str, str]: + """Parse OTEL_RESOURCE_ATTRIBUTES environment variable.""" + attrs = {} + resource_attrs_str = os.getenv("OTEL_RESOURCE_ATTRIBUTES", "") + + if resource_attrs_str: + for pair in resource_attrs_str.split(","): + if "=" in pair: + key, value 
= pair.split("=", 1) + attrs[key.strip()] = value.strip() + + return attrs + + def get_resource_attributes(self) -> Dict[str, str]: + """Get complete set of resource attributes for OTEL tracer.""" + attrs = { + "service.name": self.service_name, + "service.namespace": self.namespace, + "k8s.namespace.name": self.namespace, + "phoenix.project.name": self.phoenix_project, + "deployment.environment": self.deployment_env, + } + attrs.update(self.extra_resource_attrs) + return attrs + + +def setup_observability(config: Optional[ObservabilityConfig] = None) -> TracerProvider: + """ + Set up OpenTelemetry tracing with OTEL GenAI instrumentation. + + This function: + 1. Creates OTEL tracer provider with proper resource attributes + 2. Adds OpenInferenceSpanProcessor for Phoenix compatibility + 3. Configures OTLP exporter (HTTP or gRPC) + 4. Instruments OpenAI SDK with OTEL GenAI semantic conventions + + IMPORTANT: Call this BEFORE importing openai or autogen to ensure + proper instrumentation. + + Args: + config: Optional ObservabilityConfig. If not provided, creates default. 
+ + Returns: + Configured TracerProvider (can be passed to AutoGen runtime) + """ + if config is None: + config = ObservabilityConfig() + + logger.info("=" * 70) + logger.info("Setting up OpenTelemetry observability (OTEL GenAI)") + logger.info("-" * 70) + logger.info(f"Service Name: {config.service_name}") + logger.info(f"Namespace: {config.namespace}") + logger.info(f"Phoenix Project: {config.phoenix_project}") + logger.info(f"OTLP Endpoint: {config.otlp_endpoint}") + logger.info(f"OTLP Protocol: {config.otlp_protocol}") + logger.info("=" * 70) + + # Create resource with all attributes + resource = Resource(attributes=config.get_resource_attributes()) + + # Create tracer provider + tracer_provider = TracerProvider(resource=resource) + + # Add OpenInferenceSpanProcessor FIRST + # This converts gen_ai.* attributes to OpenInference format for Phoenix + try: + from openinference.instrumentation.openllmetry import OpenInferenceSpanProcessor + tracer_provider.add_span_processor(OpenInferenceSpanProcessor()) + logger.info("OpenInferenceSpanProcessor added for Phoenix compatibility") + except ImportError: + logger.warning("openinference-instrumentation-openllmetry not installed, skipping Phoenix conversion") + + # Add OTLP exporter + otlp_exporter = _get_otlp_exporter(config.otlp_endpoint, config.otlp_protocol) + tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + + # Set global tracer provider + trace.set_tracer_provider(tracer_provider) + + # Auto-instrument OpenAI with OTEL GenAI semantic conventions + # This creates spans with gen_ai.* attributes that Phoenix can understand via the SpanProcessor + try: + from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor + OpenAIInstrumentor().instrument(tracer_provider=tracer_provider) + logger.info("OpenAI SDK instrumented with OTEL GenAI semantic conventions") + except ImportError: + logger.warning("opentelemetry-instrumentation-openai-v2 not installed, skipping OpenAI instrumentation") + + 
# Configure W3C Trace Context and Baggage propagators for distributed tracing + set_global_textmap(CompositePropagator([ + TraceContextTextMapPropagator(), + W3CBaggagePropagator(), + ])) + + logger.info("W3C Trace Context and Baggage propagators configured") + logger.info("Traces will route to Phoenix project: %s", config.phoenix_project) + + return tracer_provider + + +# Global tracer for creating manual spans +_tracer: Optional[trace.Tracer] = None + + +def get_tracer() -> trace.Tracer: + """Get the global tracer for creating manual spans.""" + global _tracer + if _tracer is None: + _tracer = trace.get_tracer(TRACER_NAME) + return _tracer + + +@contextmanager +def create_agent_span( + name: str = "agent_task", + task_id: Optional[str] = None, + context_id: Optional[str] = None, + user_id: Optional[str] = None, + input_text: Optional[str] = None, +): + """ + Create a root AGENT span for the conversation. + + This span serves as the root for all OpenAI auto-instrumented spans, + providing a clear entry point for each agent interaction. 
+ + Args: + name: Span name (default: "agent_task") + task_id: A2A task ID for filtering conversations + context_id: A2A context ID (conversation session) + user_id: User identifier + input_text: The user's input message + + Yields: + The created span + """ + tracer = get_tracer() + + # Build attributes using OTEL GenAI semantic conventions + attributes = { + "gen_ai.operation.name": "agent", + } + + # Add A2A task/context IDs as custom attributes for filtering + if task_id: + attributes["a2a.task_id"] = task_id + if context_id: + attributes["a2a.context_id"] = context_id + if user_id: + attributes["user.id"] = user_id + if input_text: + attributes["gen_ai.prompt"] = input_text + + with tracer.start_as_current_span(name, attributes=attributes) as span: + try: + yield span + span.set_status(Status(StatusCode.OK)) + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + +def extract_trace_context(headers: Dict[str, str]) -> context.Context: + """Extract trace context from HTTP headers.""" + return extract(headers) + + +def inject_trace_context(headers: Dict[str, str]) -> Dict[str, str]: + """Inject current trace context into HTTP headers.""" + inject(headers) + return headers + + +@contextmanager +def trace_context_from_headers(headers: Dict[str, str]): + """ + Context manager that activates trace context from HTTP headers. + + Use this to wrap request handling code so that all spans created + within the context become children of the incoming trace. + """ + ctx = extract(headers) + token = context.attach(ctx) + try: + yield ctx + finally: + context.detach(token) + + +def set_baggage_context(context_data: Dict[str, Any]) -> context.Context: + """ + Set OTEL baggage for context propagation across services. 
+ + Args: + context_data: Dict with keys like user_id, request_id, tenant_id + + Returns: + Updated context with baggage (also attached as the current context; NOTE(review): the token from context.attach() is discarded, so the context is never detached — confirm this is intended) + """ + ctx = context.get_current() + + for key, value in context_data.items(): + if value is not None: + ctx = baggage.set_baggage(key, str(value), context=ctx) + logger.debug(f"Set baggage: {key}={value}") + + context.attach(ctx) + return ctx + + +def extract_baggage_from_headers(headers: Dict[str, str]) -> Dict[str, str]: + """ + Extract baggage context from HTTP headers. + + Common headers to extract: + - user-id, x-user-id + - request-id, x-request-id + - conversation-id, x-conversation-id + - tenant-id, x-tenant-id + """ + baggage_data = {} + headers_lower = {k.lower(): v for k, v in headers.items()} + + header_mappings = { + "user-id": "user_id", + "x-user-id": "user_id", + "request-id": "request_id", + "x-request-id": "request_id", + "conversation-id": "conversation_id", + "x-conversation-id": "conversation_id", + "tenant-id": "tenant_id", + "x-tenant-id": "tenant_id", + } + + for header_name, baggage_key in header_mappings.items(): + if header_name in headers_lower: + baggage_data[baggage_key] = headers_lower[header_name] + + logger.debug(f"Extracted baggage from headers: {baggage_data}") + return baggage_data diff --git a/deploy/a2a-bridge/02-rbac.yaml b/deploy/a2a-bridge/02-rbac.yaml index 3f245ba..7182aff 100644 --- a/deploy/a2a-bridge/02-rbac.yaml +++ b/deploy/a2a-bridge/02-rbac.yaml @@ -31,6 +31,13 @@ roleRef: kind: ClusterRole name: a2a-bridge-role subjects: + # NOTE: Toolhive operator creates TWO ServiceAccounts: + # - a2a-bridge-proxy-runner: Used by the MCPServerProxy deployment (proxy) + # - a2a-bridge-sa: Used by the MCPServer statefulset (actual MCP server) + # The MCP server (statefulset) is the one that needs RBAC to access AgentCards - kind: ServiceAccount - name: a2a-bridge + name: a2a-bridge-proxy-runner + namespace: kagenti-agents + - kind: ServiceAccount + name: a2a-bridge-sa namespace: kagenti-agents diff --git
a/deploy/k8s-debug-agent/04-deployment.yaml b/deploy/k8s-debug-agent/04-deployment.yaml index 119ef6b..417dd80 100644 --- a/deploy/k8s-debug-agent/04-deployment.yaml +++ b/deploy/k8s-debug-agent/04-deployment.yaml @@ -60,6 +60,19 @@ spec: value: "http://k8s-readonly-server.kagenti-agents.svc.cluster.local:8080/mcp" - name: XDG_CACHE_HOME value: "/tmp/.cache" + # OpenTelemetry / Phoenix Configuration + - name: OTEL_SERVICE_NAME + value: "k8s-debug-agent" + - name: K8S_NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://otel-collector.kagenti-system.svc.cluster.local:8335" + - name: OTEL_EXPORTER_OTLP_PROTOCOL + value: "http/protobuf" + - name: PHOENIX_PROJECT_NAME + value: "kagenti-agents" resources: limits: cpu: "1000m" diff --git a/deploy/k8s-readonly-server/02-clusterrole.yaml b/deploy/k8s-readonly-server/02-clusterrole.yaml index eed0b07..bd085c6 100644 --- a/deploy/k8s-readonly-server/02-clusterrole.yaml +++ b/deploy/k8s-readonly-server/02-clusterrole.yaml @@ -7,6 +7,11 @@ metadata: app.kubernetes.io/component: mcp-tool app.kubernetes.io/part-of: kagenti rules: + # Read namespaces (cluster-wide) + - apiGroups: [""] + resources: ["namespaces"] + verbs: ["get", "list", "watch"] + # Read pods and pod logs - apiGroups: [""] resources: ["pods", "pods/log"] diff --git a/deploy/k8s-readonly-server/03-clusterrolebinding.yaml b/deploy/k8s-readonly-server/03-clusterrolebinding.yaml index 77ae0b0..4042c64 100644 --- a/deploy/k8s-readonly-server/03-clusterrolebinding.yaml +++ b/deploy/k8s-readonly-server/03-clusterrolebinding.yaml @@ -7,8 +7,9 @@ metadata: app.kubernetes.io/component: mcp-tool app.kubernetes.io/part-of: kagenti subjects: + # NOTE: MCPServer (Toolhive) operator creates SA with suffix "-sa" - kind: ServiceAccount - name: k8s-readonly-server + name: k8s-readonly-server-sa namespace: kagenti-agents roleRef: kind: ClusterRole diff --git a/tools/k8s_readonly_server/server.py 
b/tools/k8s_readonly_server/server.py index 67f1d11..df0a220 100644 --- a/tools/k8s_readonly_server/server.py +++ b/tools/k8s_readonly_server/server.py @@ -44,6 +44,36 @@ def validate_namespace(namespace: str) -> None: ) + +@mcp.tool() +def get_namespaces() -> str: + """ + List all namespaces in the cluster. + + Returns: + Human-readable summary string listing each namespace's name, status, + labels, and creation timestamp (a Python repr of a list of dicts, + not strict JSON) + """ + try: + namespaces = v1.list_namespace() + + namespace_list = [] + for ns in namespaces.items: + namespace_info = { + "name": ns.metadata.name, + "status": ns.status.phase, + "labels": ns.metadata.labels or {}, + "creation_timestamp": str(ns.metadata.creation_timestamp), + } + namespace_list.append(namespace_info) + + return ( + f"Found {len(namespace_list)} namespace(s) in the cluster:\n\n" + + str(namespace_list) + ) + + except ApiException as e: + raise Exception(f"Kubernetes API error: {e.reason}") + + @mcp.tool() def get_pods(namespace: str, label_selector: Optional[str] = None) -> str: """