Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions adk_a2a_mcp_integration/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# To use api key
GOOGLE_GENAI_USE_VERTEXAI=False

# Google api key, get this from ai studio
GOOGLE_API_KEY=
19 changes: 19 additions & 0 deletions adk_a2a_mcp_integration/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Boilerplate for Agent interaction with ADK, A2A and MCP.

[To UPDATE]
A simple implementation of agent which has access to some custom tools and an MCP server for exploring Arxiv research papers. It also has the ability to interact with remote agents using A2A Protocol. The agents are built using ADK and can be run using `adk web` or custom streamlit UI.

```bash
# To start remote agent
python -m remote_agent
```

```bash
# To start MCP server
python mcp_server/server.py
```

```bash
# To interact with root agent in UI
streamlit run app.py
```
95 changes: 95 additions & 0 deletions adk_a2a_mcp_integration/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Module that interacts with backend agent using streamlit"""
# pylint: disable=line-too-long,invalid-name
import uuid
import asyncio
import streamlit as st
from root_agent.response_manager import ResponseManager

# --- Page Configuration ---
st.set_page_config(page_title="Agent Response Manager", layout="wide")

# --- Session State Initialization ---
if "response_manager" not in st.session_state:
st.session_state.response_manager = ResponseManager()
if "session_id" not in st.session_state:
st.session_state.session_id = str(uuid.uuid4())
st.toast(f"New session created: {st.session_state.session_id}")
if "messages" not in st.session_state:
st.session_state.messages = []

# --- Sidebar ---
with st.sidebar:
st.title("Settings")
st.markdown(f"**Session ID:**\n`{st.session_state.session_id}`")

if st.button("New Session"):
st.session_state.session_id = str(uuid.uuid4())
st.session_state.messages = []
st.toast(f"New session started: {st.session_state.session_id}")
st.rerun()

st.divider()
diagnostics_enabled = st.checkbox("Enable Diagnostics", value=True)

# --- Main Chat Interface ---
st.title("Agent Chat UI")

# Display chat history from session state
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if diagnostics_enabled and "diagnostics" in message and message["diagnostics"]:
with st.expander("View Diagnostics for this response."):
st.json(message["diagnostics"])

# --- Coroutine to handle agent invocation and UI updates ---
async def run_agent_and_display(prompt: str):
"""Add user message to session state and display it"""
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)

# --- Assistant's turn ---
with st.chat_message("assistant"):
final_response_text = ""
diagnostic_events = []
status_messages = {
"tool_call": "Calling tool... 🛠️",
"tool_response": "Processing tool result... ⚙️",
"finished": "Response generation complete. ✅"
}

status_placeholder = st.empty()
try:
# Collect all events and the final response from the agent
status_placeholder.text("Thinking... 🤔")
async for event in st.session_state.response_manager.invoke_agent(
session_id=st.session_state.session_id, query=prompt
):
status_text = status_messages.get(event.get("status"), f"Processing: {event.get('status')}")
status_placeholder.text(status_text)

diagnostic_events.append(event.get("event"))
if event.get("is_final_response"):
final_response_text = event.get("result", "Sorry, I couldn't generate a response.")

except Exception as e:
st.error(f"An error occurred: {e}")
final_response_text = "Sorry, I ran into an error."

status_placeholder.empty()
st.markdown(final_response_text)

if diagnostics_enabled and diagnostic_events:
with st.expander("View Diagnostics for this response."):
st.json(diagnostic_events)

st.session_state.messages.append({
"role": "assistant",
"content": final_response_text,
"diagnostics": diagnostic_events
})

# --- Handle User Input ---
if user_input := st.chat_input("What are some trending topics in AI?"):
asyncio.run(run_agent_and_display(user_input))
3 changes: 3 additions & 0 deletions adk_a2a_mcp_integration/mcp_server/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fastmcp==2.10.6
arxiv==2.2.0
pymupdf4llm==0.0.27
158 changes: 158 additions & 0 deletions adk_a2a_mcp_integration/mcp_server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Module that implements simple mcp server to query Arxiv research papers collection"""
import os
import logging
import tempfile
import requests
import arxiv
import pymupdf4llm
from fastmcp import FastMCP

logger = logging.getLogger(__name__)

mcp = FastMCP("ArxivExplorer")

@mcp.tool
def search_arxiv(query: str, max_results: int = 5) -> dict:
"""
Searches arXiv for a given query and returns the top papers.
Args:
query: The search keyword or query.
max_results: The maximum number of results to return.
Returns:
A list of dictionaries, where each dictionary represents a paper
and contains its ID, title, summary, authors, and PDF URL.
"""
try:
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)

papers = []
for result in search.results():
logger.info(f"{result.title}")
paper_info = {
'id': result.get_short_id(),
'title': result.title,
'summary': result.summary,
'authors': [author.name for author in result.authors],
'pdf_url': result.pdf_url
}
papers.append(paper_info)

