diff --git a/python/packages/a2a/AGENTS.md b/python/packages/a2a/AGENTS.md index af6e4a492b..65355c8f50 100644 --- a/python/packages/a2a/AGENTS.md +++ b/python/packages/a2a/AGENTS.md @@ -4,20 +4,48 @@ Agent-to-Agent (A2A) protocol support for inter-agent communication. ## Main Classes -- **`A2AAgent`** - Agent wrapper that exposes an agent via the A2A protocol +- **`A2AAgent`** - Client to connect to remote A2A-compliant agents. +- **`A2AExecutor`** - Bridge to expose local agents via the A2A protocol. ## Usage +### A2AAgent (Client) + ```python from agent_framework.a2a import A2AAgent -a2a_agent = A2AAgent(agent=my_agent) +# Connect to a remote A2A agent +a2a_agent = A2AAgent(url="http://remote-agent/a2a") +response = await a2a_agent.run("Hello!") +``` + +### A2AExecutor (Server/Bridge) + +```python +from agent_framework.a2a import A2AExecutor +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore + +# Create an A2A executor for your agent +executor = A2AExecutor(agent=my_agent) + +# Set up the request handler and server application +request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), +) + +app = A2AStarletteApplication( + agent_card=my_agent_card, + http_handler=request_handler, +).build() ``` ## Import Path ```python -from agent_framework.a2a import A2AAgent +from agent_framework.a2a import A2AAgent, A2AExecutor # or directly: -from agent_framework_a2a import A2AAgent +from agent_framework_a2a import A2AAgent, A2AExecutor ``` diff --git a/python/packages/a2a/README.md b/python/packages/a2a/README.md index 5ae15e3647..4bdfa9221e 100644 --- a/python/packages/a2a/README.md +++ b/python/packages/a2a/README.md @@ -10,11 +10,49 @@ pip install agent-framework-a2a --pre The A2A agent integration enables communication with remote A2A-compliant agents using the standardized A2A protocol. This allows your Agent Framework applications to connect to agents running on different platforms, languages, or services. +### A2AAgent (Client) + +The `A2AAgent` class is a client that wraps an A2A Client to connect the Agent Framework with external A2A-compliant agents. + +```python +from agent_framework.a2a import A2AAgent + +# Connect to a remote A2A agent +a2a_agent = A2AAgent(url="http://remote-agent/a2a") +response = await a2a_agent.run("Hello!") +``` + +### A2AExecutor (Hosting) + +The `A2AExecutor` class bridges local AI agents built with the `agent_framework` library to the A2A protocol, allowing them to be hosted and accessed by other A2A-compliant clients. + +```python +from agent_framework.a2a import A2AExecutor +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore + +# Create an A2A executor for your agent +executor = A2AExecutor(agent=my_agent) + +# Set up the request handler and server application +request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), +) + +app = A2AStarletteApplication( + agent_card=my_agent_card, + http_handler=request_handler, +).build() +``` + ### Basic Usage Example See the [A2A agent examples](../../samples/04-hosting/a2a/) which demonstrate: - Connecting to remote A2A agents +- Hosting local agents via A2A protocol - Sending messages and receiving responses - Handling different content types (text, files, data) - Streaming responses and real-time interaction diff --git a/python/packages/a2a/agent_framework_a2a/__init__.py b/python/packages/a2a/agent_framework_a2a/__init__.py index 4b4d54ecc3..694328925c 100644 --- a/python/packages/a2a/agent_framework_a2a/__init__.py +++ b/python/packages/a2a/agent_framework_a2a/__init__.py @@ -3,6 +3,7 @@ import importlib.metadata from ._agent import A2AAgent, A2AContinuationToken +from ._a2a_executor import A2AExecutor try: __version__ = importlib.metadata.version(__name__) @@ -12,5 +13,6 @@ __all__ = [ "A2AAgent", "A2AContinuationToken", + "A2AExecutor", "__version__", ] diff --git a/python/packages/a2a/agent_framework_a2a/_a2a_executor.py b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py new file mode 100644 index 0000000000..113fddc20e --- /dev/null +++ b/python/packages/a2a/agent_framework_a2a/_a2a_executor.py @@ -0,0 +1,198 @@ +# Copyright (c) Microsoft. All rights reserved. + +from asyncio import CancelledError + +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.server.tasks import TaskUpdater +from a2a.types import FilePart, FileWithBytes, FileWithUri, Part, TaskState, TextPart +from a2a.utils import new_task +from agent_framework import ( + Agent, + Content, + Message, + WorkflowAgent, +) +from typing_extensions import override + + +class A2AExecutor(AgentExecutor): + """Execute AI agents using the A2A (Agent-to-Agent) protocol. + + The A2AExecutor bridges AI agents built with the agent_framework library and the A2A protocol, + enabling structured agent execution with event-driven communication. It handles execution + contexts, delegates history management to the agent's session, and converts agent + responses into A2A protocol events. + + The executor supports executing an Agent or WorkflowAgent. It provides comprehensive + error handling with task status updates and supports various content types including text, + binary data, and URI-based content. + + Example: + .. code-block:: python + + from a2a.server.apps import A2AStarletteApplication + from a2a.server.request_handlers import DefaultRequestHandler + from a2a.server.tasks import InMemoryTaskStore + from a2a.types import AgentCapabilities, AgentCard + from agent_framework.a2a import A2AExecutor + from agent_framework.openai import OpenAIResponsesClient + + public_agent_card = AgentCard( + name='Food Agent', + description='A simple agent that provides food-related information.', + url='http://localhost:9999/', + version='1.0.0', + defaultInputModes=['text'], + defaultOutputModes=['text'], + capabilities=AgentCapabilities(streaming=True), + skills=[], + ) + + # Create an agent + agent = OpenAIResponsesClient().as_agent( + name="Food Agent", + instructions="A simple agent that provides food-related information.", + ) + + # Set up the A2A server with the A2AExecutor + request_handler = DefaultRequestHandler( + agent_executor=A2AExecutor(agent), + task_store=InMemoryTaskStore(), + ) + + server = A2AStarletteApplication( + agent_card=public_agent_card, + http_handler=request_handler, + ).build() + + Args: + agent: The AI agent to execute. + """ + + def __init__( + self, + agent: Agent | WorkflowAgent + ): + """Initialize the A2AExecutor with the specified agent. + + Example: + .. code-block:: python + + # Set up the A2A server with the A2AExecutor + request_handler = DefaultRequestHandler( + agent_executor=A2AExecutor(agent), + task_store=InMemoryTaskStore(), + ) + + Args: + agent: The AI agent or workflow to execute. + """ + super().__init__() + self._agent: Agent | WorkflowAgent = agent + + @override + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + """Cancel agent execution. + + Cancellation is primarily managed by the A2A protocol layer. + """ + pass + + @override + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + """Execute the agent with the given context and event queue. + + Orchestrates the agent execution process: sets up the agent session, + executes the agent, processes response messages, and handles errors with appropriate task status updates. + """ + if context.context_id is None: + raise ValueError("Context ID must be provided in the RequestContext") + if context.message is None: + raise ValueError("Message must be provided in the RequestContext") + + query = context.get_user_input() + task = context.current_task + + if not task: + task = new_task(context.message) + await event_queue.enqueue_event(task) + + updater = TaskUpdater(event_queue, task.id, context.context_id) + await updater.submit() + + try: + await updater.start_work() + + session = self._agent.create_session(session_id=task.context_id) + + # Create a Message from the user query + user_message = Message(role="user", contents=[Content.from_text(text=query)]) + + # Run the agent with the message list + response = await self._agent.run(user_message, session=session) + + response_messages = response.messages + if not isinstance(response_messages, list): + response_messages = [response_messages] + + for message in response_messages: + await self.handle_events(message, updater) + + # Mark as complete + await updater.complete() + except CancelledError: + await updater.update_status(state=TaskState.canceled, final=True) + except Exception as e: + await updater.update_status( + state=TaskState.failed, + final=True, + message=updater.new_agent_message([Part(root=TextPart(text=str(e.args)))]), + ) + + async def handle_events(self, message: Message, updater: TaskUpdater) -> None: + """Convert agent response messages to A2A protocol events and update task status. + + Processes Message objects returned by the agent and converts them into A2A protocol format. + Handles text, data, and URI content. USER role messages are skipped. + + Users can override this method in a subclass to implement custom transformations + from their agent's Message format to A2A protocol events. + + Example: + .. code-block:: python + + class CustomA2AExecutor(A2AExecutor): + async def handle_events(self, message: Message, updater: TaskUpdater) -> None: + # Custom logic to transform message contents + if message.role == "assistant" and message.contents: + parts = [Part(root=TextPart(text=f"Custom: {message.contents[0].text}"))] + await updater.update_status( + state=TaskState.working, + message=updater.new_agent_message(parts=parts), + ) + else: + await super().handle_events(message, updater) + """ + if message.role == "user": + # This is a user message, we can ignore it in the context of task updates + return + + parts: list[Part] = [] + metadata = getattr(message, "additional_properties", None) + + for content in message.contents: + if content.type == "text" and content.text: + parts.append(Part(root=TextPart(text=content.text))) + elif content.type == "data": + base64_str = content.uri + parts.append(Part(root=FilePart(file=FileWithBytes(bytes=base64_str, mime_type=content.media_type)))) + elif content.type == "uri": + parts.append(Part(root=FilePart(file=FileWithUri(uri=content.uri, mime_type=content.media_type)))) + # Silently skip unsupported content types + + if parts: + await updater.update_status( + state=TaskState.working, + message=updater.new_agent_message(parts=parts, metadata=metadata), + ) diff --git a/python/packages/a2a/tests/test_a2a_executor.py b/python/packages/a2a/tests/test_a2a_executor.py new file mode 100644 index 0000000000..8fe9e470c8 --- /dev/null +++ b/python/packages/a2a/tests/test_a2a_executor.py @@ -0,0 +1,662 @@ +# Copyright (c) Microsoft. All rights reserved. +import base64 +from asyncio import CancelledError +from unittest.mock import AsyncMock, MagicMock, patch +from uuid import uuid4 + +from a2a.types import Task, TaskState +from agent_framework import ( + Message, + Content, + SupportsAgentRun, +) +from agent_framework._types import AgentResponse +from agent_framework.a2a import A2AExecutor +from pytest import fixture, raises + + +@fixture +def mock_agent() -> MagicMock: + """Fixture that provides a mock SupportsAgentRun.""" + agent = MagicMock(spec=SupportsAgentRun) + agent.run = AsyncMock() + return agent + + +@fixture +def mock_request_context() -> MagicMock: + """Fixture that provides a mock RequestContext.""" + request_context = MagicMock() + request_context.context_id = str(uuid4()) + request_context.get_user_input = MagicMock(return_value="Test query") + request_context.current_task = None + request_context.message = None + return request_context + + +@fixture +def mock_event_queue() -> MagicMock: + """Fixture that provides a mock EventQueue.""" + queue = AsyncMock() + queue.enqueue_event = AsyncMock() + return queue + + +@fixture +def mock_task() -> Task: + """Fixture that provides a mock Task.""" + task = MagicMock(spec=Task) + task.id = str(uuid4()) + task.context_id = str(uuid4()) + task.state = TaskState.completed + return task + + +@fixture +def mock_task_updater() -> MagicMock: + """Fixture that provides a mock TaskUpdater.""" + updater = MagicMock() + updater.submit = AsyncMock() + updater.start_work = AsyncMock() + updater.complete = AsyncMock() + updater.update_status = AsyncMock() + updater.new_agent_message = MagicMock() + return updater + + +@fixture +def executor(mock_agent: MagicMock) -> A2AExecutor: + """Fixture that provides an A2AExecutor.""" + return A2AExecutor(agent=mock_agent) + + +class TestA2AExecutorInitialization: + """Tests for A2AExecutor initialization.""" + + def test_initialization_with_agent_only(self, mock_agent: MagicMock) -> None: + """Arrange: Create mock agent + Act: Initialize A2AExecutor with only agent + Assert: Executor is created + """ + # Act + executor = A2AExecutor(agent=mock_agent) + + # Assert + assert executor._agent is mock_agent + + +class TestA2AExecutorCancel: + """Tests for the cancel method.""" + + async def test_cancel_method_completes( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + ) -> None: + """Arrange: Create executor with dependencies + Act: Call cancel method + Assert: Method completes without raising error + """ + # Act & Assert (should not raise) + await executor.cancel(mock_request_context, mock_event_queue) # type: ignore + + async def test_cancel_handles_different_contexts( + self, + executor: A2AExecutor, + mock_event_queue: MagicMock, + ) -> None: + """Arrange: Create executor with multiple request contexts + Act: Call cancel with different contexts + Assert: Each cancel completes successfully + """ + # Arrange + context1 = MagicMock() + context2 = MagicMock() + + # Act & Assert + await executor.cancel(context1, mock_event_queue) # type: ignore + await executor.cancel(context2, mock_event_queue) # type: ignore + + +class TestA2AExecutorExecute: + """Tests for the execute method.""" + + async def test_execute_with_existing_task_succeeds( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor with mocked dependencies and existing task + Act: Call execute method + Assert: Execution completes successfully + """ + # Arrange + mock_request_context.get_user_input = MagicMock(return_value="Hello") + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + response_message = Message(role="assistant", contents=[Content.from_text(text="Hello back")]) + response = MagicMock(spec=AgentResponse) + response.messages = [response_message] + executor._agent.run = AsyncMock(return_value=response) + executor._agent.create_session = MagicMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.complete = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater.new_agent_message = MagicMock(return_value="message_obj") + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + mock_updater.submit.assert_called_once() + mock_updater.start_work.assert_called_once() + mock_updater.complete.assert_called_once() + executor._agent.create_session.assert_called_once() + executor._agent.run.assert_called_once() + + async def test_execute_creates_task_when_not_exists( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + ) -> None: + """Arrange: Create executor with request context without task + Act: Call execute method + Assert: New task is created and enqueued + """ + # Arrange + mock_message = MagicMock() + mock_request_context.get_user_input = MagicMock(return_value="Hello") + mock_request_context.current_task = None + mock_request_context.message = mock_message + mock_request_context.context_id = "ctx-123" + + response_message = Message(role="assistant", contents=[Content.from_text(text="Response")]) + response = MagicMock(spec=AgentResponse) + response.messages = [response_message] + executor._agent.run = AsyncMock(return_value=response) + executor._agent.create_session = MagicMock() + + with patch("agent_framework_a2a._a2a_executor.new_task") as mock_new_task: + mock_task = MagicMock(spec=Task) + mock_task.id = "task-new" + mock_task.context_id = "ctx-123" + mock_new_task.return_value = mock_task + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.complete = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater.new_agent_message = MagicMock(return_value="message_obj") + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + mock_new_task.assert_called_once() + mock_event_queue.enqueue_event.assert_called_once() + + async def test_execute_raises_error_when_context_id_missing( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + ) -> None: + """Arrange: Create context without context_id + Act: Call execute method + Assert: ValueError is raised + """ + # Arrange + mock_request_context.context_id = None + mock_request_context.message = MagicMock() + + # Act & Assert + with raises(ValueError) as excinfo: + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + assert "Context ID" in str(excinfo.value) + + async def test_execute_raises_error_when_message_missing( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + ) -> None: + """Arrange: Create context without message + Act: Call execute method + Assert: ValueError is raised + """ + # Arrange + mock_request_context.context_id = "ctx-123" + mock_request_context.message = None + + # Act & Assert + with raises(ValueError) as excinfo: + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + assert "Message" in str(excinfo.value) + + async def test_execute_handles_cancelled_error( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor that raises CancelledError + Act: Call execute method + Assert: Error is caught and task is marked as canceled + """ + # Arrange + mock_request_context.get_user_input = MagicMock(return_value="Hello") + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + executor._agent.run = AsyncMock(side_effect=CancelledError()) + executor._agent.create_session = MagicMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) # type: ignore + + # Assert + mock_updater.update_status.assert_called() + call_args_list = mock_updater.update_status.call_args_list + assert any(call[1].get("state") == TaskState.canceled for call in call_args_list) + + async def test_execute_handles_generic_exception( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor that raises generic exception + Act: Call execute method + Assert: Error is caught and task is marked as failed + """ + # Arrange + mock_request_context.get_user_input = MagicMock(return_value="Hello") + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + executor._agent.run = AsyncMock(side_effect=ValueError("Test error")) + executor._agent.create_session = MagicMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater.new_agent_message = MagicMock(return_value="error_message") + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + call_args_list = mock_updater.update_status.call_args_list + assert any(call[1].get("state") == TaskState.failed for call in call_args_list) + + async def test_execute_processes_multiple_response_messages( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor that returns multiple response messages + Act: Call execute method + Assert: All messages are processed through handle_events + """ + # Arrange + mock_request_context.get_user_input = MagicMock(return_value="Hello") + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + response_message1 = Message(role="assistant", contents=[Content.from_text(text="First")]) + response_message2 = Message(role="assistant", contents=[Content.from_text(text="Second")]) + response = MagicMock(spec=AgentResponse) + response.messages = [response_message1, response_message2] + executor._agent.run = AsyncMock(return_value=response) + executor._agent.create_session = MagicMock() + + # Mock handle_events + executor.handle_events = AsyncMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.complete = AsyncMock() + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + assert executor.handle_events.call_count == 2 + + async def test_execute_creates_message_with_user_role( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor with request + Act: Call execute method + Assert: Message is created with USER role and query text + """ + # Arrange + query_text = "Hello agent" + mock_request_context.get_user_input = MagicMock(return_value=query_text) + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + response_message = Message(role="assistant", contents=[Content.from_text(text="Response")]) + response = MagicMock(spec=AgentResponse) + response.messages = [response_message] + executor._agent.run = AsyncMock(return_value=response) + executor._agent.create_session = MagicMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.complete = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater.new_agent_message = MagicMock(return_value="message_obj") + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + executor._agent.run.assert_called_once() + call_args = executor._agent.run.call_args + + # The input should be the new user message + user_message = call_args[0][0] + assert isinstance(user_message, Message) + assert user_message.role == "user" + assert user_message.text == query_text + + +class TestA2AExecutorHandleEvents: + """Tests for A2AExecutor.handle_events method.""" + + @fixture + def mock_updater(self) -> MagicMock: + """Create a mock execution context.""" + updater = MagicMock() + updater.update_status = AsyncMock() + updater.new_agent_message = MagicMock(return_value="mock_message") + return updater + + async def test_ignore_user_messages(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test that messages from USER role are ignored.""" + # Arrange + message = Message( + contents=[Content.from_text(text="User input")], + role="user", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_not_called() + + async def test_ignore_messages_with_no_contents(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test that messages with no contents are ignored.""" + # Arrange + message = Message( + contents=[], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_not_called() + + async def test_handle_text_content(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with text content.""" + # Arrange + text = "Hello, this is a test message" + message = Message( + contents=[Content.from_text(text=text)], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + call_args = mock_updater.update_status.call_args + assert call_args.kwargs["state"] == TaskState.working + assert mock_updater.new_agent_message.called + + async def test_handle_multiple_text_contents(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with multiple text contents.""" + # Arrange + message = Message( + contents=[ + Content.from_text(text="First message"), + Content.from_text(text="Second message"), + ], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + assert mock_updater.new_agent_message.called + + async def test_handle_data_content(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with data content.""" + # Arrange + data = b"test file data" + message = Message( + contents=[Content.from_data(data=data, media_type="application/octet-stream")], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + call_args = mock_updater.update_status.call_args + assert call_args.kwargs["state"] == TaskState.working + + async def test_handle_uri_content(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with URI content.""" + # Arrange + uri = "https://example.com/file.pdf" + message = Message( + contents=[Content.from_uri(uri=uri, media_type="application/pdf")], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + call_args = mock_updater.update_status.call_args + assert call_args.kwargs["state"] == TaskState.working + + async def test_handle_mixed_content_types(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with mixed content types.""" + # Arrange + data = b"file data" + + message = Message( + contents=[ + Content.from_text(text="Processing file..."), + Content.from_data(data=data, media_type="application/octet-stream"), + Content.from_uri(uri="https://example.com/reference.pdf", media_type="application/pdf"), + ], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + call_args = mock_updater.update_status.call_args + assert call_args.kwargs["state"] == TaskState.working + + async def test_handle_with_additional_properties(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages with additional properties metadata.""" + # Arrange + additional_props = {"custom_field": "custom_value", "priority": "high"} + message = Message( + contents=[Content.from_text(text="Test message")], + role="assistant", + additional_properties=additional_props, + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + mock_updater.new_agent_message.assert_called_once() + call_args = mock_updater.new_agent_message.call_args + assert call_args.kwargs["metadata"] == additional_props + + async def test_handle_with_no_additional_properties(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test handling messages without additional properties.""" + # Arrange + message = Message( + contents=[Content.from_text(text="Test message")], + role="assistant", + additional_properties=None, + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.update_status.assert_called_once() + mock_updater.new_agent_message.assert_called_once() + call_args = mock_updater.new_agent_message.call_args + assert call_args.kwargs["metadata"] == {} + + async def test_parts_list_passed_to_new_agent_message(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test that parts list is correctly passed to new_agent_message.""" + # Arrange + message = Message( + contents=[ + Content.from_text(text="Message 1"), + Content.from_text(text="Message 2"), + ], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + mock_updater.new_agent_message.assert_called_once() + call_kwargs = mock_updater.new_agent_message.call_args.kwargs + assert "parts" in call_kwargs + parts_list = call_kwargs["parts"] + assert len(parts_list) == 2 + + async def test_task_state_always_working(self, executor: A2AExecutor, mock_updater: MagicMock) -> None: + """Test that task state is always set to working.""" + # Arrange + message = Message( + contents=[Content.from_text(text="Any message")], + role="assistant", + ) + + # Act + await executor.handle_events(message, mock_updater) + + # Assert + call_kwargs = mock_updater.update_status.call_args.kwargs + assert call_kwargs["state"] == TaskState.working + + +class TestA2AExecutorIntegration: + """Integration tests for A2AExecutor.""" + + async def test_full_execution_flow_with_responses( + self, + executor: A2AExecutor, + mock_request_context: MagicMock, + mock_event_queue: MagicMock, + mock_task: Task, + ) -> None: + """Arrange: Create executor with all mocked dependencies + Act: Execute full flow from request to completion + Assert: All components interact correctly + """ + # Arrange + mock_request_context.get_user_input = MagicMock(return_value="Hello agent") + mock_request_context.current_task = mock_task + mock_request_context.context_id = "ctx-123" + mock_request_context.message = MagicMock() + + response = MagicMock(spec=AgentResponse) + response_message = MagicMock(spec=Message) + response.messages = [response_message] + response_message.contents = [Content.from_text(text="Hello user")] + response_message.role = "assistant" + response_message.additional_properties = None + + executor._agent.run = AsyncMock(return_value=response) + executor._agent.create_session = MagicMock() + executor.handle_events = AsyncMock() + + with patch("agent_framework_a2a._a2a_executor.TaskUpdater") as mock_updater_class: + mock_updater = MagicMock() + mock_updater.submit = AsyncMock() + mock_updater.start_work = AsyncMock() + mock_updater.complete = AsyncMock() + mock_updater.update_status = AsyncMock() + mock_updater_class.return_value = mock_updater + + # Act + await executor.execute(mock_request_context, mock_event_queue) + + # Assert + mock_updater.submit.assert_called_once() + mock_updater.start_work.assert_called_once() + executor.handle_events.assert_called_once() + mock_updater.complete.assert_called_once() diff --git a/python/packages/core/agent_framework/a2a/__init__.py b/python/packages/core/agent_framework/a2a/__init__.py index 7c7de63456..90daf5380a 100644 --- a/python/packages/core/agent_framework/a2a/__init__.py +++ b/python/packages/core/agent_framework/a2a/__init__.py @@ -7,6 +7,7 @@ Supported classes: - A2AAgent +- A2AExecutor """ import importlib @@ -14,7 +15,7 @@ IMPORT_PATH = "agent_framework_a2a" PACKAGE_NAME = "agent-framework-a2a" -_IMPORTS = ["A2AAgent"] +_IMPORTS = ["A2AAgent", "A2AExecutor"] def __getattr__(name: str) -> Any: diff --git a/python/packages/core/agent_framework/a2a/__init__.pyi b/python/packages/core/agent_framework/a2a/__init__.pyi index 5a54bb22a9..83fa2748ee 100644 --- a/python/packages/core/agent_framework/a2a/__init__.pyi +++ b/python/packages/core/agent_framework/a2a/__init__.pyi @@ -2,8 +2,10 @@ from agent_framework_a2a import ( A2AAgent, + A2AExecutor ) __all__ = [ "A2AAgent", + "A2AExecutor" ] diff --git a/python/samples/04-hosting/a2a/README.md b/python/samples/04-hosting/a2a/README.md index 2ede8b8a3d..924f1109b6 100644 --- a/python/samples/04-hosting/a2a/README.md +++ b/python/samples/04-hosting/a2a/README.md @@ -11,7 +11,7 @@ For more information about the A2A protocol specification, visit: https://a2a-pr | File | Description | |------|-------------| | [`agent_with_a2a.py`](agent_with_a2a.py) | Demonstrates agent discovery, non-streaming and streaming responses using the A2A protocol. | - +| [`agent_framework_to_a2a.py`](agent_framework_to_a2a.py) | Exposes an agent_framework agent as an A2A-compliant server. Demonstrates how to wrap an agent_framework agent and expose it as an A2A service that other A2A clients can discover and communicate with. | ## Environment Variables Make sure to set the following environment variables before running the example: @@ -31,4 +31,7 @@ For quick testing and demonstration, you can use the pre-built .NET A2A servers ```powershell # Simple A2A sample (single agent) uv run python agent_with_a2a.py + +# A2A server exposing an agent_framework agent +uv run python agent_framework_to_a2a.py ``` diff --git a/python/samples/04-hosting/a2a/agent_framework_to_a2a.py b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py new file mode 100644 index 0000000000..2ec3676f99 --- /dev/null +++ b/python/samples/04-hosting/a2a/agent_framework_to_a2a.py @@ -0,0 +1,62 @@ +import uvicorn + +from dotenv import load_dotenv + +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import ( + AgentCapabilities, + AgentCard, + AgentSkill, +) + +from agent_framework.a2a import A2AExecutor +from agent_framework.openai import OpenAIResponsesClient + +load_dotenv() + +if __name__ == '__main__': + # --8<-- [start:AgentSkill] + skill = AgentSkill( + id='Food_Agent', + name='Food Agent', + description="A simple agent that provides food-related information.", + tags=['food', 'nutrition', 'recipes'], + examples=[], + ) + # --8<-- [end:AgentSkill] + + # --8<-- [start:AgentCard] + # This will be the public-facing agent card + public_agent_card = AgentCard( + name='Food Agent', + description='A simple agent that provides food-related information.', + url='http://localhost:9999/', + version='1.0.0', + defaultInputModes=['text'], + defaultOutputModes=['text'], + capabilities=AgentCapabilities(streaming=True), + skills=[skill], + ) + # --8<-- [end:AgentCard] + + agent = OpenAIResponsesClient().as_agent( + name="Food Agent", + instructions="A simple agent that provides food-related information.", + ) + + request_handler = DefaultRequestHandler( + agent_executor= A2AExecutor(agent), + task_store= InMemoryTaskStore(), + ) + + server = A2AStarletteApplication( + agent_card=public_agent_card, + http_handler=request_handler, + ) + + server = server.build() + # print(schemas.get_schema(server.routes)) + + uvicorn.run(server, host='0.0.0.0', port=9999) diff --git a/python/uv.lock b/python/uv.lock index 7233077c30..403d03263b 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1409,7 +1409,7 @@ name = "clr-loader" version = "0.2.10" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/18/24/c12faf3f61614b3131b5c98d3bf0d376b49c7feaa73edca559aeb2aee080/clr_loader-0.2.10.tar.gz", hash = "sha256:81f114afbc5005bafc5efe5af1341d400e22137e275b042a8979f3feb9fc9446", size = 83605, upload-time = "2026-01-03T23:13:06.984Z" } wheels = [ @@ -1888,7 +1888,7 @@ name = "exceptiongroup" version = "1.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" }, + { name = "typing-extensions", marker = "(python_full_version < '3.13' and sys_platform == 'darwin') or (python_full_version < '3.13' and sys_platform == 'linux') or (python_full_version < '3.13' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/50/79/66800aadf48771f6b62f7eb014e352e5d06856655206165d775e675a02c9/exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219", size = 30371, upload-time = "2025-11-21T23:01:54.787Z" } wheels = [ @@ -4661,8 +4661,8 @@ name = "powerfx" version = "0.0.34" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, - { name = "pythonnet", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "pythonnet", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9f/fb/6c4bf87e0c74ca1c563921ce89ca1c5785b7576bca932f7255cdf81082a7/powerfx-0.0.34.tar.gz", hash = "sha256:956992e7afd272657ed16d80f4cad24ec95d9e4a79fb9dfa4a068a09e136af32", size = 3237555, upload-time = "2025-12-22T15:50:59.682Z" } wheels = [ @@ -5325,7 +5325,7 @@ name = "pythonnet" version = "3.0.5" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "clr-loader", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "clr-loader", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9a/d6/1afd75edd932306ae9bd2c2d961d603dc2b52fcec51b04afea464f1f6646/pythonnet-3.0.5.tar.gz", hash = "sha256:48e43ca463941b3608b32b4e236db92d8d40db4c58a75ace902985f76dac21cf", size = 239212, upload-time = "2024-12-13T08:30:44.393Z" } wheels = [