From b167d7bca35255984a80b9c1038d5821ad85ce08 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Fri, 25 Jul 2025 22:09:58 +0530 Subject: [PATCH 1/7] chore: baseline addition of multi agent integration --- adk_a2a_mcp_integration/Readme.md | 0 .../mcp_server/requirements.txt | 3 + adk_a2a_mcp_integration/mcp_server/server.py | 136 ++++++++++++++++++ adk_a2a_mcp_integration/requirements.txt | 10 ++ .../root_agent/__init__.py | 1 + adk_a2a_mcp_integration/root_agent/agent.py | 26 ++++ adk_a2a_mcp_integration/root_agent/tools.py | 77 ++++++++++ 7 files changed, 253 insertions(+) create mode 100644 adk_a2a_mcp_integration/Readme.md create mode 100644 adk_a2a_mcp_integration/mcp_server/requirements.txt create mode 100644 adk_a2a_mcp_integration/mcp_server/server.py create mode 100644 adk_a2a_mcp_integration/requirements.txt create mode 100644 adk_a2a_mcp_integration/root_agent/__init__.py create mode 100644 adk_a2a_mcp_integration/root_agent/agent.py create mode 100644 adk_a2a_mcp_integration/root_agent/tools.py diff --git a/adk_a2a_mcp_integration/Readme.md b/adk_a2a_mcp_integration/Readme.md new file mode 100644 index 0000000..e69de29 diff --git a/adk_a2a_mcp_integration/mcp_server/requirements.txt b/adk_a2a_mcp_integration/mcp_server/requirements.txt new file mode 100644 index 0000000..0ca0464 --- /dev/null +++ b/adk_a2a_mcp_integration/mcp_server/requirements.txt @@ -0,0 +1,3 @@ +fastmcp==2.10.6 +arxiv==2.2.0 +pymupdf4llm==0.0.27 \ No newline at end of file diff --git a/adk_a2a_mcp_integration/mcp_server/server.py b/adk_a2a_mcp_integration/mcp_server/server.py new file mode 100644 index 0000000..e680a59 --- /dev/null +++ b/adk_a2a_mcp_integration/mcp_server/server.py @@ -0,0 +1,136 @@ +import os +import tempfile +import requests +import arxiv +import pymupdf4llm +from fastmcp import FastMCP + +mcp = FastMCP("ArxivExplorer") + +@mcp.tool +def search_arxiv(query: str, max_results: int = 5) -> dict: + """ + Searches arXiv for a given query and returns the top papers. + Args: + query: The search keyword or query. + max_results: The maximum number of results to return. + Returns: + A list of dictionaries, where each dictionary represents a paper + and contains its ID, title, summary, authors, and PDF URL. + """ + try: + search = arxiv.Search( + query=query, + max_results=max_results, + sort_by=arxiv.SortCriterion.Relevance + ) + + papers = [] + for result in search.results(): + print(f"{result.title}") + paper_info = { + 'id': result.get_short_id(), + 'title': result.title, + 'summary': result.summary, + 'authors': [author.name for author in result.authors], + 'pdf_url': result.pdf_url + } + papers.append(paper_info) + + return { + "status": "success", + "result": papers + } + except Exception as e: + return { + "status": "error", + "error_message": str(e) + } + +@mcp.tool() +def get_paper_md(paper_id: str) -> dict: + """ + Retrieves the text content of an arXiv paper in Markdown format. + Args: + paper_id: The ID of the paper (e.g., '1706.03762v7'). + Returns: + The text content of the paper as a Markdown string. + Returns an error message if any step fails. + """ + try: + search = arxiv.Search(id_list=[paper_id]) + paper = next(search.results()) + pdf_url = paper.pdf_url + print(f"Found paper: '{paper.title}'") + print(f"Downloading from: {pdf_url}") + + except StopIteration: + return {"status": "error", "error_message": f"Paper with ID '{paper_id}' not found on arXiv."} + except Exception as e: + return {"status": "error", "error_message": f"Error searching for the paper: {e}"} + + try: + # Download the PDF content + response = requests.get(pdf_url) + response.raise_for_status() + pdf_bytes = response.content + print("PDF downloaded successfully.") + + except requests.exceptions.RequestException as e: + return {"status": "error", "error_message": f"Error downloading the PDF file, request failure: {e}"} + except Exception as e: + return {"status": "error", "error_message": f"Error downloading the PDF file: {e}"} + + temp_pdf_path = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: + temp_file.write(pdf_bytes) + temp_pdf_path = temp_file.name # Get the path of the temporary file + + print(f"PDF content written to temporary file: {temp_pdf_path}") + print("Converting PDF to Markdown...") + # Pass the file path to the conversion function + md_text = pymupdf4llm.to_markdown(temp_pdf_path) + + print("Conversion complete.") + if temp_pdf_path and os.path.exists(temp_pdf_path): + os.remove(temp_pdf_path) + print(f"Temporary file {temp_pdf_path} deleted.") + return {"status": "success", "result": md_text} + + except Exception as e: + return {"status": "error", "error_message": f"Error converting PDF to Markdown: {e}"} + +# To pass the paper directly as media to llm +# @mcp.tool() +# def get_paper_raw(paper_id: str) -> dict: +# """ +# Retrieves the raw PDF file of an arXiv paper. +# Args: +# paper_id: The ID of the paper (e.g., '1706.03762v7'). +# Returns: +# The raw bytes of the PDF file, or None if the paper is not found. +# """ +# try: +# # Search for the paper by its ID +# search = arxiv.Search(id_list=[paper_id]) +# paper = next(search.results()) + +# # Download the PDF content +# response = requests.get(paper.pdf_url) +# response.raise_for_status() +# return { +# "status": "success", +# "result":response.content +# } +# except StopIteration: +# return {"status": "error", "error_message": f"Paper with ID {paper_id} not found on arXiv."} +# except requests.exceptions.RequestException as e: +# print(f"Error downloading PDF: {e}") +# return {"status": "error", "error_message": f"Error downloading PDF: {e}"} +# except Exception as e: +# print(f"Error: {e}") +# return {"status": "error", "error_message": f"Error: {e}"} + +if __name__ == "__main__": + mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp") diff --git a/adk_a2a_mcp_integration/requirements.txt b/adk_a2a_mcp_integration/requirements.txt new file mode 100644 index 0000000..496c90f --- /dev/null +++ b/adk_a2a_mcp_integration/requirements.txt @@ -0,0 +1,10 @@ +# mcp server requirements +fastmcp==2.10.6 +arxiv==2.2.0 +pymupdf4llm==0.0.27 + +# general a2a requirements +a2a-sdk==0.2.16 + +# adk requirements +google-adk==1.8.0 \ No newline at end of file diff --git a/adk_a2a_mcp_integration/root_agent/__init__.py b/adk_a2a_mcp_integration/root_agent/__init__.py new file mode 100644 index 0000000..63bd45e --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/__init__.py @@ -0,0 +1 @@ +from . import agent \ No newline at end of file diff --git a/adk_a2a_mcp_integration/root_agent/agent.py b/adk_a2a_mcp_integration/root_agent/agent.py new file mode 100644 index 0000000..af43edd --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/agent.py @@ -0,0 +1,26 @@ +from google.adk.agents import Agent +from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StreamableHTTPConnectionParams + +from root_agent.tools import get_current_time, calculate_expression + +simple_mcp_tool = MCPToolset( + connection_params=StreamableHTTPConnectionParams( + url="http://localhost:8000/mcp", + timeout=10, + sse_read_timeout=60 * 5, + terminate_on_close=True, + ), + tool_filter=["search_arxiv","get_paper_md"] +) + +root_agent = Agent( + name="orchestrator_agent", + model="gemini-2.0-flash", + description=( + "Agent to answer questions using tools provided." + ), + instruction=( + "You are a helpful agent who can answer user questions about current time and can do calculations and explore arxiv collection." + ), + tools=[get_current_time, calculate_expression, simple_mcp_tool], +) diff --git a/adk_a2a_mcp_integration/root_agent/tools.py b/adk_a2a_mcp_integration/root_agent/tools.py new file mode 100644 index 0000000..54448f7 --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/tools.py @@ -0,0 +1,77 @@ +import math +import datetime +from zoneinfo import ZoneInfo + +def get_current_time(country: str) -> dict: + """Returns the current time in a specified country. + Args: + country (str): The name of the country for which to retrieve the current time. + Returns: + dict: status and result or error msg. + """ + if country.lower() == "india": + tz_identifier = "Asia/Kolkata" + else: + return { + "status": "error", + "error_message": ( + f"Sorry, I don't have timezone information for {country}." + ), + } + + tz = ZoneInfo(tz_identifier) + now = datetime.datetime.now(tz) + report = ( + f'The current time in {country} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}' + ) + return {"status": "success", "result": report} + +ALLOWED_FUNCTIONS = { + "math": math, + "exp": math.exp, + "log": math.log, + "log10": math.log10, + "sqrt": math.sqrt, + "pi": math.pi, + "e": math.e, + "ceil": math.ceil, + "floor": math.floor, + "round": round, + "factorial": math.factorial, + "isinf": math.isinf, + "isnan": math.isnan, + "isqrt": math.isqrt, +} + +def calculate_expression(expression: str) -> dict: + """Evaluates a mathematical expression and returns the result. + Supports basic operators (+, -, *, /, **, %), mathematical functions + and constants (pi, e). Uses a restricted evaluation context for safe execution. + + Args: + expression: The mathematical expression to evaluate as a string. + Examples: "2 + 2", "sqrt(16) * 2", "log(100, 10)" + Returns: + On success: {"result": } + On error: {"error": } + + Notes: + - Use 'x' as the variable (e.g., x**2, not x²) + - Multiplication must be explicitly indicated with * (e.g., 2*x, not 2x) + - Powers are represented with ** (e.g., x**2, not x^2) + """ + try: + result = eval( + expression, + {"__builtins__": {}}, + ALLOWED_FUNCTIONS, + ) + return { + "status": "success", + "result": result + } + except Exception as e: + return { + "status": "error", + "error_message": str(e) + } From c4bf454b3b4e6ceb62507378a2f1789ff7c0c3d1 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sat, 26 Jul 2025 22:55:42 +0530 Subject: [PATCH 2/7] feat: add remote a2a agent and task manager --- adk_a2a_mcp_integration/.env.example | 5 ++ .../remote_agent/__init__.py | 1 + .../remote_agent/__main__.py | 50 ++++++++++++ adk_a2a_mcp_integration/remote_agent/agent.py | 81 +++++++++++++++++++ .../remote_agent/client.py | 57 +++++++++++++ .../remote_agent/task_manager.py | 43 ++++++++++ adk_a2a_mcp_integration/requirements.txt | 9 ++- 7 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 adk_a2a_mcp_integration/.env.example create mode 100644 adk_a2a_mcp_integration/remote_agent/__init__.py create mode 100644 adk_a2a_mcp_integration/remote_agent/__main__.py create mode 100644 adk_a2a_mcp_integration/remote_agent/agent.py create mode 100644 adk_a2a_mcp_integration/remote_agent/client.py create mode 100644 adk_a2a_mcp_integration/remote_agent/task_manager.py diff --git a/adk_a2a_mcp_integration/.env.example b/adk_a2a_mcp_integration/.env.example new file mode 100644 index 0000000..57e40da --- /dev/null +++ b/adk_a2a_mcp_integration/.env.example @@ -0,0 +1,5 @@ +# To avoid using vertex ai +GOOGLE_GENAI_USE_VERTEXAI=False + +# Google api key, get this from ai studio +GOOGLE_API_KEY= \ No newline at end of file diff --git a/adk_a2a_mcp_integration/remote_agent/__init__.py b/adk_a2a_mcp_integration/remote_agent/__init__.py new file mode 100644 index 0000000..63bd45e --- /dev/null +++ b/adk_a2a_mcp_integration/remote_agent/__init__.py @@ -0,0 +1 @@ +from . import agent \ No newline at end of file diff --git a/adk_a2a_mcp_integration/remote_agent/__main__.py b/adk_a2a_mcp_integration/remote_agent/__main__.py new file mode 100644 index 0000000..1ad9fcf --- /dev/null +++ b/adk_a2a_mcp_integration/remote_agent/__main__.py @@ -0,0 +1,50 @@ +import uvicorn +from a2a.server.apps import A2AStarletteApplication +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import AgentCapabilities, AgentCard, AgentSkill +from remote_agent.task_manager import BasicSearchAgentExecutor + +agent_url = "http://localhost:8090/" + +def start_remote_agent(): + + agent_skill = AgentSkill( + id="search_agent", + name="Search Agent", + description="Agent that can get the latest search results from internet using google search", + input_modes=["text"], + output_modes=["text"], + tags=["search agent", "browser", "multi query"], + examples=[ + "What are the latest news in AI?", + "Explain the key difference between Langchain and Langgraph.", + "Who won the last IPL match?"] + ) + + public_agent_card = AgentCard( + name="Search agent", + description="Agent that can search the internet to answer queries.", + url=agent_url, + version="0.0.1", + skills=[agent_skill], + defaultInputModes=['text'], + defaultOutputModes=['text'], + capabilities=AgentCapabilities(streaming=True), + supportsAuthenticatedExtendedCard=False, + ) + + request_handler = DefaultRequestHandler( + agent_executor=BasicSearchAgentExecutor(), + task_store=InMemoryTaskStore(), + ) + server = A2AStarletteApplication( + agent_card=public_agent_card, + http_handler=request_handler + ) + app = server.build() + print("Uvicorn server starting...") + uvicorn.run(app, host="127.0.0.1", port=8090) + +if __name__ == "__main__": + start_remote_agent() diff --git a/adk_a2a_mcp_integration/remote_agent/agent.py b/adk_a2a_mcp_integration/remote_agent/agent.py new file mode 100644 index 0000000..9c3218e --- /dev/null +++ b/adk_a2a_mcp_integration/remote_agent/agent.py @@ -0,0 +1,81 @@ +import os + +from google.adk import Runner +from google.adk.agents import Agent +from google.adk.artifacts import InMemoryArtifactService +from google.adk.memory.in_memory_memory_service import InMemoryMemoryService +from google.adk.sessions import InMemorySessionService +from google.adk.tools import google_search +from google.genai import types + +from dotenv import load_dotenv +# To load the google api keys +load_dotenv() + +root_agent = Agent( + name="search_agent", + model="gemini-2.0-flash", + description="Agent capable of searching internet to find relevant answers to user questions.", + instruction="""You are an friendly and supportive agent. Your job is to try to answer the user question using the search tool. + Always provide accurate and relevant information.""", + tools=[google_search], +) + +class BasicSearchAgent: + + def __init__(self): + self.agent = root_agent + self.runner = Runner( + app_name=self.agent.name, + agent=self.agent, + artifact_service=InMemoryArtifactService(), + memory_service=InMemoryMemoryService(), + session_service=InMemorySessionService(), + ) + + async def invoke(self, session_id: str, query: str, user_id: str = None) -> dict: + """Invoke the agent""" + try: + if not user_id: + user_id = "Default User" + session_instance = await self.runner.session_service.get_session( + session_id=session_id, + user_id=user_id, + app_name=self.agent.name + ) + + if not session_instance: + print(f"Creating new session with id: {session_id}") + session_instance = await self.runner.session_service.create_session( + session_id=session_id, + user_id=user_id, + app_name=self.agent.name + ) + + user_content = types.Content( + role="user", parts=[types.Part.from_text(text=query)] + ) + + final_response_text = "" + async for event in self.runner.run_async( + user_id=user_id, + session_id=session_instance.id, + new_message=user_content + ): + # We can break when there's final response, + # but for telemetry usage, the loop must complete + # print(f"Event: {event}") + if event.is_final_response(): + if event.content and event.content.parts and event.content.parts[-1].text: + final_response_text = event.content.parts[-1].text + print("Loop finished, yielding final response.") + yield { + "status": "success", + "result": final_response_text + } + except Exception as e: + print(f"Error: {e}") + yield { + "status": "error", + "error_message": str(e) + } diff --git a/adk_a2a_mcp_integration/remote_agent/client.py b/adk_a2a_mcp_integration/remote_agent/client.py new file mode 100644 index 0000000..72d6feb --- /dev/null +++ b/adk_a2a_mcp_integration/remote_agent/client.py @@ -0,0 +1,57 @@ +import httpx +import asyncio +from typing import Any +from uuid import uuid4 +from a2a.client import A2ACardResolver, A2AClient +from a2a.types import ( + AgentCard, + MessageSendParams, + SendMessageRequest, + SendStreamingMessageRequest, +) + +async def client(): + async with httpx.AsyncClient(timeout=120) as httpx_client: + resolver = A2ACardResolver( + httpx_client=httpx_client, + base_url="http://localhost:8090/", + ) + print("Attempting to fetch agent card...") + agent_card = await resolver.get_agent_card() + print('Agent card fetched. Agent card:') + print(agent_card.model_dump_json(indent=2, exclude_none=True)) + + print("Initializing A2A Client") + client = A2AClient( + httpx_client=httpx_client, agent_card=agent_card + ) + print('A2A Client initialized.') + + send_message_payload: dict[str, Any] = { + 'message': { + 'role': 'user', + 'parts': [ + {'kind': 'text', 'text': 'What is model context protocol? Give a brief description.'} + ], + 'messageId': uuid4().hex, + }, + } + print("Sending test message") + request = SendMessageRequest( + id=str(uuid4()), params=MessageSendParams(**send_message_payload) + ) + + response = await client.send_message(request) + print(response.model_dump(mode='json', exclude_none=True)) + response_dict = response.model_dump(mode='json', exclude_none=True) + agent_response_text = "No text content found in response or an error occurred." + try: + agent_response_text = response_dict['result']['parts'][0]['text'] + except (KeyError, IndexError) as e: + print(f"Error parsing agent response structure: {e}") + + print("\n--- Agent's Final Response ---") + print(agent_response_text) + print("----------------------------") + +asyncio.run(client()) diff --git a/adk_a2a_mcp_integration/remote_agent/task_manager.py b/adk_a2a_mcp_integration/remote_agent/task_manager.py new file mode 100644 index 0000000..d84ff29 --- /dev/null +++ b/adk_a2a_mcp_integration/remote_agent/task_manager.py @@ -0,0 +1,43 @@ +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.utils import new_agent_text_message, new_task + +from remote_agent.agent import BasicSearchAgent + +class BasicSearchAgentExecutor(AgentExecutor): + + def __init__(self): + super().__init__() + self._agent = BasicSearchAgent() + + async def execute(self, request_context: RequestContext, event_queue: EventQueue) -> None: + + query = request_context.get_user_input() + task = request_context.current_task + if not task: + task = new_task(request_context.message) + await event_queue.enqueue_event(task) + print(f"Creating new task with id: {task.id}.") + + session_id = task.contextId + print(f"Using session id: {session_id}.") + async for event in self._agent.invoke( + session_id=session_id, + query=query + # user_id=request_context.user_id, + ): + if event.get("status") == "success": + await event_queue.enqueue_event( + new_agent_text_message(text=event.get("result")) + ) + elif event.get("status") == "error": + await event_queue.enqueue_event( + new_agent_text_message(text=f"Error: {event.get('error_message')}") + ) + else: + await event_queue.enqueue_event( + new_agent_text_message(text=event.get("result")) + ) + + async def cancel(self, request_context: RequestContext, event_queue: EventQueue) -> None: + raise Exception('cancel not supported at this moment!') diff --git a/adk_a2a_mcp_integration/requirements.txt b/adk_a2a_mcp_integration/requirements.txt index 496c90f..897386f 100644 --- a/adk_a2a_mcp_integration/requirements.txt +++ b/adk_a2a_mcp_integration/requirements.txt @@ -7,4 +7,11 @@ pymupdf4llm==0.0.27 a2a-sdk==0.2.16 # adk requirements -google-adk==1.8.0 \ No newline at end of file +google-adk==1.8.0 +google-genai==1.27.0 +fastapi==0.116.1 + +python-dotenv==1.1.1 +requests==2.32.4 + +uvicorn==0.35.0 \ No newline at end of file From a508306eeda23549ca05a149828e2d4e325b0227 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sun, 27 Jul 2025 21:47:10 +0530 Subject: [PATCH 3/7] feat: chgs to use remote agent + response manager --- adk_a2a_mcp_integration/.env.example | 2 +- .../root_agent/.env.example | 5 ++ adk_a2a_mcp_integration/root_agent/agent.py | 21 +++-- .../root_agent/remote_agent_helpers.py | 90 +++++++++++++++++++ .../root_agent/response_manager.py | 86 ++++++++++++++++++ 5 files changed, 196 insertions(+), 8 deletions(-) create mode 100644 adk_a2a_mcp_integration/root_agent/.env.example create mode 100644 adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py create mode 100644 adk_a2a_mcp_integration/root_agent/response_manager.py diff --git a/adk_a2a_mcp_integration/.env.example b/adk_a2a_mcp_integration/.env.example index 57e40da..0ef764a 100644 --- a/adk_a2a_mcp_integration/.env.example +++ b/adk_a2a_mcp_integration/.env.example @@ -1,4 +1,4 @@ -# To avoid using vertex ai +# To use api key GOOGLE_GENAI_USE_VERTEXAI=False # Google api key, get this from ai studio diff --git a/adk_a2a_mcp_integration/root_agent/.env.example b/adk_a2a_mcp_integration/root_agent/.env.example new file mode 100644 index 0000000..0ef764a --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/.env.example @@ -0,0 +1,5 @@ +# To use api key +GOOGLE_GENAI_USE_VERTEXAI=False + +# Google api key, get this from ai studio +GOOGLE_API_KEY= \ No newline at end of file diff --git a/adk_a2a_mcp_integration/root_agent/agent.py b/adk_a2a_mcp_integration/root_agent/agent.py index af43edd..5fed8eb 100644 --- a/adk_a2a_mcp_integration/root_agent/agent.py +++ b/adk_a2a_mcp_integration/root_agent/agent.py @@ -2,6 +2,11 @@ from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StreamableHTTPConnectionParams from root_agent.tools import get_current_time, calculate_expression +from root_agent.remote_agent_helpers import list_remote_agents, call_remote_agent + +from dotenv import load_dotenv +# To load the google api keys +load_dotenv() simple_mcp_tool = MCPToolset( connection_params=StreamableHTTPConnectionParams( @@ -16,11 +21,13 @@ root_agent = Agent( name="orchestrator_agent", model="gemini-2.0-flash", - description=( - "Agent to answer questions using tools provided." - ), - instruction=( - "You are a helpful agent who can answer user questions about current time and can do calculations and explore arxiv collection." - ), - tools=[get_current_time, calculate_expression, simple_mcp_tool], + description="Agent to answer questions using tools provided.", + instruction="""You are a helpful agent who can answer user questions about current time and can do calculations. + For any queries that require latest/external information, identify if any remote agents can help with that. + Once you found the relevant agents, use the appropriate tools to get the answer the user query.""", + tools=[get_current_time, + calculate_expression, + simple_mcp_tool, + list_remote_agents, + call_remote_agent], ) diff --git a/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py new file mode 100644 index 0000000..c26d353 --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py @@ -0,0 +1,90 @@ +import httpx +import asyncio +from typing import Any +from uuid import uuid4 +from a2a.client import A2ACardResolver, A2AClient +from a2a.types import AgentCard, MessageSendParams, SendMessageRequest + +remote_agent_cards_cache = [] + +remote_agent_url = "http://localhost:8090/" + +async def list_remote_agents() -> list[dict]: + """Fetches the capabilities of all available remote agents""" + if remote_agent_cards_cache: + print("Remote agents card cache exists, using cache") + return remote_agent_cards_cache + print("Agent card cache empty, fetching...") + try: + async with httpx.AsyncClient(timeout=120) as httpx_client: + # get only one agent url for now + resolver = A2ACardResolver( + httpx_client=httpx_client, + base_url=remote_agent_url, + ) + print("Attempting to fetch agent card...") + agent_card = await resolver.get_agent_card() + print('Agent card fetched. Agent card:') + print(agent_card.model_dump_json(indent=2, exclude_none=True)) + + remote_agent_cards_cache.append({ + "agent_name": agent_card.name, + "agent_card": agent_card + }) + print("Adding data to cache...") + return remote_agent_cards_cache + except Exception as e: + print("Failed to fetch agent card.") + raise RuntimeError("Failed to fetch the agent card. Unable to proceed") from e + +async def call_remote_agent(query: str, agent_name: str) -> str: + """Call the remote agent with appropriate query""" + + agent_cards = await list_remote_agents() + + agent_card_to_use = None + + for card in agent_cards: + if card.get("agent_name") == agent_name: + agent_card_to_use = card.get("agent_card") + break + if agent_card_to_use is None: + raise ValueError(f"Agent with name '{agent_name}' not found in available agents.") + print("Initializing A2A Client...") + async with httpx.AsyncClient(timeout=120) as httpx_client: + + client = A2AClient( + httpx_client=httpx_client, + agent_card=agent_card_to_use + ) + print("A2A Client Initialized.") + + send_message_payload: dict[str, Any] = { + 'message': { + 'role': 'user', + 'parts': [ + {'kind': 'text', 'text': query} + ], + 'messageId': uuid4().hex, + }, + } + print("Sending query to remote agent...") + request = SendMessageRequest( + id=str(uuid4()), params=MessageSendParams(**send_message_payload) + ) + try: + response = await client.send_message(request) + response_dict = response.model_dump(mode='json', exclude_none=True) + print(f"Response received from remote agent: {response_dict}") + except Exception as e: + print("Failed to send message to remote agent.") + raise RuntimeError("Remote agent call failed.") from e + + agent_response_text = "No text content found in response or an error occurred." + try: + agent_response_text = response_dict['result']['parts'][0]['text'] + except (KeyError, IndexError) as e: + print(f"Error parsing agent response structure: {e}") + + return agent_response_text + diff --git a/adk_a2a_mcp_integration/root_agent/response_manager.py b/adk_a2a_mcp_integration/root_agent/response_manager.py new file mode 100644 index 0000000..e2841da --- /dev/null +++ b/adk_a2a_mcp_integration/root_agent/response_manager.py @@ -0,0 +1,86 @@ +import asyncio +import uuid +import logging + +from google.genai import types +from google.adk.runners import Runner +from google.adk.artifacts import InMemoryArtifactService +from google.adk.memory.in_memory_memory_service import InMemoryMemoryService +from google.adk.sessions import InMemorySessionService + +from root_agent.agent import root_agent + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + +class ResponseManager: + + def __init__(self): + self.agent = root_agent + self.user_id = "u_123" + self.runner = Runner( + app_name=self.agent.name, + agent=self.agent, + artifact_service=InMemoryArtifactService(), + session_service=InMemorySessionService(), + memory_service=InMemoryMemoryService(), + ) + + async def invoke_agent(self, session_id: str, query: str) -> str: + + logger.info(f"Fetching session data with id: {session_id}") + session = await self.runner.session_service.get_session( + app_name=self.agent.name, + user_id=self.user_id, + session_id=session_id + ) + + if session is None: + logger.info(f"Session doesn't exist, creating new session with id: {session_id}") + session = await self.runner.session_service.create_session( + app_name=self.agent.name, + user_id=self.user_id, + session_id=session_id, + state={} + ) + + logger.info(f"{session.id} Running query: {query}") + + content = types.Content(role="user",parts=[types.Part.from_text(text=query)]) + + final_response_text = "" + async for event in self.runner.run_async( + user_id=self.user_id, + session_id=session.id, + new_message=content + ): + if event.is_final_response(): + if event.content and event.content.parts and event.content.parts[-1].text: + final_response_text = event.content.parts[-1].text + + logger.info(f"[{session_id}] Final text: {final_response_text}]") + if final_response_text: + return final_response_text + else: + logger.info(f"No final response received.") + return None + +async def test_agent(): + + response_manager = ResponseManager() + + session_id = str(uuid.uuid4()) + + first_query = "What are some trending topics in AI?" + first_response = await response_manager.invoke_agent(session_id=session_id, query=first_query) + logger.info(f"First response: {first_response}") + + second_query = "What question did I ask you?" + second_response = await response_manager.invoke_agent(session_id=session_id, query=second_query) + logger.info(f"Second response: {second_response}") + +asyncio.run(test_agent()) \ No newline at end of file From eed63cbb3c44ad7758d4254e79ff7cf967c1ba6a Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sun, 27 Jul 2025 23:07:53 +0530 Subject: [PATCH 4/7] feat: add custom ui for root agent --- adk_a2a_mcp_integration/app.py | 95 +++++++++++++++++++ adk_a2a_mcp_integration/requirements.txt | 4 +- .../root_agent/response_manager.py | 91 +++++++++++------- 3 files changed, 156 insertions(+), 34 deletions(-) create mode 100644 adk_a2a_mcp_integration/app.py diff --git a/adk_a2a_mcp_integration/app.py b/adk_a2a_mcp_integration/app.py new file mode 100644 index 0000000..fa2596a --- /dev/null +++ b/adk_a2a_mcp_integration/app.py @@ -0,0 +1,95 @@ +# app.py + +import streamlit as st +import uuid +import asyncio +from root_agent.response_manager import ResponseManager + +# --- Page Configuration --- +st.set_page_config(page_title="Agent Response Manager", layout="wide") + +# --- Session State Initialization --- +if "response_manager" not in st.session_state: + st.session_state.response_manager = ResponseManager() +if "session_id" not in st.session_state: + st.session_state.session_id = str(uuid.uuid4()) + st.toast(f"New session created: {st.session_state.session_id}") +if "messages" not in st.session_state: + st.session_state.messages = [] + +# --- Sidebar --- +with st.sidebar: + st.title("Settings") + st.markdown(f"**Session ID:**\n`{st.session_state.session_id}`") + + if st.button("New Session"): + st.session_state.session_id = str(uuid.uuid4()) + st.session_state.messages = [] + st.toast(f"New session started: {st.session_state.session_id}") + st.rerun() + + st.divider() + diagnostics_enabled = st.checkbox("Enable Diagnostics", value=True) + +# --- Main Chat Interface --- +st.title("Agent Chat UI") + +# Display chat history from session state +for message in st.session_state.messages: + with st.chat_message(message["role"]): + st.markdown(message["content"]) + if diagnostics_enabled and "diagnostics" in message and message["diagnostics"]: + with st.expander("View Diagnostics for this response."): + st.json(message["diagnostics"]) + +# --- Coroutine to handle agent invocation and UI updates --- +async def run_agent_and_display(prompt: str): + # Add user message to session state and display it + st.session_state.messages.append({"role": "user", "content": prompt}) + with st.chat_message("user"): + st.markdown(prompt) + + # --- Assistant's turn --- + with st.chat_message("assistant"): + final_response_text = "" + diagnostic_events = [] + status_messages = { + "tool_call": "Calling tool... 🛠️", + "tool_response": "Processing tool result... ⚙️", + "finished": "Response generation complete. ✅" + } + + status_placeholder = st.empty() + try: + # Collect all events and the final response from the agent + status_placeholder.text("Thinking... 🤔") + async for event in st.session_state.response_manager.invoke_agent( + session_id=st.session_state.session_id, query=prompt + ): + status_text = status_messages.get(event.get("status"), f"Processing: {event.get('status')}") + status_placeholder.text(status_text) + + diagnostic_events.append(event.get("event")) + if event.get("is_final_response"): + final_response_text = event.get("result", "Sorry, I couldn't generate a response.") + + except Exception as e: + st.error(f"An error occurred: {e}") + final_response_text = "Sorry, I ran into an error." + + status_placeholder.empty() + st.markdown(final_response_text) + + if diagnostics_enabled and diagnostic_events: + with st.expander("View Diagnostics for this response."): + st.json(diagnostic_events) + + st.session_state.messages.append({ + "role": "assistant", + "content": final_response_text, + "diagnostics": diagnostic_events + }) + +# --- Handle User Input --- +if prompt := st.chat_input("What are some trending topics in AI?"): + asyncio.run(run_agent_and_display(prompt)) \ No newline at end of file diff --git a/adk_a2a_mcp_integration/requirements.txt b/adk_a2a_mcp_integration/requirements.txt index 897386f..b411c0e 100644 --- a/adk_a2a_mcp_integration/requirements.txt +++ b/adk_a2a_mcp_integration/requirements.txt @@ -14,4 +14,6 @@ fastapi==0.116.1 python-dotenv==1.1.1 requests==2.32.4 -uvicorn==0.35.0 \ No newline at end of file +uvicorn==0.35.0 + +streamlit==1.47.1 \ No newline at end of file diff --git a/adk_a2a_mcp_integration/root_agent/response_manager.py b/adk_a2a_mcp_integration/root_agent/response_manager.py index e2841da..cddb09e 100644 --- a/adk_a2a_mcp_integration/root_agent/response_manager.py +++ b/adk_a2a_mcp_integration/root_agent/response_manager.py @@ -32,42 +32,64 @@ def __init__(self): async def invoke_agent(self, session_id: str, query: str) -> str: - logger.info(f"Fetching session data with id: {session_id}") - session = await self.runner.session_service.get_session( - app_name=self.agent.name, - user_id=self.user_id, - session_id=session_id - ) - - if session is None: - logger.info(f"Session doesn't exist, creating new session with id: {session_id}") - session = await self.runner.session_service.create_session( + try: + logger.info(f"Fetching session data with id: {session_id}") + session = await self.runner.session_service.get_session( app_name=self.agent.name, user_id=self.user_id, - session_id=session_id, - state={} + session_id=session_id ) - logger.info(f"{session.id} Running query: {query}") + if session is None: + logger.info(f"Session doesn't exist, creating new session with id: {session_id}") + session = await self.runner.session_service.create_session( + app_name=self.agent.name, + user_id=self.user_id, + session_id=session_id, + state={} + ) - content = types.Content(role="user",parts=[types.Part.from_text(text=query)]) + logger.info(f"{session.id} Running query: {query}") - final_response_text = "" - async for event in self.runner.run_async( - user_id=self.user_id, - session_id=session.id, - new_message=content - ): - if event.is_final_response(): + content = types.Content(role="user",parts=[types.Part.from_text(text=query)]) + + final_response_text = "" + async for event in self.runner.run_async( + user_id=self.user_id, + session_id=session.id, + new_message=content + ): + if event.get_function_calls(): + yield { + "is_final_response": False, + "status": "tool_call", + "event": event.model_dump(mode='json', exclude_none=True) + } + if event.get_function_responses(): + yield { + "is_final_response": False, + "status": "tool_response", + "event": event.model_dump(mode='json', exclude_none=True) + } + if event.is_final_response(): if event.content and event.content.parts and event.content.parts[-1].text: final_response_text = event.content.parts[-1].text - logger.info(f"[{session_id}] Final text: {final_response_text}]") - if final_response_text: - return final_response_text - else: - logger.info(f"No final response received.") - return None + logger.info(f"[{session_id}] Final text: {final_response_text}]") + yield { + "is_final_response": True, + "status": "finished", + "result": final_response_text, + "event": event.model_dump(mode='json', exclude_none=True) + } + except Exception as e: + logger.info(f"Error generating response.") + yield { + "is_final_response": True, + "status": "fail", + "result": "No final response received", + "error_message": str(e) + } async def test_agent(): @@ -76,11 +98,14 @@ async def test_agent(): session_id = str(uuid.uuid4()) first_query = "What are some trending topics in AI?" - first_response = await response_manager.invoke_agent(session_id=session_id, query=first_query) - logger.info(f"First response: {first_response}") + # first_response = await response_manager.invoke_agent(session_id=session_id, query=first_query) + first_response = response_manager.invoke_agent(session_id=session_id, query=first_query) + async for response in first_response: + logger.info(f"Events: {response}") + # logger.info(f"First response: {first_response}") - second_query = "What question did I ask you?" - second_response = await response_manager.invoke_agent(session_id=session_id, query=second_query) - logger.info(f"Second response: {second_response}") + # second_query = "What question did I ask you?" + # second_response = await response_manager.invoke_agent(session_id=session_id, query=second_query) + # logger.info(f"Second response: {second_response}") -asyncio.run(test_agent()) \ No newline at end of file +# asyncio.run(test_agent()) \ No newline at end of file From a693eddf62d51a847c863b772b808a2ff2eb08a8 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sat, 2 Aug 2025 23:06:57 +0530 Subject: [PATCH 5/7] chore: add logging + basic readme --- adk_a2a_mcp_integration/Readme.md | 18 +++++ adk_a2a_mcp_integration/app.py | 9 +-- adk_a2a_mcp_integration/mcp_server/server.py | 25 +++--- .../remote_agent/__main__.py | 78 ++++++++++--------- adk_a2a_mcp_integration/remote_agent/agent.py | 17 ++-- .../remote_agent/client.py | 34 ++++---- .../remote_agent/task_manager.py | 72 +++++++++-------- adk_a2a_mcp_integration/root_agent/agent.py | 3 +- .../root_agent/remote_agent_helpers.py | 35 +++++---- .../root_agent/response_manager.py | 12 +-- adk_a2a_mcp_integration/root_agent/tools.py | 1 + 11 files changed, 173 insertions(+), 131 deletions(-) diff --git a/adk_a2a_mcp_integration/Readme.md b/adk_a2a_mcp_integration/Readme.md index e69de29..e02778e 100644 --- a/adk_a2a_mcp_integration/Readme.md +++ b/adk_a2a_mcp_integration/Readme.md @@ -0,0 +1,18 @@ +# Boilerplate for Agent interaction with ADK, A2A and MCP. + +[To UPDATE] + +```bash +# To start remote agent +python -m remote_agent +``` + +```bash +# To start MCP server +python mcp_server/server.py +``` + +```bash +# To interact with root agent in UI +streamlit run app.py +``` \ No newline at end of file diff --git a/adk_a2a_mcp_integration/app.py b/adk_a2a_mcp_integration/app.py index fa2596a..60b070b 100644 --- a/adk_a2a_mcp_integration/app.py +++ b/adk_a2a_mcp_integration/app.py @@ -1,5 +1,4 @@ -# app.py - +"""Module that interacts with backend agent using streamlit""" import streamlit as st import uuid import asyncio @@ -80,9 +79,9 @@ async def run_agent_and_display(prompt: str): status_placeholder.empty() st.markdown(final_response_text) - if diagnostics_enabled and diagnostic_events: - with st.expander("View Diagnostics for this response."): - st.json(diagnostic_events) + if diagnostics_enabled and diagnostic_events: + with st.expander("View Diagnostics for this response."): + st.json(diagnostic_events) st.session_state.messages.append({ "role": "assistant", diff --git a/adk_a2a_mcp_integration/mcp_server/server.py b/adk_a2a_mcp_integration/mcp_server/server.py index e680a59..2ad877a 100644 --- a/adk_a2a_mcp_integration/mcp_server/server.py +++ b/adk_a2a_mcp_integration/mcp_server/server.py @@ -1,10 +1,13 @@ import os +import logging import tempfile import requests import arxiv import pymupdf4llm from fastmcp import FastMCP +logger = logging.getLogger(__name__) + mcp = FastMCP("ArxivExplorer") @mcp.tool @@ -27,7 +30,7 @@ def search_arxiv(query: str, max_results: int = 5) -> dict: papers = [] for result in search.results(): - print(f"{result.title}") + logger.info(f"{result.title}") paper_info = { 'id': result.get_short_id(), 'title': result.title, @@ -61,8 +64,8 @@ def get_paper_md(paper_id: str) -> dict: search = arxiv.Search(id_list=[paper_id]) paper = next(search.results()) pdf_url = paper.pdf_url - print(f"Found paper: '{paper.title}'") - print(f"Downloading from: {pdf_url}") + logger.info(f"Found paper: '{paper.title}'") + logger.info(f"Downloading from: {pdf_url}") except StopIteration: return {"status": "error", "error_message": f"Paper with ID '{paper_id}' not found on arXiv."} @@ -74,7 +77,7 @@ def get_paper_md(paper_id: str) -> dict: response = requests.get(pdf_url) response.raise_for_status() pdf_bytes = response.content - print("PDF downloaded successfully.") + logger.info("PDF downloaded successfully.") except requests.exceptions.RequestException as e: return {"status": "error", "error_message": f"Error downloading the PDF file, request failure: {e}"} @@ -87,21 +90,21 @@ def get_paper_md(paper_id: str) -> dict: temp_file.write(pdf_bytes) temp_pdf_path = temp_file.name # Get the path of the temporary file - print(f"PDF content written to temporary file: {temp_pdf_path}") - print("Converting PDF to Markdown...") + logger.info(f"PDF content written to temporary file: {temp_pdf_path}") + logger.info("Converting PDF to Markdown...") # Pass the file path to the conversion function md_text = pymupdf4llm.to_markdown(temp_pdf_path) - print("Conversion complete.") + logger.info("Conversion complete.") if temp_pdf_path and os.path.exists(temp_pdf_path): os.remove(temp_pdf_path) - print(f"Temporary file {temp_pdf_path} deleted.") + logger.info(f"Temporary file {temp_pdf_path} deleted.") return {"status": "success", "result": md_text} except Exception as e: return {"status": "error", "error_message": f"Error converting PDF to Markdown: {e}"} -# To pass the paper directly as media to llm +# To pass the paper directly as media to llm, to be used later # @mcp.tool() # def get_paper_raw(paper_id: str) -> dict: # """ @@ -126,10 +129,10 @@ def get_paper_md(paper_id: str) -> dict: # except StopIteration: # return {"status": "error", "error_message": f"Paper with ID {paper_id} not found on arXiv."} # except requests.exceptions.RequestException as e: -# print(f"Error downloading PDF: {e}") +# logger.info(f"Error downloading PDF: {e}") # return {"status": "error", "error_message": f"Error downloading PDF: {e}"} # except Exception as e: -# print(f"Error: {e}") +# logger.info(f"Error: {e}") # return {"status": "error", "error_message": f"Error: {e}"} if __name__ == "__main__": diff --git a/adk_a2a_mcp_integration/remote_agent/__main__.py b/adk_a2a_mcp_integration/remote_agent/__main__.py index 1ad9fcf..0392e0e 100644 --- a/adk_a2a_mcp_integration/remote_agent/__main__.py +++ b/adk_a2a_mcp_integration/remote_agent/__main__.py @@ -1,50 +1,56 @@ +"""Module that exposes the remote agent as a server, sharing it's capabilities +and methods to invoke it""" import uvicorn +import logging from a2a.server.apps import A2AStarletteApplication from a2a.server.request_handlers import DefaultRequestHandler from a2a.server.tasks import InMemoryTaskStore from a2a.types import AgentCapabilities, AgentCard, AgentSkill from remote_agent.task_manager import BasicSearchAgentExecutor +logger = logging.getLogger(__name__) + agent_url = "http://localhost:8090/" def start_remote_agent(): - - agent_skill = AgentSkill( - id="search_agent", - name="Search Agent", - description="Agent that can get the latest search results from internet using google search", - input_modes=["text"], - output_modes=["text"], - tags=["search agent", "browser", "multi query"], - examples=[ - "What are the latest news in AI?", - "Explain the key difference between Langchain and Langgraph.", - "Who won the last IPL match?"] - ) + """Start the remote agent and expose it's capabilities""" + agent_skill = AgentSkill( + id="search_agent", + name="Search Agent", + description="""Agent that can get the latest search results from the + internet using google search and gives accurate results""", + input_modes=["text"], + output_modes=["text"], + tags=["search agent", "google search tool", "web search"], + examples=[ + "What are the latest news in AI?", + "Explain the key difference between Langchain and Langgraph.", + "Who won the last IPL match?"] + ) - public_agent_card = AgentCard( - name="Search agent", - description="Agent that can search the internet to answer queries.", - url=agent_url, - version="0.0.1", - skills=[agent_skill], - defaultInputModes=['text'], - defaultOutputModes=['text'], - capabilities=AgentCapabilities(streaming=True), - supportsAuthenticatedExtendedCard=False, - ) + public_agent_card = AgentCard( + name="Search agent", + description="Agent that can search the internet to answer queries.", + url=agent_url, + version="0.0.1", + skills=[agent_skill], + defaultInputModes=['text'], + defaultOutputModes=['text'], + capabilities=AgentCapabilities(streaming=True), + supportsAuthenticatedExtendedCard=False, + ) - request_handler = DefaultRequestHandler( - agent_executor=BasicSearchAgentExecutor(), - task_store=InMemoryTaskStore(), - ) - server = A2AStarletteApplication( - agent_card=public_agent_card, - http_handler=request_handler - ) - app = server.build() - print("Uvicorn server starting...") - uvicorn.run(app, host="127.0.0.1", port=8090) + request_handler = DefaultRequestHandler( + agent_executor=BasicSearchAgentExecutor(), + task_store=InMemoryTaskStore(), + ) + server = A2AStarletteApplication( + agent_card=public_agent_card, + http_handler=request_handler + ) + app = server.build() + logger.info("Uvicorn server starting...") + uvicorn.run(app, host="127.0.0.1", port=8090) if __name__ == "__main__": - start_remote_agent() + start_remote_agent() diff --git a/adk_a2a_mcp_integration/remote_agent/agent.py b/adk_a2a_mcp_integration/remote_agent/agent.py index 9c3218e..32c09d2 100644 --- a/adk_a2a_mcp_integration/remote_agent/agent.py +++ b/adk_a2a_mcp_integration/remote_agent/agent.py @@ -1,4 +1,5 @@ -import os +"""Module that implements the core logic for the search agent""" +import logging from google.adk import Runner from google.adk.agents import Agent @@ -12,6 +13,8 @@ # To load the google api keys load_dotenv() +logger = logging.getLogger(__name__) + root_agent = Agent( name="search_agent", model="gemini-2.0-flash", @@ -22,7 +25,7 @@ ) class BasicSearchAgent: - + """Class that exposes the basic search agent""" def __init__(self): self.agent = root_agent self.runner = Runner( @@ -33,7 +36,7 @@ def __init__(self): session_service=InMemorySessionService(), ) - async def invoke(self, session_id: str, query: str, user_id: str = None) -> dict: + async def invoke(self, session_id: str, query: str, user_id: str = None): """Invoke the agent""" try: if not user_id: @@ -45,7 +48,7 @@ async def invoke(self, session_id: str, query: str, user_id: str = None) -> dict ) if not session_instance: - print(f"Creating new session with id: {session_id}") + logger.info(f"Creating new session with id: {session_id}") session_instance = await self.runner.session_service.create_session( session_id=session_id, user_id=user_id, @@ -64,17 +67,17 @@ async def invoke(self, session_id: str, query: str, user_id: str = None) -> dict ): # We can break when there's final response, # but for telemetry usage, the loop must complete - # print(f"Event: {event}") + # logger.debug(f"Event: {event}") if event.is_final_response(): if event.content and event.content.parts and event.content.parts[-1].text: final_response_text = event.content.parts[-1].text - print("Loop finished, yielding final response.") + logger.info("Loop finished, yielding final response.") yield { "status": "success", "result": final_response_text } except Exception as e: - print(f"Error: {e}") + logger.info(f"Error: {e}") yield { "status": "error", "error_message": str(e) diff --git a/adk_a2a_mcp_integration/remote_agent/client.py b/adk_a2a_mcp_integration/remote_agent/client.py index 72d6feb..2645774 100644 --- a/adk_a2a_mcp_integration/remote_agent/client.py +++ b/adk_a2a_mcp_integration/remote_agent/client.py @@ -1,31 +1,31 @@ +"""Module to test the remote agent exposed via A2A. Mimic's client side implementation""" import httpx import asyncio +import logging from typing import Any from uuid import uuid4 from a2a.client import A2ACardResolver, A2AClient -from a2a.types import ( - AgentCard, - MessageSendParams, - SendMessageRequest, - SendStreamingMessageRequest, -) +from a2a.types import MessageSendParams, SendMessageRequest + +logger = logging.getLogger(__name__) async def client(): + """Test the agent with a simple client""" async with httpx.AsyncClient(timeout=120) as httpx_client: resolver = A2ACardResolver( httpx_client=httpx_client, base_url="http://localhost:8090/", ) - print("Attempting to fetch agent card...") + logger.info("Attempting to fetch agent card...") agent_card = await resolver.get_agent_card() - print('Agent card fetched. Agent card:') - print(agent_card.model_dump_json(indent=2, exclude_none=True)) + logger.info('Agent card fetched. Agent card:') + logger.info(agent_card.model_dump_json(indent=2, exclude_none=True)) - print("Initializing A2A Client") + logger.info("Initializing A2A Client") client = A2AClient( httpx_client=httpx_client, agent_card=agent_card ) - print('A2A Client initialized.') + logger.info('A2A Client initialized.') send_message_payload: dict[str, Any] = { 'message': { @@ -36,22 +36,22 @@ async def client(): 'messageId': uuid4().hex, }, } - print("Sending test message") + logger.info("Sending test message") request = SendMessageRequest( id=str(uuid4()), params=MessageSendParams(**send_message_payload) ) response = await client.send_message(request) - print(response.model_dump(mode='json', exclude_none=True)) + logger.info(response.model_dump(mode='json', exclude_none=True)) response_dict = response.model_dump(mode='json', exclude_none=True) agent_response_text = "No text content found in response or an error occurred." try: agent_response_text = response_dict['result']['parts'][0]['text'] except (KeyError, IndexError) as e: - print(f"Error parsing agent response structure: {e}") + logger.info(f"Error parsing agent response structure: {e}") - print("\n--- Agent's Final Response ---") - print(agent_response_text) - print("----------------------------") + logger.info("\n--- Agent's Final Response ---") + logger.info(agent_response_text) + logger.info("----------------------------") asyncio.run(client()) diff --git a/adk_a2a_mcp_integration/remote_agent/task_manager.py b/adk_a2a_mcp_integration/remote_agent/task_manager.py index d84ff29..dd8280a 100644 --- a/adk_a2a_mcp_integration/remote_agent/task_manager.py +++ b/adk_a2a_mcp_integration/remote_agent/task_manager.py @@ -1,43 +1,49 @@ +"""Module that handles the invocation of agent""" +import logging from a2a.server.agent_execution import AgentExecutor, RequestContext from a2a.server.events import EventQueue from a2a.utils import new_agent_text_message, new_task from remote_agent.agent import BasicSearchAgent -class BasicSearchAgentExecutor(AgentExecutor): - - def __init__(self): - super().__init__() - self._agent = BasicSearchAgent() +logger = logging.getLogger(__name__) - async def execute(self, request_context: RequestContext, event_queue: EventQueue) -> None: +class BasicSearchAgentExecutor(AgentExecutor): + """Agent executor that invokes the agent""" + + def __init__(self): + super().__init__() + self._agent = BasicSearchAgent() - query = request_context.get_user_input() - task = request_context.current_task - if not task: - task = new_task(request_context.message) - await event_queue.enqueue_event(task) - print(f"Creating new task with id: {task.id}.") + async def execute(self, request_context: RequestContext, event_queue: EventQueue) -> None: + """Invokes the agent with the request context""" + query = request_context.get_user_input() + task = request_context.current_task + if not task: + task = new_task(request_context.message) + await event_queue.enqueue_event(task) + logger.info(f"Creating new task with id: {task.id}.") - session_id = task.contextId - print(f"Using session id: {session_id}.") - async for event in self._agent.invoke( - session_id=session_id, - query=query - # user_id=request_context.user_id, - ): - if event.get("status") == "success": - await event_queue.enqueue_event( - new_agent_text_message(text=event.get("result")) - ) - elif event.get("status") == "error": - await event_queue.enqueue_event( - new_agent_text_message(text=f"Error: {event.get('error_message')}") - ) - else: - await event_queue.enqueue_event( - new_agent_text_message(text=event.get("result")) - ) + session_id = task.contextId + logger.info(f"Using session id: {session_id}.") + async for event in self._agent.invoke( + session_id=session_id, + query=query + # user_id=request_context.user_id, + ): + if event.get("status") == "success": + await event_queue.enqueue_event( + new_agent_text_message(text=event.get("result")) + ) + elif event.get("status") == "error": + await event_queue.enqueue_event( + new_agent_text_message(text=f"Error: {event.get('error_message')}") + ) + else: + await event_queue.enqueue_event( + new_agent_text_message(text=event.get("result")) + ) - async def cancel(self, request_context: RequestContext, event_queue: EventQueue) -> None: - raise Exception('cancel not supported at this moment!') + async def cancel(self, request_context: RequestContext, event_queue: EventQueue) -> None: + """To cancel an ongoing agent execution""" + raise Exception('cancel not supported at this moment!') diff --git a/adk_a2a_mcp_integration/root_agent/agent.py b/adk_a2a_mcp_integration/root_agent/agent.py index 5fed8eb..16bad9b 100644 --- a/adk_a2a_mcp_integration/root_agent/agent.py +++ b/adk_a2a_mcp_integration/root_agent/agent.py @@ -1,3 +1,4 @@ +"""Core module for agent orchestration""" from google.adk.agents import Agent from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StreamableHTTPConnectionParams @@ -19,7 +20,7 @@ ) root_agent = Agent( - name="orchestrator_agent", + name="root_agent", model="gemini-2.0-flash", description="Agent to answer questions using tools provided.", instruction="""You are a helpful agent who can answer user questions about current time and can do calculations. diff --git a/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py index c26d353..23e097b 100644 --- a/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py +++ b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py @@ -1,9 +1,12 @@ +"""Helper functions to interact with other agents exposed via A2A""" import httpx -import asyncio +import logging from typing import Any from uuid import uuid4 from a2a.client import A2ACardResolver, A2AClient -from a2a.types import AgentCard, MessageSendParams, SendMessageRequest +from a2a.types import MessageSendParams, SendMessageRequest + +logger = logging.getLogger(__name__) remote_agent_cards_cache = [] @@ -11,10 +14,11 @@ async def list_remote_agents() -> list[dict]: """Fetches the capabilities of all available remote agents""" + if remote_agent_cards_cache: - print("Remote agents card cache exists, using cache") + logger.info("Remote agents card cache exists, using cache") return remote_agent_cards_cache - print("Agent card cache empty, fetching...") + logger.info("Agent card cache empty, fetching...") try: async with httpx.AsyncClient(timeout=120) as httpx_client: # get only one agent url for now @@ -22,19 +26,19 @@ async def list_remote_agents() -> list[dict]: httpx_client=httpx_client, base_url=remote_agent_url, ) - print("Attempting to fetch agent card...") + logger.info("Attempting to fetch agent card...") agent_card = await resolver.get_agent_card() - print('Agent card fetched. Agent card:') - print(agent_card.model_dump_json(indent=2, exclude_none=True)) + logger.info('Agent card fetched. Agent card:') + logger.info(agent_card.model_dump_json(indent=2, exclude_none=True)) remote_agent_cards_cache.append({ "agent_name": agent_card.name, "agent_card": agent_card }) - print("Adding data to cache...") + logger.info("Adding data to cache...") return remote_agent_cards_cache except Exception as e: - print("Failed to fetch agent card.") + logger.error("Failed to fetch agent card.") raise RuntimeError("Failed to fetch the agent card. Unable to proceed") from e async def call_remote_agent(query: str, agent_name: str) -> str: @@ -50,14 +54,14 @@ async def call_remote_agent(query: str, agent_name: str) -> str: break if agent_card_to_use is None: raise ValueError(f"Agent with name '{agent_name}' not found in available agents.") - print("Initializing A2A Client...") + logger.info("Initializing A2A Client...") async with httpx.AsyncClient(timeout=120) as httpx_client: client = A2AClient( httpx_client=httpx_client, agent_card=agent_card_to_use ) - print("A2A Client Initialized.") + logger.info("A2A Client Initialized.") send_message_payload: dict[str, Any] = { 'message': { @@ -68,23 +72,22 @@ async def call_remote_agent(query: str, agent_name: str) -> str: 'messageId': uuid4().hex, }, } - print("Sending query to remote agent...") + logger.info("Sending query to remote agent...") request = SendMessageRequest( id=str(uuid4()), params=MessageSendParams(**send_message_payload) ) try: response = await client.send_message(request) response_dict = response.model_dump(mode='json', exclude_none=True) - print(f"Response received from remote agent: {response_dict}") + logger.info(f"Response received from remote agent: {response_dict}") except Exception as e: - print("Failed to send message to remote agent.") + logger.error("Failed to send message to remote agent.") raise RuntimeError("Remote agent call failed.") from e agent_response_text = "No text content found in response or an error occurred." try: agent_response_text = response_dict['result']['parts'][0]['text'] except (KeyError, IndexError) as e: - print(f"Error parsing agent response structure: {e}") + logger.error(f"Error parsing agent response structure: {e}") return agent_response_text - diff --git a/adk_a2a_mcp_integration/root_agent/response_manager.py b/adk_a2a_mcp_integration/root_agent/response_manager.py index cddb09e..d40de2d 100644 --- a/adk_a2a_mcp_integration/root_agent/response_manager.py +++ b/adk_a2a_mcp_integration/root_agent/response_manager.py @@ -1,3 +1,4 @@ +"""Module that handles interaction with the agent, maintains session and query passing.""" import asyncio import uuid import logging @@ -18,7 +19,7 @@ ) class ResponseManager: - + """Class that handles the respone management with root agent""" def __init__(self): self.agent = root_agent self.user_id = "u_123" @@ -30,8 +31,9 @@ def __init__(self): memory_service=InMemoryMemoryService(), ) - async def invoke_agent(self, session_id: str, query: str) -> str: - + async def invoke_agent(self, session_id: str, query: str): + """Invokes the root agent while maintaining sessionid + for continuance""" try: logger.info(f"Fetching session data with id: {session_id}") session = await self.runner.session_service.get_session( @@ -92,7 +94,7 @@ async def invoke_agent(self, session_id: str, query: str) -> str: } async def test_agent(): - + """Utils function to test the agent using response manager""" response_manager = ResponseManager() session_id = str(uuid.uuid4()) @@ -108,4 +110,4 @@ async def test_agent(): # second_response = await response_manager.invoke_agent(session_id=session_id, query=second_query) # logger.info(f"Second response: {second_response}") -# asyncio.run(test_agent()) \ No newline at end of file +# asyncio.run(test_agent()) diff --git a/adk_a2a_mcp_integration/root_agent/tools.py b/adk_a2a_mcp_integration/root_agent/tools.py index 54448f7..a05f88c 100644 --- a/adk_a2a_mcp_integration/root_agent/tools.py +++ b/adk_a2a_mcp_integration/root_agent/tools.py @@ -1,3 +1,4 @@ +"""Module that defines all the tools used by the root agent""" import math import datetime from zoneinfo import ZoneInfo From 8e08cf012dd66daed1080d27c0101ed6d9e860e1 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sat, 2 Aug 2025 23:26:28 +0530 Subject: [PATCH 6/7] fix: linting --- adk_a2a_mcp_integration/app.py | 11 +- adk_a2a_mcp_integration/mcp_server/server.py | 249 ++++++++++-------- .../remote_agent/__init__.py | 3 +- .../remote_agent/__main__.py | 6 +- adk_a2a_mcp_integration/remote_agent/agent.py | 20 +- .../remote_agent/client.py | 11 +- .../remote_agent/task_manager.py | 4 +- .../root_agent/__init__.py | 3 +- adk_a2a_mcp_integration/root_agent/agent.py | 7 +- .../root_agent/remote_agent_helpers.py | 18 +- .../root_agent/response_manager.py | 10 +- adk_a2a_mcp_integration/root_agent/tools.py | 101 +++---- 12 files changed, 236 insertions(+), 207 deletions(-) diff --git a/adk_a2a_mcp_integration/app.py b/adk_a2a_mcp_integration/app.py index 60b070b..99d381e 100644 --- a/adk_a2a_mcp_integration/app.py +++ b/adk_a2a_mcp_integration/app.py @@ -1,7 +1,8 @@ """Module that interacts with backend agent using streamlit""" -import streamlit as st +# pylint: disable=line-too-long,invalid-name import uuid import asyncio +import streamlit as st from root_agent.response_manager import ResponseManager # --- Page Configuration --- @@ -67,7 +68,7 @@ async def run_agent_and_display(prompt: str): ): status_text = status_messages.get(event.get("status"), f"Processing: {event.get('status')}") status_placeholder.text(status_text) - + diagnostic_events.append(event.get("event")) if event.get("is_final_response"): final_response_text = event.get("result", "Sorry, I couldn't generate a response.") @@ -78,7 +79,7 @@ async def run_agent_and_display(prompt: str): status_placeholder.empty() st.markdown(final_response_text) - + if diagnostics_enabled and diagnostic_events: with st.expander("View Diagnostics for this response."): st.json(diagnostic_events) @@ -90,5 +91,5 @@ async def run_agent_and_display(prompt: str): }) # --- Handle User Input --- -if prompt := st.chat_input("What are some trending topics in AI?"): - asyncio.run(run_agent_and_display(prompt)) \ No newline at end of file +if user_input := st.chat_input("What are some trending topics in AI?"): + asyncio.run(run_agent_and_display(user_input)) diff --git a/adk_a2a_mcp_integration/mcp_server/server.py b/adk_a2a_mcp_integration/mcp_server/server.py index 2ad877a..0fea88a 100644 --- a/adk_a2a_mcp_integration/mcp_server/server.py +++ b/adk_a2a_mcp_integration/mcp_server/server.py @@ -1,3 +1,4 @@ +"""Module that implements simple mcp server to query Arxiv research papers collection""" import os import logging import tempfile @@ -12,128 +13,146 @@ @mcp.tool def search_arxiv(query: str, max_results: int = 5) -> dict: - """ - Searches arXiv for a given query and returns the top papers. - Args: - query: The search keyword or query. - max_results: The maximum number of results to return. - Returns: - A list of dictionaries, where each dictionary represents a paper - and contains its ID, title, summary, authors, and PDF URL. - """ - try: - search = arxiv.Search( - query=query, - max_results=max_results, - sort_by=arxiv.SortCriterion.Relevance - ) - - papers = [] - for result in search.results(): - logger.info(f"{result.title}") - paper_info = { - 'id': result.get_short_id(), - 'title': result.title, - 'summary': result.summary, - 'authors': [author.name for author in result.authors], - 'pdf_url': result.pdf_url - } - papers.append(paper_info) - - return { - "status": "success", - "result": papers - } - except Exception as e: - return { - "status": "error", - "error_message": str(e) - } + """ + Searches arXiv for a given query and returns the top papers. + Args: + query: The search keyword or query. + max_results: The maximum number of results to return. + Returns: + A list of dictionaries, where each dictionary represents a paper + and contains its ID, title, summary, authors, and PDF URL. + """ + try: + search = arxiv.Search( + query=query, + max_results=max_results, + sort_by=arxiv.SortCriterion.Relevance + ) + + papers = [] + for result in search.results(): + logger.info(f"{result.title}") + paper_info = { + 'id': result.get_short_id(), + 'title': result.title, + 'summary': result.summary, + 'authors': [author.name for author in result.authors], + 'pdf_url': result.pdf_url + } + papers.append(paper_info) + + return { + "status": "success", + "result": papers + } + except Exception as e: + return { + "status": "error", + "error_message": str(e) + } @mcp.tool() def get_paper_md(paper_id: str) -> dict: - """ - Retrieves the text content of an arXiv paper in Markdown format. - Args: - paper_id: The ID of the paper (e.g., '1706.03762v7'). - Returns: - The text content of the paper as a Markdown string. - Returns an error message if any step fails. - """ - try: - search = arxiv.Search(id_list=[paper_id]) - paper = next(search.results()) - pdf_url = paper.pdf_url - logger.info(f"Found paper: '{paper.title}'") - logger.info(f"Downloading from: {pdf_url}") - - except StopIteration: - return {"status": "error", "error_message": f"Paper with ID '{paper_id}' not found on arXiv."} - except Exception as e: - return {"status": "error", "error_message": f"Error searching for the paper: {e}"} - - try: - # Download the PDF content - response = requests.get(pdf_url) - response.raise_for_status() - pdf_bytes = response.content - logger.info("PDF downloaded successfully.") - - except requests.exceptions.RequestException as e: - return {"status": "error", "error_message": f"Error downloading the PDF file, request failure: {e}"} - except Exception as e: - return {"status": "error", "error_message": f"Error downloading the PDF file: {e}"} - - temp_pdf_path = None - try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: - temp_file.write(pdf_bytes) - temp_pdf_path = temp_file.name # Get the path of the temporary file - - logger.info(f"PDF content written to temporary file: {temp_pdf_path}") - logger.info("Converting PDF to Markdown...") - # Pass the file path to the conversion function - md_text = pymupdf4llm.to_markdown(temp_pdf_path) - - logger.info("Conversion complete.") - if temp_pdf_path and os.path.exists(temp_pdf_path): - os.remove(temp_pdf_path) - logger.info(f"Temporary file {temp_pdf_path} deleted.") - return {"status": "success", "result": md_text} - - except Exception as e: - return {"status": "error", "error_message": f"Error converting PDF to Markdown: {e}"} + """ + Retrieves the text content of an arXiv paper in Markdown format. + Args: + paper_id: The ID of the paper (e.g., '1706.03762v7'). + Returns: + The text content of the paper as a Markdown string. + Returns an error message if any step fails. + """ + try: + search = arxiv.Search(id_list=[paper_id]) + paper = next(search.results()) + pdf_url = paper.pdf_url + logger.info(f"Found paper: '{paper.title}'") + logger.info(f"Downloading from: {pdf_url}") + + except StopIteration: + return { + "status": "error", + "error_message": f"Paper with ID '{paper_id}' not found on arXiv." + } + except Exception as e: + return { + "status": "error", + "error_message": f"Error searching for the paper: {e}" + } + + try: + # Download the PDF content + response = requests.get(pdf_url) + response.raise_for_status() + pdf_bytes = response.content + logger.info("PDF downloaded successfully.") + + except requests.exceptions.RequestException as e: + return { + "status": "error", + "error_message": f"Error downloading the PDF file, request failure: {e}" + } + except Exception as e: + return { + "status": "error", + "error_message": f"Error downloading the PDF file: {e}" + } + + temp_pdf_path = None + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: + temp_file.write(pdf_bytes) + temp_pdf_path = temp_file.name # Get the path of the temporary file + + logger.info(f"PDF content written to temporary file: {temp_pdf_path}") + logger.info("Converting PDF to Markdown...") + # Pass the file path to the conversion function + md_text = pymupdf4llm.to_markdown(temp_pdf_path) + + logger.info("Conversion complete.") + if temp_pdf_path and os.path.exists(temp_pdf_path): + os.remove(temp_pdf_path) + logger.info(f"Temporary file {temp_pdf_path} deleted.") + return {"status": "success", "result": md_text} + + except Exception as e: + return { + "status": "error", + "error_message": f"Error converting PDF to Markdown: {e}" + } # To pass the paper directly as media to llm, to be used later # @mcp.tool() # def get_paper_raw(paper_id: str) -> dict: -# """ -# Retrieves the raw PDF file of an arXiv paper. -# Args: -# paper_id: The ID of the paper (e.g., '1706.03762v7'). -# Returns: -# The raw bytes of the PDF file, or None if the paper is not found. -# """ -# try: -# # Search for the paper by its ID -# search = arxiv.Search(id_list=[paper_id]) -# paper = next(search.results()) - -# # Download the PDF content -# response = requests.get(paper.pdf_url) -# response.raise_for_status() -# return { -# "status": "success", -# "result":response.content -# } -# except StopIteration: -# return {"status": "error", "error_message": f"Paper with ID {paper_id} not found on arXiv."} -# except requests.exceptions.RequestException as e: -# logger.info(f"Error downloading PDF: {e}") -# return {"status": "error", "error_message": f"Error downloading PDF: {e}"} -# except Exception as e: -# logger.info(f"Error: {e}") -# return {"status": "error", "error_message": f"Error: {e}"} +# """ +# Retrieves the raw PDF file of an arXiv paper. +# Args: +# paper_id: The ID of the paper (e.g., '1706.03762v7'). +# Returns: +# The raw bytes of the PDF file, or None if the paper is not found. +# """ +# try: +# # Search for the paper by its ID +# search = arxiv.Search(id_list=[paper_id]) +# paper = next(search.results()) + +# # Download the PDF content +# response = requests.get(paper.pdf_url) +# response.raise_for_status() +# return { +# "status": "success", +# "result":response.content +# } +# except StopIteration: +# return { +# "status": "error", +# "error_message": f"Paper with ID {paper_id} not found on arXiv." +# } +# except requests.exceptions.RequestException as e: +# logger.info(f"Error downloading PDF: {e}") +# return {"status": "error", "error_message": f"Error downloading PDF: {e}"} +# except Exception as e: +# logger.info(f"Error: {e}") +# return {"status": "error", "error_message": f"Error: {e}"} if __name__ == "__main__": mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp") diff --git a/adk_a2a_mcp_integration/remote_agent/__init__.py b/adk_a2a_mcp_integration/remote_agent/__init__.py index 63bd45e..96d00fd 100644 --- a/adk_a2a_mcp_integration/remote_agent/__init__.py +++ b/adk_a2a_mcp_integration/remote_agent/__init__.py @@ -1 +1,2 @@ -from . import agent \ No newline at end of file +"""Module to start the agent, primarily used by adk web""" +from . import agent diff --git a/adk_a2a_mcp_integration/remote_agent/__main__.py b/adk_a2a_mcp_integration/remote_agent/__main__.py index 0392e0e..a48c5e2 100644 --- a/adk_a2a_mcp_integration/remote_agent/__main__.py +++ b/adk_a2a_mcp_integration/remote_agent/__main__.py @@ -1,7 +1,7 @@ """Module that exposes the remote agent as a server, sharing it's capabilities and methods to invoke it""" -import uvicorn import logging +import uvicorn from a2a.server.apps import A2AStarletteApplication from a2a.server.request_handlers import DefaultRequestHandler from a2a.server.tasks import InMemoryTaskStore @@ -10,7 +10,7 @@ logger = logging.getLogger(__name__) -agent_url = "http://localhost:8090/" +AGENT_URL = "http://localhost:8090/" def start_remote_agent(): """Start the remote agent and expose it's capabilities""" @@ -31,7 +31,7 @@ def start_remote_agent(): public_agent_card = AgentCard( name="Search agent", description="Agent that can search the internet to answer queries.", - url=agent_url, + url=AGENT_URL, version="0.0.1", skills=[agent_skill], defaultInputModes=['text'], diff --git a/adk_a2a_mcp_integration/remote_agent/agent.py b/adk_a2a_mcp_integration/remote_agent/agent.py index 32c09d2..4a3936c 100644 --- a/adk_a2a_mcp_integration/remote_agent/agent.py +++ b/adk_a2a_mcp_integration/remote_agent/agent.py @@ -19,8 +19,8 @@ name="search_agent", model="gemini-2.0-flash", description="Agent capable of searching internet to find relevant answers to user questions.", - instruction="""You are an friendly and supportive agent. Your job is to try to answer the user question using the search tool. - Always provide accurate and relevant information.""", + instruction="""You are an friendly and supportive agent. Your job is to try to answer + the user question using the search tool. Always provide accurate and relevant information.""", tools=[google_search], ) @@ -40,12 +40,12 @@ async def invoke(self, session_id: str, query: str, user_id: str = None): """Invoke the agent""" try: if not user_id: - user_id = "Default User" - session_instance = await self.runner.session_service.get_session( - session_id=session_id, - user_id=user_id, - app_name=self.agent.name - ) + user_id = "Default User" + session_instance = await self.runner.session_service.get_session( + session_id=session_id, + user_id=user_id, + app_name=self.agent.name + ) if not session_instance: logger.info(f"Creating new session with id: {session_id}") @@ -54,7 +54,7 @@ async def invoke(self, session_id: str, query: str, user_id: str = None): user_id=user_id, app_name=self.agent.name ) - + user_content = types.Content( role="user", parts=[types.Part.from_text(text=query)] ) @@ -66,7 +66,7 @@ async def invoke(self, session_id: str, query: str, user_id: str = None): new_message=user_content ): # We can break when there's final response, - # but for telemetry usage, the loop must complete + # but for telemetry usage, the loop must complete # logger.debug(f"Event: {event}") if event.is_final_response(): if event.content and event.content.parts and event.content.parts[-1].text: diff --git a/adk_a2a_mcp_integration/remote_agent/client.py b/adk_a2a_mcp_integration/remote_agent/client.py index 2645774..03ebda8 100644 --- a/adk_a2a_mcp_integration/remote_agent/client.py +++ b/adk_a2a_mcp_integration/remote_agent/client.py @@ -1,8 +1,8 @@ """Module to test the remote agent exposed via A2A. Mimic's client side implementation""" -import httpx import asyncio import logging from typing import Any +import httpx from uuid import uuid4 from a2a.client import A2ACardResolver, A2AClient from a2a.types import MessageSendParams, SendMessageRequest @@ -22,7 +22,7 @@ async def client(): logger.info(agent_card.model_dump_json(indent=2, exclude_none=True)) logger.info("Initializing A2A Client") - client = A2AClient( + client_instance = A2AClient( httpx_client=httpx_client, agent_card=agent_card ) logger.info('A2A Client initialized.') @@ -31,7 +31,10 @@ async def client(): 'message': { 'role': 'user', 'parts': [ - {'kind': 'text', 'text': 'What is model context protocol? Give a brief description.'} + { + 'kind': 'text', + 'text': 'What is model context protocol? Give a brief description.' + } ], 'messageId': uuid4().hex, }, @@ -41,7 +44,7 @@ async def client(): id=str(uuid4()), params=MessageSendParams(**send_message_payload) ) - response = await client.send_message(request) + response = await client_instance.send_message(request) logger.info(response.model_dump(mode='json', exclude_none=True)) response_dict = response.model_dump(mode='json', exclude_none=True) agent_response_text = "No text content found in response or an error occurred." diff --git a/adk_a2a_mcp_integration/remote_agent/task_manager.py b/adk_a2a_mcp_integration/remote_agent/task_manager.py index dd8280a..627ed9c 100644 --- a/adk_a2a_mcp_integration/remote_agent/task_manager.py +++ b/adk_a2a_mcp_integration/remote_agent/task_manager.py @@ -10,7 +10,7 @@ class BasicSearchAgentExecutor(AgentExecutor): """Agent executor that invokes the agent""" - + def __init__(self): super().__init__() self._agent = BasicSearchAgent() @@ -46,4 +46,4 @@ async def execute(self, request_context: RequestContext, event_queue: EventQueue async def cancel(self, request_context: RequestContext, event_queue: EventQueue) -> None: """To cancel an ongoing agent execution""" - raise Exception('cancel not supported at this moment!') + raise ValueError('cancel not supported at this moment!') diff --git a/adk_a2a_mcp_integration/root_agent/__init__.py b/adk_a2a_mcp_integration/root_agent/__init__.py index 63bd45e..96d00fd 100644 --- a/adk_a2a_mcp_integration/root_agent/__init__.py +++ b/adk_a2a_mcp_integration/root_agent/__init__.py @@ -1 +1,2 @@ -from . import agent \ No newline at end of file +"""Module to start the agent, primarily used by adk web""" +from . import agent diff --git a/adk_a2a_mcp_integration/root_agent/agent.py b/adk_a2a_mcp_integration/root_agent/agent.py index 16bad9b..f99ca7a 100644 --- a/adk_a2a_mcp_integration/root_agent/agent.py +++ b/adk_a2a_mcp_integration/root_agent/agent.py @@ -23,9 +23,10 @@ name="root_agent", model="gemini-2.0-flash", description="Agent to answer questions using tools provided.", - instruction="""You are a helpful agent who can answer user questions about current time and can do calculations. - For any queries that require latest/external information, identify if any remote agents can help with that. - Once you found the relevant agents, use the appropriate tools to get the answer the user query.""", + instruction="""You are a helpful agent who can answer user questions about current time + and can do calculations. For any queries that require latest/external information, + identify if any remote agents can help with that. Once you found the relevant agents, + use the appropriate tools to get the answer the user query.""", tools=[get_current_time, calculate_expression, simple_mcp_tool, diff --git a/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py index 23e097b..cd271fd 100644 --- a/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py +++ b/adk_a2a_mcp_integration/root_agent/remote_agent_helpers.py @@ -1,8 +1,8 @@ """Helper functions to interact with other agents exposed via A2A""" -import httpx import logging from typing import Any from uuid import uuid4 +import httpx from a2a.client import A2ACardResolver, A2AClient from a2a.types import MessageSendParams, SendMessageRequest @@ -10,11 +10,11 @@ remote_agent_cards_cache = [] -remote_agent_url = "http://localhost:8090/" +REMOTE_AGENT_URL = "http://localhost:8090/" async def list_remote_agents() -> list[dict]: """Fetches the capabilities of all available remote agents""" - + if remote_agent_cards_cache: logger.info("Remote agents card cache exists, using cache") return remote_agent_cards_cache @@ -24,13 +24,13 @@ async def list_remote_agents() -> list[dict]: # get only one agent url for now resolver = A2ACardResolver( httpx_client=httpx_client, - base_url=remote_agent_url, + base_url=REMOTE_AGENT_URL, ) logger.info("Attempting to fetch agent card...") agent_card = await resolver.get_agent_card() logger.info('Agent card fetched. Agent card:') logger.info(agent_card.model_dump_json(indent=2, exclude_none=True)) - + remote_agent_cards_cache.append({ "agent_name": agent_card.name, "agent_card": agent_card @@ -43,11 +43,11 @@ async def list_remote_agents() -> list[dict]: async def call_remote_agent(query: str, agent_name: str) -> str: """Call the remote agent with appropriate query""" - + agent_cards = await list_remote_agents() - + agent_card_to_use = None - + for card in agent_cards: if card.get("agent_name") == agent_name: agent_card_to_use = card.get("agent_card") @@ -83,7 +83,7 @@ async def call_remote_agent(query: str, agent_name: str) -> str: except Exception as e: logger.error("Failed to send message to remote agent.") raise RuntimeError("Remote agent call failed.") from e - + agent_response_text = "No text content found in response or an error occurred." try: agent_response_text = response_dict['result']['parts'][0]['text'] diff --git a/adk_a2a_mcp_integration/root_agent/response_manager.py b/adk_a2a_mcp_integration/root_agent/response_manager.py index d40de2d..6a88ff7 100644 --- a/adk_a2a_mcp_integration/root_agent/response_manager.py +++ b/adk_a2a_mcp_integration/root_agent/response_manager.py @@ -85,7 +85,7 @@ async def invoke_agent(self, session_id: str, query: str): "event": event.model_dump(mode='json', exclude_none=True) } except Exception as e: - logger.info(f"Error generating response.") + logger.error(f"Error generating response. {str(e)}") yield { "is_final_response": True, "status": "fail", @@ -93,21 +93,23 @@ async def invoke_agent(self, session_id: str, query: str): "error_message": str(e) } -async def test_agent(): +async def test_agent(): """Utils function to test the agent using response manager""" response_manager = ResponseManager() session_id = str(uuid.uuid4()) first_query = "What are some trending topics in AI?" - # first_response = await response_manager.invoke_agent(session_id=session_id, query=first_query) + # first_response = await response_manager.invoke_agent(session_id=session_id, + # query=first_query) first_response = response_manager.invoke_agent(session_id=session_id, query=first_query) async for response in first_response: logger.info(f"Events: {response}") # logger.info(f"First response: {first_response}") # second_query = "What question did I ask you?" - # second_response = await response_manager.invoke_agent(session_id=session_id, query=second_query) + # second_response = await response_manager.invoke_agent(session_id=session_id, + # query=second_query) # logger.info(f"Second response: {second_response}") # asyncio.run(test_agent()) diff --git a/adk_a2a_mcp_integration/root_agent/tools.py b/adk_a2a_mcp_integration/root_agent/tools.py index a05f88c..32b7fec 100644 --- a/adk_a2a_mcp_integration/root_agent/tools.py +++ b/adk_a2a_mcp_integration/root_agent/tools.py @@ -1,31 +1,32 @@ """Module that defines all the tools used by the root agent""" +# pylint: disable=eval-used import math import datetime from zoneinfo import ZoneInfo def get_current_time(country: str) -> dict: - """Returns the current time in a specified country. - Args: - country (str): The name of the country for which to retrieve the current time. - Returns: - dict: status and result or error msg. - """ - if country.lower() == "india": - tz_identifier = "Asia/Kolkata" - else: - return { - "status": "error", - "error_message": ( - f"Sorry, I don't have timezone information for {country}." - ), - } + """Returns the current time in a specified country. + Args: + country (str): The name of the country for which to retrieve the current time. + Returns: + dict: status and result or error msg. + """ + if country.lower() == "india": + tz_identifier = "Asia/Kolkata" + else: + return { + "status": "error", + "error_message": ( + f"Sorry, I don't have timezone information for {country}." + ), + } - tz = ZoneInfo(tz_identifier) - now = datetime.datetime.now(tz) - report = ( - f'The current time in {country} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}' - ) - return {"status": "success", "result": report} + tz = ZoneInfo(tz_identifier) + now = datetime.datetime.now(tz) + report = ( + f'The current time in {country} is {now.strftime("%Y-%m-%d %H:%M:%S %Z%z")}' + ) + return {"status": "success", "result": report} ALLOWED_FUNCTIONS = { "math": math, @@ -45,34 +46,34 @@ def get_current_time(country: str) -> dict: } def calculate_expression(expression: str) -> dict: - """Evaluates a mathematical expression and returns the result. - Supports basic operators (+, -, *, /, **, %), mathematical functions - and constants (pi, e). Uses a restricted evaluation context for safe execution. + """Evaluates a mathematical expression and returns the result. + Supports basic operators (+, -, *, /, **, %), mathematical functions + and constants (pi, e). Uses a restricted evaluation context for safe execution. - Args: - expression: The mathematical expression to evaluate as a string. - Examples: "2 + 2", "sqrt(16) * 2", "log(100, 10)" - Returns: - On success: {"result": } - On error: {"error": } + Args: + expression: The mathematical expression to evaluate as a string. + Examples: "2 + 2", "sqrt(16) * 2", "log(100, 10)" + Returns: + On success: {"result": } + On error: {"error": } - Notes: - - Use 'x' as the variable (e.g., x**2, not x²) - - Multiplication must be explicitly indicated with * (e.g., 2*x, not 2x) - - Powers are represented with ** (e.g., x**2, not x^2) - """ - try: - result = eval( - expression, - {"__builtins__": {}}, - ALLOWED_FUNCTIONS, - ) - return { - "status": "success", - "result": result - } - except Exception as e: - return { - "status": "error", - "error_message": str(e) - } + Notes: + - Use 'x' as the variable (e.g., x**2, not x²) + - Multiplication must be explicitly indicated with * (e.g., 2*x, not 2x) + - Powers are represented with ** (e.g., x**2, not x^2) + """ + try: + result = eval( + expression, + {"__builtins__": {}}, + ALLOWED_FUNCTIONS, + ) + return { + "status": "success", + "result": result + } + except Exception as e: + return { + "status": "error", + "error_message": str(e) + } From 3ef27890776df4a20317777dab4cda4436b9ccb6 Mon Sep 17 00:00:00 2001 From: Ruthvik-1411 Date: Sat, 2 Aug 2025 23:31:09 +0530 Subject: [PATCH 7/7] fix: linting + minor readme chgs --- adk_a2a_mcp_integration/Readme.md | 1 + adk_a2a_mcp_integration/app.py | 2 +- adk_a2a_mcp_integration/mcp_server/server.py | 2 +- adk_a2a_mcp_integration/remote_agent/client.py | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/adk_a2a_mcp_integration/Readme.md b/adk_a2a_mcp_integration/Readme.md index e02778e..0ae140e 100644 --- a/adk_a2a_mcp_integration/Readme.md +++ b/adk_a2a_mcp_integration/Readme.md @@ -1,6 +1,7 @@ # Boilerplate for Agent interaction with ADK, A2A and MCP. [To UPDATE] +A simple implementation of agent which has access to some custom tools and an MCP server for exploring Arxiv research papers. It also has the ability to interact with remote agents using A2A Protocol. The agents are built using ADK and can be run using `adk web` or custom streamlit UI. ```bash # To start remote agent diff --git a/adk_a2a_mcp_integration/app.py b/adk_a2a_mcp_integration/app.py index 99d381e..3f70c6b 100644 --- a/adk_a2a_mcp_integration/app.py +++ b/adk_a2a_mcp_integration/app.py @@ -44,7 +44,7 @@ # --- Coroutine to handle agent invocation and UI updates --- async def run_agent_and_display(prompt: str): - # Add user message to session state and display it + """Add user message to session state and display it""" st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) diff --git a/adk_a2a_mcp_integration/mcp_server/server.py b/adk_a2a_mcp_integration/mcp_server/server.py index 0fea88a..39367fc 100644 --- a/adk_a2a_mcp_integration/mcp_server/server.py +++ b/adk_a2a_mcp_integration/mcp_server/server.py @@ -155,4 +155,4 @@ def get_paper_md(paper_id: str) -> dict: # return {"status": "error", "error_message": f"Error: {e}"} if __name__ == "__main__": - mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp") + mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp") diff --git a/adk_a2a_mcp_integration/remote_agent/client.py b/adk_a2a_mcp_integration/remote_agent/client.py index 03ebda8..ee6a200 100644 --- a/adk_a2a_mcp_integration/remote_agent/client.py +++ b/adk_a2a_mcp_integration/remote_agent/client.py @@ -2,8 +2,8 @@ import asyncio import logging from typing import Any -import httpx from uuid import uuid4 +import httpx from a2a.client import A2ACardResolver, A2AClient from a2a.types import MessageSendParams, SendMessageRequest