return {
"status": "success",
"result": papers
}
except Exception as e:
return {
"status": "error",
"error_message": str(e)
}

@mcp.tool()
def get_paper_md(paper_id: str) -> dict:
"""
Retrieves the text content of an arXiv paper in Markdown format.
Args:
paper_id: The ID of the paper (e.g., '1706.03762v7').
Returns:
The text content of the paper as a Markdown string.
Returns an error message if any step fails.
"""
try:
search = arxiv.Search(id_list=[paper_id])
paper = next(search.results())
pdf_url = paper.pdf_url
logger.info(f"Found paper: '{paper.title}'")
logger.info(f"Downloading from: {pdf_url}")

except StopIteration:
return {
"status": "error",
"error_message": f"Paper with ID '{paper_id}' not found on arXiv."
}
except Exception as e:
return {
"status": "error",
"error_message": f"Error searching for the paper: {e}"
}

try:
# Download the PDF content
response = requests.get(pdf_url)
response.raise_for_status()
pdf_bytes = response.content
logger.info("PDF downloaded successfully.")

except requests.exceptions.RequestException as e:
return {
"status": "error",
"error_message": f"Error downloading the PDF file, request failure: {e}"
}
except Exception as e:
return {
"status": "error",
"error_message": f"Error downloading the PDF file: {e}"
}

temp_pdf_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
temp_file.write(pdf_bytes)
temp_pdf_path = temp_file.name # Get the path of the temporary file

logger.info(f"PDF content written to temporary file: {temp_pdf_path}")
logger.info("Converting PDF to Markdown...")
# Pass the file path to the conversion function
md_text = pymupdf4llm.to_markdown(temp_pdf_path)

logger.info("Conversion complete.")
if temp_pdf_path and os.path.exists(temp_pdf_path):
os.remove(temp_pdf_path)
logger.info(f"Temporary file {temp_pdf_path} deleted.")
return {"status": "success", "result": md_text}

except Exception as e:
return {
"status": "error",
"error_message": f"Error converting PDF to Markdown: {e}"
}

# To pass the paper directly as media to llm, to be used later
# @mcp.tool()
# def get_paper_raw(paper_id: str) -> dict:
# """
# Retrieves the raw PDF file of an arXiv paper.
# Args:
# paper_id: The ID of the paper (e.g., '1706.03762v7').
# Returns:
# The raw bytes of the PDF file, or None if the paper is not found.
# """
# try:
# # Search for the paper by its ID
# search = arxiv.Search(id_list=[paper_id])
# paper = next(search.results())

# # Download the PDF content
# response = requests.get(paper.pdf_url)
# response.raise_for_status()
# return {
# "status": "success",
# "result":response.content
# }
# except StopIteration:
# return {
# "status": "error",
# "error_message": f"Paper with ID {paper_id} not found on arXiv."
# }
# except requests.exceptions.RequestException as e:
# logger.info(f"Error downloading PDF: {e}")
# return {"status": "error", "error_message": f"Error downloading PDF: {e}"}
# except Exception as e:
# logger.info(f"Error: {e}")
# return {"status": "error", "error_message": f"Error: {e}"}

if __name__ == "__main__":
mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp")
2 changes: 2 additions & 0 deletions adk_a2a_mcp_integration/remote_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""Module to start the agent, primarily used by adk web"""
from . import agent
56 changes: 56 additions & 0 deletions adk_a2a_mcp_integration/remote_agent/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Module that exposes the remote agent as a server, sharing it's capabilities
and methods to invoke it"""
import logging
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from remote_agent.task_manager import BasicSearchAgentExecutor

logger = logging.getLogger(__name__)

AGENT_URL = "http://localhost:8090/"

def start_remote_agent():
"""Start the remote agent and expose it's capabilities"""
agent_skill = AgentSkill(
id="search_agent",
name="Search Agent",
description="""Agent that can get the latest search results from the
internet using google search and gives accurate results""",
input_modes=["text"],
output_modes=["text"],
tags=["search agent", "google search tool", "web search"],
examples=[
"What are the latest news in AI?",
"Explain the key difference between Langchain and Langgraph.",
"Who won the last IPL match?"]
)

public_agent_card = AgentCard(
name="Search agent",
description="Agent that can search the internet to answer queries.",
url=AGENT_URL,
version="0.0.1",
skills=[agent_skill],
defaultInputModes=['text'],
defaultOutputModes=['text'],
capabilities=AgentCapabilities(streaming=True),
supportsAuthenticatedExtendedCard=False,
)

request_handler = DefaultRequestHandler(
agent_executor=BasicSearchAgentExecutor(),
task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
agent_card=public_agent_card,
http_handler=request_handler
)
app = server.build()
logger.info("Uvicorn server starting...")
uvicorn.run(app, host="127.0.0.1", port=8090)

if __name__ == "__main__":
start_remote_agent()
Loading