diff --git a/README.md b/README.md index e59b6f4..a4f5c16 100644 --- a/README.md +++ b/README.md @@ -1,122 +1,204 @@ -> **⚠️ Disclaimer:** This documentation describes a *hypothetical* tool, `mcp.agent` (v1.0.0). The codebase provided is a functional proof-of-concept demonstrating **stateless context**, **Firestore-backed async BQ job tracking**, GCS Signed URLs, and basic environment variable handling. It requires comprehensive testing, security hardening, and feature refinement for production use. +> **⚠️ Disclaimer:** This documentation describes `mcp.agent` (v1.0.0). The codebase provided is a functional proof-of-concept demonstrating **stateless context**, **Firestore-backed asynchronous BigQuery job tracking**, GCS Signed URLs, and basic environment variable handling. It requires comprehensive testing, security hardening, and feature refinement before any consideration for production use. # `mcp.agent`: Simplified & Scalable GCP Integration for MCP Agents (v1.0.0) -`mcp.agent` is a command-line tool designed to significantly ease the integration of common Google Cloud services – specifically **Google Cloud Storage (GCS)** and **BigQuery (BQ)** – into applications using the **Model Context Protocol (MCP)**. +`mcp.agent` is a command-line tool designed to significantly ease the integration of common Google Cloud Platform (GCP) services – specifically **Google Cloud Storage (GCS)** and **BigQuery (BQ)** – into applications using the **Model Context Protocol (MCP)**. -This **v1.0.0** milestone focuses on **scalability and robustness** by: +This **v1.0.0** milestone focuses on enhancing **scalability and robustness** by: -* Adopting a **stateless design** regarding user context (buckets/datasets). Clients must now provide necessary identifiers in each relevant call. -* Persisting **BigQuery job state** in **Google Cloud Firestore** for durability across server instances and restarts. -* Utilizing **GCS Signed URLs** for efficient, scalable large file transfers. -* Implementing an **asynchronous pattern for BigQuery queries** with server-side polling updates stored in Firestore. +* Adopting a **stateless design**. Clients must provide necessary resource identifiers (e.g., bucket name, dataset ID) in each relevant tool call. Context-setting tools are removed. +* Persisting **BigQuery job state** in **Google Cloud Firestore**, enabling job status tracking across server instances and restarts. +* Utilizing **GCS Signed URLs** for efficient and scalable large file transfers directly between the client and GCS. +* Implementing an **asynchronous pattern for BigQuery queries**, with server-side polling of job status and updates to Firestore. -It automates the creation and management of a specialized MCP server, exposing GCS and BQ functionalities as standard MCP tools. +The tool provides an MCP server that exposes GCS and BQ functionalities as standard MCP tools. ## Problem Solved -Integrating cloud services often involves boilerplate. `mcp.agent` simplifies common GCS/BQ tasks by providing pre-built tools, handling server-side auth (ADC), and using scalable cloud patterns (Signed URLs, Firestore), while removing the scalability bottleneck of in-memory connection state. +Integrating cloud services often involves repetitive boilerplate and complex authentication. `mcp.agent` simplifies this by: + +* Providing pre-built, stateless MCP tools for common GCS and BQ operations. +* Handling server-side authentication using Google Cloud's Application Default Credentials (ADC). 
+* Employing scalable cloud patterns like Signed URLs and Firestore-backed job tracking. +* Removing scalability bottlenecks associated with in-memory connection state. ## Core Features (v1.0.0) * 🚀 **Automated Server Launch:** Single command (`mcp-agent`) starts the MCP server. -* 🛠️ **Focused GCP Toolset:** Pre-built tools for GCS (buckets, object listing) and BigQuery (datasets, tables, async queries). **Context tools are removed.** -* 🔗 **GCS Signed URLs:** Scalable large file reads/writes via direct client-GCS interaction. -* ⏳ **Async BigQuery Queries with Firestore State:** Submits BQ queries, tracks status persistently in Firestore (polled by server), allows clients to check status and retrieve paginated results. -* ✅ **Stateless Context:** Server no longer holds per-connection bucket/dataset defaults, improving scalability and simplifying deployment. **Clients must provide identifiers.** +* 🛠️ **Stateless GCP Toolset:** + * **GCS:** List buckets, list objects, generate V4 signed URLs for read/write, write string to object. + * **BigQuery:** List datasets, list tables, get table schema, submit asynchronous queries, get job status, retrieve paginated query results. + * *Context-setting tools (e.g., `gcs_set_context_bucket`) are removed.* +* 🔗 **GCS Signed URLs:** Scalable large file transfers via direct client-GCS interaction. +* ⏳ **Async BigQuery Queries with Firestore State:** + * Submits BQ queries and tracks status persistently in Firestore via `FirestoreBqJobStore`. + * A background poller (`bq_poller.py`) updates job statuses in Firestore. + * Clients can check job status (`bq_get_job_status`) and retrieve paginated results (`bq_get_query_results`). +* ✅ **Stateless Server:** Enhances scalability and simplifies deployment. Clients must provide all resource identifiers. * 🧩 **Standard MCP Interface:** Compatible with any MCP client (`list_tools`, `call_tool`). * 🔒 **Server-Side Authentication:** Uses Application Default Credentials (ADC). -* 🔑 **Environment-Based Config:** Manages optional SSE API key via `.env`/env vars (`python-dotenv`, Secret Manager support). -* 🌐 **Flexible Transports:** Supports `stdio` and `sse`. +* 🔑 **Environment-Based Configuration:** Manages SSE API key via `.env` or environment variables (supports Google Secret Manager). GCP project for Firestore can be set via `GCP_PROJECT` env var. +* 🌐 **Flexible Transports:** Supports `stdio` and `sse` (Server-Sent Events via `aiohttp`). +* 📝 **JSON Logging:** Uses `python-json-logger` for structured logging. ## How it Works (v1.0.0 Technical Overview) -1. **CLI (`cli.py`):** Loads `.env`, parses args, determines API key (checking Secret Manager env var first, then direct env var), performs GCP client pre-flight checks (including Firestore), starts server transport. -2. **MCP Server Core (`server.py`):** Manages MCP connections. **No longer holds user context.** Uses `FirestoreBqJobStore` for BQ job state. Runs background BQ poller task (reading from/writing to Firestore). Routes `call_tool` requests, injecting the job store where needed. -3. **GCP Tool Implementations (`gcp_tools/*.py`):** - * GCS tools (`gcs_get_read_signed_url`, `gcs_get_write_signed_url`, `gcs_list_objects`, etc.) now **require** `bucket_name` argument. They generate Signed URLs or perform actions directly using provided IDs. - * BQ tools (`bq_list_tables`, `bq_get_table_schema`, etc.) now **require** `project_id`/`dataset_id`. - * BQ async pattern: - * `bq_submit_query`: Starts BQ job, saves `job_id`, `location`, etc. 
to Firestore via `FirestoreBqJobStore`, returns job info. - * `bq_get_job_status`: Reads job status *from Firestore* (updated by background poller). If DONE+Success, fetches and returns the *first page* of results directly from BQ. - * `bq_get_query_results`: Fetches subsequent pages of results directly from BQ using `page_token`. - * Blocking GCP SDK calls use `asyncio.to_thread`. Retries are applied via `tenacity`. -4. **BQ Job Management (`job_store.py`):** - * `FirestoreBqJobStore` interacts with Firestore (Datastore mode) to `add`, `get`, `update`, and `query` BQ job status records, using `job_id` as the document ID. Provides durability. +1. **Command-Line Interface (`mcp_agent/cli.py`):** + * Sole entry point for the agent (`mcp-agent` command). + * Loads `.env` variables. Sets up JSON logging. + * Parses arguments (tools, port, host, API key requirement, BQ poll interval). + * Handles API key retrieval (direct env var or Google Secret Manager). + * Performs pre-flight checks for GCP client readiness (GCS, BigQuery, Firestore, Secret Manager if used). + * If BigQuery tools are enabled: + * Initializes `FirestoreBqJobStore` (`mcp_agent/job_store.py`). + * Initializes the BigQuery client. + * Starts the background BigQuery job poller (`mcp_agent/bq_poller.py`) as an asyncio task. + * Calls core server functions from `mcp_agent/server.py` to start listening on `stdio` or `sse`. + +2. **MCP Server Core (`mcp_agent/server.py`):** + * `run_stdio_server`: Handles MCP communication over standard input/output. Reads JSON messages, calls `dispatch_tool`, writes JSON responses. + * `run_sse_server`: Uses `aiohttp` to run an HTTP server for Server-Sent Events. + * Provides an `/mcp` endpoint for POST requests. + * Includes middleware for API key authentication if configured. + * Parses MCP JSON message from request, calls `dispatch_tool`, streams MCP `Content` objects back as SSE events. + * `dispatch_tool`: Central function that receives parsed MCP messages. + * Looks up the tool function in `ALL_TOOLS_MAP` (defined in `mcp_agent/gcp_tools/__init__.py`). + * Calls the appropriate tool function, passing arguments, `conn_id`, and the `FirestoreBqJobStore` instance (if BQ tools are enabled). + +3. **GCP Tool Implementations (`mcp_agent/gcp_tools/`):** + * `__init__.py`: Defines `ALL_TOOLS_MAP`, mapping tool string names to their respective asynchronous function implementations in `storage.py` and `bigquery.py`. (Tool schemas for advertisement are TBD). + * `storage.py` (GCS Tools): + * Implements `gcs_list_buckets`, `gcs_list_objects`, `gcs_get_read_signed_url`, `gcs_get_write_signed_url`, `gcs_write_string_object`. + * Uses `google-cloud-storage` client, with blocking calls wrapped in `asyncio.to_thread`. + * Signed URL functions generate GCS V4 signed URLs. + * `bigquery.py` (BigQuery Tools): + * Implements `bq_list_datasets`, `bq_list_tables`, `bq_get_table_schema`, `bq_submit_query`, `bq_get_job_status`, `bq_get_query_results`. + * Uses `google-cloud-bigquery` client, with blocking calls wrapped in `asyncio.to_thread`. + * `bq_submit_query`: Creates a job in BQ and records its initial state in Firestore via `FirestoreBqJobStore`. + * `bq_get_job_status`: Reads job status from Firestore. If the job is DONE and successful, it can also return the first page of results. + * `bq_get_query_results`: Fetches subsequent pages of results for a completed job. + * Both modules use helper functions from `mcp_agent/utils.py` for response formatting and GCP error handling (including retries via `tenacity`). 
+ +4. **BigQuery Job Store (`mcp_agent/job_store.py`):** + * Defines `BqJobInfo` dataclass to represent job details. + * `FirestoreBqJobStore` class: + * Uses `google-cloud-firestore` async client. + * Methods: `add_job`, `get_job`, `update_job_status`, `query_pending_jobs`. + * Handles serialization/deserialization of `BqJobInfo` to/from Firestore. + +5. **BigQuery Poller (`mcp_agent/bq_poller.py`):** + * `run_bq_job_poller` function runs as a background asyncio task. + * Periodically calls `firestore_job_store.query_pending_jobs()`. + * For each pending job, calls the BigQuery API to get its current status. + * Updates the job's record in Firestore via `firestore_job_store.update_job_status()`. ## Prerequisites -1. Python 3.9+ -2. GCP Project (Billing Enabled) -3. **Enabled APIs:** - * Cloud Storage API - * BigQuery API - * **Firestore API** (ensure Firestore database is created in Datastore mode in your project) - * **Secret Manager API** (if using secret for API key) -4. **Authentication (ADC):** Environment running `mcp-agent` needs ADC configured (`gcloud auth application-default login` or Service Account). -5. **IAM Permissions:** The Service Account running `mcp.agent` needs roles like: - * GCS roles (e.g., `roles/storage.objectViewer`, `roles/storage.objectCreator`) - * BigQuery roles (e.g., `roles/bigquery.jobUser`, `roles/bigquery.dataViewer`) - * **Firestore roles** (e.g., `roles/datastore.user`) - * `roles/secretmanager.secretAccessor` (if using Secret Manager for API key) - * `roles/iam.serviceAccountTokenCreator` on *itself* (needed for generating Signed URLs) -6. MCP Client Library (if not using ADK) -7. ADK Setup (Optional) +1. **Python:** Version 3.9+ +2. **GCP Project:** Billing enabled. + * **Enabled APIs:** Cloud Storage, BigQuery, **Firestore API** (ensure Firestore DB is created, preferably in Datastore mode), Secret Manager API (if used for API key). +3. **Authentication (ADC):** Configured via `gcloud auth application-default login` or service account. +4. **IAM Permissions for the agent's service account:** + * GCS: `roles/storage.objectViewer`, `roles/storage.objectCreator`. `roles/iam.serviceAccountTokenCreator` (on the SA itself for signed URLs). + * BigQuery: `roles/bigquery.jobUser`, `roles/bigquery.dataViewer`. + * Firestore: `roles/datastore.user`. + * Secret Manager: `roles/secretmanager.secretAccessor` (if used). +5. MCP Client library/tool. ## Installation -1. **Python Dependencies:** +1. **Install Python Dependencies:** ```bash - pip install model-context-protocol google-cloud-storage google-cloud-bigquery python-dotenv python-json-logger google-cloud-secret-manager tenacity google-cloud-firestore + pip install model-context-protocol google-cloud-storage google-cloud-bigquery google-cloud-firestore google-cloud-secret-manager python-dotenv python-json-logger tenacity aiohttp aiohttp-sse ``` -2. **Install `mcp.agent` Tool:** - *(Assuming source code)* +2. **Install `mcp.agent` (from source):** ```bash - cd path/to/mcp_agent_source && pip install . + cd path/to/mcp_agent_source + pip install . ``` ## Usage ### 1. Running the `mcp.agent` Server -*(Command structure remains the same, but behavior is now stateless regarding user context)* - +**Example (SSE on port 8080, API key required, custom BQ poll interval):** ```bash -# Example: Stateless server via SSE on port 8080 -mcp-agent --tools storage,bigquery --port 8080 --require-api-key -Use code with caution. 
-Markdown -(Remember to set MCP_AGENT_API_KEY_SECRET_NAME or MCP_AGENT_API_KEY in .env or environment if using --require-api-key) -➡️ Note connection details from server output. -2. Connecting Clients -(Connection setup is the same, client interaction logic changes due to statelessness) -3. Agent Interaction Logic (v1.0.0 - Stateless Pattern) -Instruct your agent (LLM or otherwise) to: -✅ Check Status: Always parse JSON response, check "status". -🔑 Provide Identifiers: Always include bucket_name, project_id, dataset_id arguments when calling tools like gcs_list_objects, gcs_get_read_signed_url, bq_list_tables, bq_get_table_schema, etc. The server no longer remembers defaults. -🔗 GCS Signed URLs: Use gcs_get_read_signed_url / gcs_get_write_signed_url. Tell the user/app to perform the HTTP GET/PUT on the "signed_url". -⏳ BQ Async Polling: -bq_submit_query -> Get job_id/location. Client must store these. -bq_get_job_status (repeat) -> Check "state". -If "state" is "DONE" & "status" is "success", process first page data ("rows", "schema") from this status response. Check "next_page_token". -If "next_page_token", call bq_get_query_results with token for page 2+. Repeat. -If "state" is "ERROR", report error. -📢 Report Errors: Use the "message" field from error responses. -Tool Reference (v1.0.0 Changes) -REMOVED: gcs_set_context_bucket, gcs_clear_context_bucket, bq_set_context_dataset, bq_clear_context_dataset. -REQUIRED ARGS: bucket_name, project_id, dataset_id are now mandatory for most GCS/BQ tools that operate on specific resources. -GCS: gcs_list_buckets, gcs_list_objects, gcs_get_read_signed_url, gcs_get_write_signed_url, gcs_write_string_object. -BQ: bq_list_datasets, bq_list_tables, bq_get_table_schema, bq_submit_query, bq_get_job_status, bq_get_query_results. -(See source code gcp_tools/__init__.py for full schemas). -⚠️ Limitations (v1.0.0 Highlights) -Stateless Context: Simplifies scaling but places burden on the client to manage context and provide identifiers on every relevant call. -Client Complexity: Async BQ requires client polling. Signed URLs require client HTTP handling. -Firestore Dependency: Requires Firestore setup and appropriate IAM permissions. BQ job state is now durable but relies on another GCP service. Potential cost implications for high job volume. -Narrow Scope: Only GCS and BQ. -Basic Functionality: Omits advanced GCP features. -ADC Auth Only: No user impersonation. -Scalability: Statelessness improves scalability, but requires load balancing and orchestration infrastructure. Background poller might become a bottleneck at extreme scale (consider Cloud Tasks/Functions). -Minimal Security: Relies on ADC permissions, network security, transport security (HTTPS), and optional basic API key. Not fully production hardened. -Critical: Consult the detailed Limitations.md document (updated for v1.0.0) for a comprehensive understanding before using this PoC in sensitive environments. -License -(Example: Apache License 2.0) +export GCP_PROJECT="your-gcp-project-id" # For FirestoreBqJobStore if not default +export MCP_AGENT_API_KEY="your-secret-api-key" # Or use MCP_AGENT_API_KEY_SECRET_NAME + +mcp-agent --tools storage,bigquery --port 8080 --require-api-key --bq-poll-interval 30 +``` +* Set `MCP_AGENT_API_KEY_SECRET_NAME` (full Secret Manager secret version path) or `MCP_AGENT_API_KEY` if using `--require-api-key`. +* The `GCP_PROJECT` environment variable can specify the project for Firestore if it differs from the ADC default. 
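+
+**Example (stdio transport):** For local development, or when an MCP client launches the agent as a subprocess, the same command can run over stdio. This is a minimal sketch based on the CLI flags above; stdio is selected via the `--port` value, and `--require-api-key` is ignored in this mode:
+
+```bash
+mcp-agent --tools storage,bigquery --port stdio
+```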
+ +➡️ *Note connection details from server output.* + +### 2. Agent Interaction Logic (v1.0.0 - Stateless Pattern) + +* ✅ **Check Status:** Always parse the JSON response, check the `"status"` field. +* 🔑 **Provide Identifiers:** Always include `bucket_name`, `object_name` (for GCS) or `project_id`, `dataset_id`, `table_id`, `job_id`, `location` (for BQ) as required by each tool. +* 🔗 **GCS Signed URLs:** Use `gcs_get_read_signed_url` / `gcs_get_write_signed_url`. The client performs the HTTP GET/PUT on the returned `"signed_url"`. +* ⏳ **BQ Async Polling Workflow:** + 1. Call `bq_submit_query` (args: `query`, optional `project_id`). Get `job_id`, `location`. + 2. Periodically call `bq_get_job_status` (args: `job_id`, `location`). Check `"state"`. + 3. If `"state"` is `"DONE"` & response `"status"` is `"success"`: + * Process results from `bq_get_job_status` response (first page: `"rows"`, `"schema"`). + * Check for `"next_page_token"`. + 4. If `"next_page_token"`, call `bq_get_query_results` (args: `job_id`, `location`, `page_token`) for subsequent pages. + 5. If BQ job state is `"ERROR"` or tool call status is error, use `"message"` and `"error_result"` from response. +* 📢 **Report Errors:** Use the `"message"` (and potentially `"data.error_result"`) from error responses. + +## Tool Reference (v1.0.0 Implemented Tools) + +This list reflects the implemented stateless tools. (Context-setting tools are REMOVED). For detailed arguments, refer to the source or future schema definitions. + +**GCS Tools (`mcp_agent/gcp_tools/storage.py`):** +* `gcs_list_buckets`: Lists GCS buckets. + * Args: `project_id` (optional). +* `gcs_list_objects`: Lists objects in a bucket. + * Args: `bucket_name` (required), `prefix` (optional), `delimiter` (optional). +* `gcs_get_read_signed_url`: Generates a V4 signed URL for reading an object. + * Args: `bucket_name` (required), `object_name` (required), `expiration_minutes` (optional, default 60). +* `gcs_get_write_signed_url`: Generates a V4 signed URL for writing an object. + * Args: `bucket_name` (required), `object_name` (required), `expiration_minutes` (optional, default 60), `content_type` (optional), `headers` (optional dict). +* `gcs_write_string_object`: Writes a string directly to a GCS object. + * Args: `bucket_name` (required), `object_name` (required), `content` (string, required), `content_type` (optional). + +**BigQuery Tools (`mcp_agent/gcp_tools/bigquery.py`):** +* `bq_list_datasets`: Lists datasets in a project. + * Args: `project_id` (optional, defaults to client's project). +* `bq_list_tables`: Lists tables in a dataset. + * Args: `project_id` (required), `dataset_id` (required). +* `bq_get_table_schema`: Gets the schema of a table. + * Args: `project_id` (required), `dataset_id` (required), `table_id` (required). +* `bq_submit_query`: Submits a query for asynchronous execution. + * Args: `query` (SQL string, required), `project_id` (optional, for job's billing project), `default_dataset_project_id` (optional), `default_dataset_id` (optional for unqualified table names in query). + * Returns: `job_id`, `location`, initial `state`. +* `bq_get_job_status`: Checks a job's status (from Firestore). + * Args: `job_id` (required), `location` (optional, but recommended). + * Returns: Job status details. If DONE & success, may include first page of results. +* `bq_get_query_results`: Fetches paginated results for a completed job. + * Args: `job_id` (required), `page_token` (required), `location` (optional but recommended), `max_results` (optional). 
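+
+**Example client workflows (illustrative):** The sketches below show the interaction patterns described above. They are illustrative only: `call_tool` stands in for whatever MCP client invocation your application uses and is assumed to return the tool's JSON payload as a dict; confirm the exact nesting of fields under `"data"` against the actual tool responses.
+
+For signed URLs, the client talks to GCS directly over HTTP. For example, uploading via a URL obtained from `gcs_get_write_signed_url` (the `Content-Type` header must match the `content_type` supplied when the URL was generated, if one was supplied):
+
+```bash
+curl -X PUT -H "Content-Type: text/csv" --upload-file report.csv "<signed_url>"
+```
+
+A client-side sketch of the asynchronous BigQuery workflow (submit, poll, paginate):
+
+```python
+import time
+
+def run_query_and_fetch_all(call_tool, sql: str):
+    """Illustrative helper for the async BQ pattern (not part of mcp.agent)."""
+    # 1. Submit the query; the server records the job in Firestore.
+    submitted = call_tool("bq_submit_query", {"query": sql})
+    if submitted.get("status") != "success":
+        raise RuntimeError(submitted.get("message", "bq_submit_query failed"))
+    job_id = submitted["data"]["job_id"]
+    location = submitted["data"].get("location")
+
+    # 2. Poll bq_get_job_status until the job reaches a terminal state.
+    while True:
+        status = call_tool("bq_get_job_status", {"job_id": job_id, "location": location})
+        state = status.get("data", {}).get("state")
+        if state == "DONE" and status.get("status") == "success":
+            break
+        if state == "ERROR" or status.get("status") == "error":
+            raise RuntimeError(status.get("message", "BigQuery job failed"))
+        time.sleep(5)  # client-chosen poll interval
+
+    # 3. The first page of rows arrives with the final status response.
+    rows = list(status["data"].get("rows", []))
+    page_token = status["data"].get("next_page_token")
+
+    # 4. Fetch any remaining pages with bq_get_query_results.
+    while page_token:
+        page = call_tool(
+            "bq_get_query_results",
+            {"job_id": job_id, "location": location, "page_token": page_token},
+        )
+        rows.extend(page["data"].get("rows", []))
+        page_token = page["data"].get("next_page_token")
+
+    return rows
+```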
+ +*(Refer to source code in `mcp_agent/gcp_tools/*.py` for exact argument names and behavior. Schemas for MCP advertisement TBD.)* + +## ⚠️ Limitations (v1.0.0 Highlights) + +* **Stateless Context Burden:** Clients must manage and send all identifiers. +* **Increased Client Complexity:** Async BQ polling and Signed URL handling require client-side logic. +* **Firestore Dependency & Cost:** Requires Firestore setup and may incur costs. +* **Narrow Service Scope:** Only selected GCS and BigQuery operations. +* **Basic Cloud Functionality:** Omits many advanced GCP features. +* **ADC Auth Only:** No user impersonation. +* **Scalability Considerations:** Production deployments need load balancing. BQ poller might be a bottleneck at extreme scale (consider Cloud Tasks/Functions). +* **Minimal Security Hardening:** PoC not fully hardened for production. + +**Critical Note:** `mcp.agent` v1.0.0 is a proof-of-concept. Consult `docs/LIMITATIONS.md` for a comprehensive understanding before use in sensitive environments. + +## Contributing + +This project is currently a proof-of-concept. While formal contributions are not being solicited at this stage, feedback and suggestions are welcome via issues on the project's repository (if applicable). + +## License + +(Example: Apache License 2.0 - Please replace with your chosen license) diff --git a/mcp_agent/bq_poller.py b/mcp_agent/bq_poller.py new file mode 100644 index 0000000..5d5db60 --- /dev/null +++ b/mcp_agent/bq_poller.py @@ -0,0 +1,134 @@ +import asyncio +import logging +from typing import Optional + +from google.cloud import bigquery +from google.api_core import exceptions as google_exceptions + +try: + from .job_store import FirestoreBqJobStore, BqJobInfo + from .utils import retry_on_gcp_transient_error # For direct BQ calls if needed +except ImportError: + # This fallback is primarily for scenarios where the module might be loaded in isolation + # or if there's a temporary PYTHONPATH issue during development. + # In a packaged application, this should ideally not be hit. + logging.critical("Failed to import job_store or utils in bq_poller.py. Poller may not function.") + # Define dummy classes/decorators if needed for the file to load without error, + # though functionality will be broken. + class BqJobInfo: pass + class FirestoreBqJobStore: pass + def retry_on_gcp_transient_error(func): return func + + +logger = logging.getLogger("mcp_agent.bq_poller") + +# Define a synchronous helper for get_job to apply tenacity retry +@retry_on_gcp_transient_error +def _get_bq_job_sync(client: bigquery.Client, job_id: str, location: Optional[str]) -> bigquery.QueryJob: + """Synchronous helper to fetch a BigQuery job, wrapped with retry.""" + logger.debug(f"Polling BQ API for job {job_id} in location {location or 'default'}") + return client.get_job(job_id, location=location) + + +async def run_bq_job_poller( + firestore_job_store: FirestoreBqJobStore, + bq_client: bigquery.Client, + poll_interval_seconds: int = 60, # Default poll interval 1 minute + pending_job_query_limit: int = 50 # Max jobs to fetch from Firestore per poll cycle +): + """ + Periodically queries Firestore for pending BigQuery jobs, + checks their status against the BigQuery API, and updates Firestore. + """ + logger.info(f"Starting BigQuery Job Poller. 
Poll interval: {poll_interval_seconds}s, Firestore query limit: {pending_job_query_limit} jobs.") + + while True: + try: + logger.debug("Polling for pending BigQuery jobs...") + # Query Firestore for jobs that are not in a terminal state + # older_than_minutes can be 0 to get all pending jobs, or a value to pick up potentially stuck ones. + pending_jobs = await firestore_job_store.query_pending_jobs(older_than_minutes=0, limit=pending_job_query_limit) + + if not pending_jobs: + logger.debug("No pending BQ jobs found in Firestore to poll.") + else: + logger.info(f"Found {len(pending_jobs)} pending BQ job(s) to check.") + + for job_info in pending_jobs: + try: + logger.debug(f"Checking status of BQ job {job_info.job_id} (Firestore status: {job_info.status})") + + # Get current job status from BigQuery API + bq_job: Optional[bigquery.QueryJob] = await asyncio.to_thread( + _get_bq_job_sync, bq_client, job_info.job_id, job_info.location + ) + + if not bq_job: # Should not happen if get_job_sync is robust, but as a safeguard + logger.warning(f"BQ job {job_info.job_id} not found via API, though pending in Firestore. Marking as ERROR.") + await firestore_job_store.update_job_status( + job_id=job_info.job_id, + status="ERROR", + error_result={"error_type": "PollingError", "message": "Job not found via BQ API during polling."} + ) + continue + + current_bq_status = bq_job.state + logger.debug(f"BQ API status for job {job_info.job_id}: {current_bq_status}") + + if current_bq_status == job_info.status and current_bq_status not in ["DONE", "ERROR"]: + # Status hasn't changed, but ensure updated_time is touched in Firestore to prevent + # it from being picked up by older_than_minutes filter if it's actively being polled. + # However, only update if it's truly PENDING/RUNNING. If it's DONE/ERROR in BQ but not FS, + # the below conditions will handle it. + # For now, let's only update if status truly changes to avoid too many writes. + # If a job is genuinely stuck in PENDING/RUNNING in BQ, it will keep being polled. + logger.debug(f"Job {job_info.job_id} status ({current_bq_status}) unchanged from Firestore. No update unless terminal.") + # Optionally, one could update `updated_time` here to show polling activity. + # await firestore_job_store.update_job_status(job_id=job_info.job_id, status=current_bq_status) + + + if current_bq_status == "DONE": + error_info = None + if bq_job.error_result: + logger.warning(f"BQ job {job_info.job_id} completed with errors: {bq_job.error_result}") + error_info = bq_job.error_result # This is already a dict + else: + logger.info(f"BQ job {job_info.job_id} completed successfully.") + + # TODO: Optionally, gather some results summary here (schema, total rows) + # query_details_update = {"schema": bq_job.schema, "total_rows": bq_job.total_rows} + # For now, just status and error. + await firestore_job_store.update_job_status( + job_id=job_info.job_id, + status="DONE", + error_result=error_info + # query_details=query_details_update + ) + elif current_bq_status != job_info.status: # Status changed, but not to DONE (e.g., PENDING -> RUNNING) + logger.info(f"BQ job {job_info.job_id} status changed from {job_info.status} to {current_bq_status}. Updating Firestore.") + await firestore_job_store.update_job_status(job_id=job_info.job_id, status=current_bq_status) + + # If BQ status is ERROR (which is not DONE), it would be caught by bq_job.error_result + # and handled by the current_bq_status == "DONE" block if BQ API reports "DONE" for errored jobs. 
+ # BigQuery API usually sets state to "DONE" even if there are errors (job.error_result is populated). + # If BQ API can return a distinct "ERROR" state, that needs specific handling. + # Assuming error_result being populated means it's effectively an error state. + + except google_exceptions.NotFound: + logger.warning(f"BQ job {job_info.job_id} not found via BQ API during polling. Marking as ERROR in Firestore.") + await firestore_job_store.update_job_status( + job_id=job_info.job_id, + status="ERROR", + error_result={"error_type": "NotFound", "message": "Job not found in BigQuery during polling cycle."} + ) + except Exception as e: + logger.error(f"Error processing job {job_info.job_id} in BQ poller: {e}", exc_info=True) + # Optionally, update Firestore with a poller-specific error, or just let it retry. + # For now, we log and continue to the next job. The job will be picked up again. + + except Exception as e: + logger.error(f"BigQuery Job Poller encountered an unexpected error in its main loop: {e}", exc_info=True) + # Avoid exiting the poller; wait before retrying the whole loop. + + logger.debug(f"BQ poller cycle complete. Waiting for {poll_interval_seconds} seconds.") + await asyncio.sleep(poll_interval_seconds) diff --git a/mcp_agent/cli.py b/mcp_agent/cli.py index d45c0bc..0f5968e 100644 --- a/mcp_agent/cli.py +++ b/mcp_agent/cli.py @@ -2,30 +2,50 @@ import asyncio import logging import sys -from typing import Set +import os +from typing import Set, Optional -# Ensure utils is importable if running cli directly (might need adjustments based on execution context) +from dotenv import load_dotenv +from pythonjsonlogger import jsonlogger + +# Attempt relative import first for server functions try: - from .server import run_stdio_server, run_sse_server + from .server import run_stdio_server, run_sse_server # These will be implemented in server.py except ImportError: - # Handle case where script is run directly, adjust path if necessary - # This is a common pattern but might need refinement based on install/run method - import os + # Fallback running script directly sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from mcp_agent.server import run_stdio_server, run_sse_server +# Imports for GCP clients and utilities +try: + from .gcp_tools.storage import get_storage_client # Corrected import for GCS client + from .gcp_tools.bigquery import get_bq_client # Corrected import for BQ client + from .utils import get_secret_manager_client, fetch_secret + from .job_store import FirestoreBqJobStore # Now implemented + from .bq_poller import run_bq_job_poller # For the BQ background poller +except ImportError as e: + # This helps in debugging if the script is run in a weird environment or packaging is off + print(f"Critical Import Error: {e}. 
Please ensure the package is installed correctly or PYTHONPATH is set.", file=sys.stderr) + sys.exit(1) + -# Configure root logger -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - stream=sys.stderr, # Log to stderr so stdout/stdin can be used for MCP -) -# Get logger for this module -logger = logging.getLogger(__name__) +# --- Configure Logging JSON Formatter --- +# Using the setup from the original server.py for consistency +root_logger = logging.getLogger() +logHandler = logging.StreamHandler(sys.stderr) # Log to stderr +formatter = jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(name)s %(module)s %(funcName)s %(lineno)d %(message)s') +logHandler.setFormatter(formatter) +root_logger.handlers.clear() +root_logger.addHandler(logHandler) +root_logger.setLevel(logging.INFO) # Default to INFO +logger = logging.getLogger("mcp_agent.cli") # Logger for this module +# --- End Logging Setup --- def parse_args(): - parser = argparse.ArgumentParser(description="Run the MCP Agent server for GCS/BigQuery.") + parser = argparse.ArgumentParser( + description="Run the MCP Agent server for GCS/BigQuery (v1.0.0 Stateless with Firestore BQ Jobs).", + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) parser.add_argument( "--tools", type=str, @@ -42,95 +62,251 @@ def parse_args(): "--host", type=str, default="127.0.0.1", - help="Host address to bind to for SSE server (default: 127.0.0.1). Use 0.0.0.0 for network access.", + help="[SSE Only] Host address to bind to for SSE server (default: 127.0.0.1). Use 0.0.0.0 for network access.", ) parser.add_argument( "--require-api-key", - type=str, - default=None, - metavar="API_KEY", - help="[SSE Only] Require clients to send 'Authorization: Bearer ' header.", + action='store_true', # Changed to action='store_true' from server.py + help="[SSE Only] Enable API key authentication. Reads MCP_AGENT_API_KEY_SECRET_NAME (for Secret Manager full path) then MCP_AGENT_API_KEY (direct key).", ) parser.add_argument( "--debug", action="store_true", - help="Enable debug logging for all loggers." + help="Enable verbose debug logging for mcp_agent and reduce verbosity of GCP libraries." + ) + parser.add_argument( + "--bq-poll-interval", + type=int, + default=60, + help="[BigQuery Only] Interval in seconds for polling BQ job statuses." 
) - return parser.parse_args() def main(): + # Load .env file if it exists + dotenv_path = load_dotenv() + if dotenv_path: + logger.info("Loaded environment variables from .env file.", extra={'dotenv_path': str(dotenv_path)}) + else: + logger.info("No .env file found or loaded.") + args = parse_args() - # Set logging level for all loggers if debug is enabled + # --- Setup Logging Level --- if args.debug: - logging.getLogger().setLevel(logging.DEBUG) - # Example: Set google client libs to INFO to reduce verbosity unless needed + logging.getLogger("mcp_agent").setLevel(logging.DEBUG) + # Reduce verbosity of noisy GCP libraries unless specifically debugging them logging.getLogger("google.cloud").setLevel(logging.INFO) logging.getLogger("google.api_core").setLevel(logging.INFO) logging.getLogger("google.auth").setLevel(logging.INFO) + logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) logger.debug("Debug logging enabled for mcp_agent.") + else: + # Set default log levels for external libraries if not in debug mode + logging.getLogger("google.cloud").setLevel(logging.WARNING) + logging.getLogger("google.api_core").setLevel(logging.WARNING) + logging.getLogger("google.auth").setLevel(logging.WARNING) + logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) + logging.getLogger("mcp").setLevel(logging.INFO) # Assuming 'mcp' is the model-context-protocol library logger + # --- Validate Tools --- + try: + enabled_tools: Set[str] = set(t.strip().lower() for t in args.tools.split(',') if t.strip()) + except Exception: + logger.critical("Invalid format for --tools argument.") + sys.exit(1) - enabled_tools: Set[str] = set(t.strip().lower() for t in args.tools.split(',') if t.strip()) valid_tools = {"storage", "bigquery"} - - # Validate tools invalid_tools = enabled_tools - valid_tools if invalid_tools: - logger.error(f"Invalid tool(s) specified: {', '.join(invalid_tools)}. Allowed tools: {', '.join(valid_tools)}") + logger.critical(f"Invalid tool(s) specified: {', '.join(invalid_tools)}. Allowed tools: {', '.join(valid_tools)}") sys.exit(1) if not enabled_tools: - logger.error("No tools specified. Use --tools storage,bigquery or similar.") + logger.critical("No tools specified. 
Use --tools storage,bigquery or similar.") sys.exit(1) - logger.info(f"Configuring mcp-agent server...") - logger.info(f"Enabled tools: {enabled_tools}") + logger.info(f"Configuring mcp-agent server v1.0.0", extra={"enabled_tools": list(enabled_tools), "args": vars(args)}) - if args.port.lower() == "stdio": - if args.require_api_key: - logger.warning("--require-api-key is ignored for stdio mode.") - logger.info("Mode: STDIO") - try: - # Ensure GCP clients can initialize before entering async loop potentially - from .gcp_tools.storage import get_storage_client - from .gcp_tools.bigquery import get_bq_client - if "storage" in enabled_tools: get_storage_client() - if "bigquery" in enabled_tools: get_bq_client() - # Run the server - asyncio.run(run_stdio_server(enabled_tools)) - except KeyboardInterrupt: - logger.info("Stdio server interrupted by user.") - except Exception as e: - logger.critical(f"Failed to start or run stdio server: {e}", exc_info=args.debug) + # --- Determine API Key (SSE Only) --- + api_key_to_use: Optional[str] = None + if args.port.lower() != "stdio" and args.require_api_key: + secret_name_env_var = 'MCP_AGENT_API_KEY_SECRET_NAME' + direct_key_env_var = 'MCP_AGENT_API_KEY' + + secret_name_val = os.getenv(secret_name_env_var) + direct_key_val = os.getenv(direct_key_env_var) + secret_source = "None" + + if secret_name_val: + logger.info(f"Attempting to fetch API key from Secret Manager using secret path in {secret_name_env_var}.", extra={"secret_name": secret_name_val}) + try: + api_key_to_use = fetch_secret(secret_name_val) # fetch_secret uses retry + if api_key_to_use: + secret_source = f"Secret Manager ({secret_name_val})" + logger.info("Successfully fetched API key from Secret Manager.") + else: + # fetch_secret already logs errors, this is a critical failure for startup + logger.critical(f"Failed to fetch API key from Secret Manager: {secret_name_val}. Value was empty or fetch failed.") + sys.exit(1) + except Exception as sm_err: # Broad catch, fetch_secret should handle specifics + logger.critical(f"FATAL: Error during Secret Manager access for {secret_name_val}: {sm_err}", exc_info=args.debug) + sys.exit(1) + elif direct_key_val: + logger.info(f"Using API key from environment variable {direct_key_env_var}.") + api_key_to_use = direct_key_val + secret_source = f"Environment Variable ({direct_key_env_var})" + else: + logger.critical(f"FATAL: --require-api-key is set, but neither {secret_name_env_var} nor {direct_key_env_var} environment variables are set with a value.") + sys.exit(1) + + if not api_key_to_use: # Double check if key is actually obtained + logger.critical("FATAL: API key required but could not be obtained.") sys.exit(1) - else: + logger.info(f"API Key Authentication: Enabled for SSE. 
Source: {secret_source}") + + elif args.port.lower() != "stdio": + logger.info("API Key Authentication: Disabled for SSE (flag --require-api-key not set).") + + + # --- Pre-flight Check for GCP Clients & Firestore --- + try: + logger.info("Performing pre-flight GCP client initialization checks...") + # Using a list of functions to call for checks + checks_to_perform = [] + + if "storage" in enabled_tools: + checks_to_perform.append(get_storage_client) + + if "bigquery" in enabled_tools: + checks_to_perform.append(get_bq_client) # BQ client itself + # Firestore Job Store check - instantiate and try to init client + try: + firestore_job_store_instance = FirestoreBqJobStore(project=os.getenv("GCP_PROJECT")) # Pass project if available + # Add a dedicated init check method to the store if needed, e.g., store.ensure_client_initialized() + # For now, instantiating it is part of the check. A specific check can be added to its methods. + # We will use this instance later. + checks_to_perform.append(firestore_job_store_instance.ensure_client_initialized) + except Exception as fjse: + logger.critical(f"Failed to instantiate FirestoreBqJobStore during pre-flight: {fjse}", exc_info=args.debug) + sys.exit(1) + + + if args.port.lower() != "stdio" and args.require_api_key and os.getenv('MCP_AGENT_API_KEY_SECRET_NAME'): + checks_to_perform.append(get_secret_manager_client) + + for check_func in checks_to_perform: + # If the check function itself is async, we'd need to run it in an event loop. + # For now, assuming these are synchronous client getters/initializers. + if asyncio.iscoroutinefunction(check_func): + # This is tricky at this stage of startup if the main loop isn't running. + # For simplicity, ensure client getters are sync or handle async init carefully. + # For now, we assume they are synchronous or handle their own loop if necessary. + logger.debug(f"Running async pre-flight check: {check_func.__name__}") + # Not ideal to run a new loop for each, but for startup: + # Or, collect them and run all async checks together if possible. + # This part needs careful handling if pre-flight checks are async. + # For now, assuming sync client getters like google-cloud-python typically provides. 
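+                # NOTE: if check_func is a coroutine function, calling it here only creates a
+                # coroutine object without awaiting it, so async checks (such as the Firestore
+                # store's ensure_client_initialized) are effectively deferred; the real async
+                # initialization happens later inside server_with_poller().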
+ check_func() # Placeholder if it's sync + else: + logger.debug(f"Running sync pre-flight check: {check_func.__name__}") + check_func() + + logger.info("GCP client pre-flight checks successful.") + + except Exception as e: + logger.critical(f"FATAL: GCP client pre-flight check failed: {e}", exc_info=args.debug) + logger.critical("Ensure Application Default Credentials (ADC) are set up correctly, necessary GCP APIs (Storage, BigQuery, Firestore, Secret Manager) are enabled, and the account has required IAM permissions (e.g., Storage Object Admin, BigQuery Job User, Datastore User, Secret Manager Secret Accessor).") + sys.exit(1) + # --- End Pre-flight Check --- + + # --- Start Server --- + server_mode_info = "STDIO" + if args.port.lower() != "stdio": + server_mode_info = f"SSE on {args.host}:{args.port}" + + logger.info(f"Starting server in {server_mode_info} mode...") + + # Initialize BQ client and Firestore store if BQ is enabled, to pass to server and poller + from google.cloud import bigquery as gcp_bigquery # For type hinting bq_client_instance + + bq_client_instance: Optional[gcp_bigquery.Client] = None + firestore_job_store_instance_for_server: Optional[FirestoreBqJobStore] = None + + if "bigquery" in enabled_tools: try: + bq_client_instance = get_bq_client() # Get the client instance + firestore_job_store_instance_for_server = FirestoreBqJobStore(project=os.getenv("GCP_PROJECT")) + # Ensure its client is ready if we didn't do it in pre-flight or if pre-flight was minimal + # This is important if pre-flight only instantiated. + # await firestore_job_store_instance_for_server.ensure_client_initialized() # Call this within an async context + except Exception as e: + logger.critical(f"Failed to initialize BigQuery client or FirestoreBqJobStore for server runtime: {e}", exc_info=args.debug) + sys.exit(1) + + poller_task = None + main_server_task = None + + async def server_with_poller(): + nonlocal poller_task, main_server_task + + # Ensure Firestore client is initialized before starting poller or server + if firestore_job_store_instance_for_server: + await firestore_job_store_instance_for_server.ensure_client_initialized() + + if "bigquery" in enabled_tools and bq_client_instance and firestore_job_store_instance_for_server: + logger.info("Starting BigQuery job poller task...") + poller_task = asyncio.create_task(run_bq_job_poller( + firestore_job_store=firestore_job_store_instance_for_server, + bq_client=bq_client_instance, + poll_interval_seconds=args.bq_poll_interval + )) + + server_kwargs = { + "enabled_tools": enabled_tools, + "bq_job_store": firestore_job_store_instance_for_server # Pass the store + } + + if args.port.lower() == "stdio": + if args.require_api_key: + logger.warning("--require-api-key is ignored for stdio mode.") + main_server_task = asyncio.create_task(run_stdio_server(**server_kwargs)) + else: port_num = int(args.port) if not (1024 <= port_num <= 65535): - raise ValueError("Port number must be between 1024 and 65535.") - logger.info(f"Mode: SSE on {args.host}:{port_num}") - if args.require_api_key: - logger.info("API Key Authentication: Enabled") - else: - logger.info("API Key Authentication: Disabled") - - # Ensure GCP clients can initialize before entering async loop potentially - from .gcp_tools.storage import get_storage_client - from .gcp_tools.bigquery import get_bq_client - if "storage" in enabled_tools: get_storage_client() - if "bigquery" in enabled_tools: get_bq_client() - # Run the server - asyncio.run(run_sse_server(enabled_tools, args.host, port_num, 
args.require_api_key)) - - except ValueError as e: - logger.error(f"Invalid port number specified: {e}") - sys.exit(1) - except KeyboardInterrupt: - logger.info("SSE server interrupted by user.") - except Exception as e: - logger.critical(f"Failed to start or run SSE server: {e}", exc_info=args.debug) - sys.exit(1) + logger.critical(f"Invalid port number: {port_num}. Must be between 1024 and 65535.") + sys.exit(1) # Should be caught by try/except below if it raises + server_kwargs.update({ + "host": args.host, + "port": port_num, + "api_key": api_key_to_use + }) + main_server_task = asyncio.create_task(run_sse_server(**server_kwargs)) + + if main_server_task: + await main_server_task # Wait for the main server task to complete + + try: + asyncio.run(server_with_poller()) + except KeyboardInterrupt: + logger.info(f"{server_mode_info} server and/or poller interrupted by user (Ctrl+C). Shutting down.") + except ValueError as e: + logger.critical(f"Invalid port number specified: {args.port}. Error: {e}", exc_info=args.debug) + sys.exit(1) + except SystemExit: + raise # Allow sys.exit to propagate + except Exception as e: + logger.critical(f"Failed to start or run {server_mode_info} server: {e}", exc_info=args.debug) + sys.exit(1) + finally: + # Note: asyncio.run() handles task cancellation on exit/exception. + # Explicit cancellation here might be redundant or interfere if not careful. + if poller_task and not poller_task.done(): + logger.info("Attempting to cancel BQ poller task...") + poller_task.cancel() + # Allow time for cancellation to be processed + # However, asyncio.run should manage this. + # await asyncio.sleep(0.1) + logger.info(f"{server_mode_info} server shutdown process initiated or completed.") if __name__ == "__main__": main() diff --git a/mcp_agent/context.py b/mcp_agent/context.py deleted file mode 100644 index 3d756d1..0000000 --- a/mcp_agent/context.py +++ /dev/null @@ -1,72 +0,0 @@ -import asyncio -import logging -from typing import Dict, Optional, Tuple - -logger = logging.getLogger(__name__) - -class ConnectionContextManager: - """Manages context (GCS bucket, BQ dataset) per connection ID.""" - - def __init__(self): - # Structure: { conn_id: {"gcs_bucket": "...", "bq_project": "...", "bq_dataset": "..."} } - self._context_store: Dict[str, Dict[str, Optional[str]]] = {} - self._lock = asyncio.Lock() - logger.info("ConnectionContextManager initialized.") - - async def set_gcs_context(self, conn_id: str, bucket_name: str): - async with self._lock: - if conn_id not in self._context_store: - self._context_store[conn_id] = {} - self._context_store[conn_id]["gcs_bucket"] = bucket_name - logger.info(f"[Conn: {conn_id}] GCS context set to bucket '{bucket_name}'") - - async def get_gcs_context(self, conn_id: str) -> Optional[str]: - async with self._lock: - return self._context_store.get(conn_id, {}).get("gcs_bucket") - - async def clear_gcs_context(self, conn_id: str): - async with self._lock: - if conn_id in self._context_store: - if "gcs_bucket" in self._context_store[conn_id]: - del self._context_store[conn_id]["gcs_bucket"] - logger.info(f"[Conn: {conn_id}] GCS context cleared.") - if not self._context_store[conn_id]: # Remove conn_id if empty - del self._context_store[conn_id] - - async def set_bq_context(self, conn_id: str, project_id: str, dataset_id: str): - async with self._lock: - if conn_id not in self._context_store: - self._context_store[conn_id] = {} - self._context_store[conn_id]["bq_project"] = project_id - self._context_store[conn_id]["bq_dataset"] = dataset_id 
- logger.info(f"[Conn: {conn_id}] BQ context set to '{project_id}:{dataset_id}'") - - async def get_bq_context(self, conn_id: str) -> Optional[Tuple[str, str]]: - async with self._lock: - conn_data = self._context_store.get(conn_id, {}) - project = conn_data.get("bq_project") - dataset = conn_data.get("bq_dataset") - if project and dataset: - return project, dataset - return None - - async def clear_bq_context(self, conn_id: str): - async with self._lock: - if conn_id in self._context_store: - cleared = False - if "bq_project" in self._context_store[conn_id]: - del self._context_store[conn_id]["bq_project"] - cleared = True - if "bq_dataset" in self._context_store[conn_id]: - del self._context_store[conn_id]["bq_dataset"] - cleared = True - if cleared: - logger.info(f"[Conn: {conn_id}] BQ context cleared.") - if not self._context_store[conn_id]: # Remove conn_id if empty - del self._context_store[conn_id] - - async def clear_connection_context(self, conn_id: str): - async with self._lock: - if conn_id in self._context_store: - del self._context_store[conn_id] - logger.info(f"[Conn: {conn_id}] All context cleared upon disconnect.") diff --git a/mcp_agent/gcp_tools/__init__.py b/mcp_agent/gcp_tools/__init__.py index cfc1c2d..090b97b 100644 --- a/mcp_agent/gcp_tools/__init__.py +++ b/mcp_agent/gcp_tools/__init__.py @@ -1,140 +1,108 @@ # mcp_agent/gcp_tools/__init__.py -# Make tools easily importable -from .storage import ( - gcs_list_buckets, - gcs_set_context_bucket, - gcs_clear_context_bucket, - gcs_list_objects, - gcs_read_object, - gcs_write_object, -) -from .bigquery import ( - bq_set_context_dataset, - bq_clear_context_dataset, - bq_list_datasets, - bq_list_tables, - bq_get_table_schema, - bq_query, -) +# This file defines the available GCP tools for the MCP agent. -# Define tool schemas for MCP advertisement +import logging +from typing import Dict, Callable, Awaitable, List from mcp import types as mcp_types -# Schemas can be complex, defining them manually here for clarity -# In a real app, you might generate these from function signatures or dataclasses +logger = logging.getLogger("mcp_agent.gcp_tools") -GCS_TOOLS_SCHEMAS = [ - mcp_types.Tool( - name="gcs_list_buckets", - description="Lists accessible Google Cloud Storage buckets.", - arguments={}, - ), - mcp_types.Tool( - name="gcs_set_context_bucket", - description="Sets the default GCS bucket for subsequent commands in this session.", - arguments={ - "bucket_name": mcp_types.ToolArgument( - type="string", description="The name of the GCS bucket.", is_required=True - ) - }, - ), - mcp_types.Tool( - name="gcs_clear_context_bucket", - description="Clears the default GCS bucket context for this session.", - arguments={}, - ), - mcp_types.Tool( - name="gcs_list_objects", - description="Lists objects and common prefixes (directories) in a GCS bucket. Uses context bucket if 'bucket_name' is omitted. 
Supports pagination.", - arguments={ - "bucket_name": mcp_types.ToolArgument(type="string", description="Specific bucket name (overrides context).", is_required=False), - "prefix": mcp_types.ToolArgument(type="string", description="Filter results by this prefix (e.g., 'images/').", is_required=False), - "page_token": mcp_types.ToolArgument(type="string", description="Token from a previous response to get the next page.", is_required=False), - "max_results": mcp_types.ToolArgument(type="integer", description="Maximum items per page.", is_required=False, default_value=100), - }, - ), - mcp_types.Tool( - name="gcs_read_object", - description="Reads the content of an object in a GCS bucket. Uses context bucket if 'bucket_name' is omitted.", - arguments={ - "object_path": mcp_types.ToolArgument(type="string", description="The full path to the object within the bucket.", is_required=True), - "bucket_name": mcp_types.ToolArgument(type="string", description="Specific bucket name (overrides context).", is_required=False), - }, - ), - mcp_types.Tool( - name="gcs_write_object", - description="Writes string content to an object in a GCS bucket. Uses context bucket if 'bucket_name' is omitted. Overwrites if exists.", - arguments={ - "object_path": mcp_types.ToolArgument(type="string", description="The full path for the object within the bucket.", is_required=True), - "content": mcp_types.ToolArgument(type="string", description="The string content to write.", is_required=True), - "bucket_name": mcp_types.ToolArgument(type="string", description="Specific bucket name (overrides context).", is_required=False), - }, - ), -] +# Import tool functions from their respective modules +# Note: These imports assume the functions are defined in these files. +# If a function isn't found, it means it hasn't been implemented or is misnamed. +try: + from .storage import ( + gcs_list_buckets, + gcs_list_objects, + gcs_get_read_signed_url, + gcs_get_write_signed_url, + gcs_write_string_object, + # get_storage_client # Typically not a tool itself, but a helper + ) + logger.debug("Successfully imported GCS tools from .storage") +except ImportError as e: + logger.error(f"Error importing GCS tools from .storage: {e}. 
Some GCS tools may not be available.", exc_info=True) + # Define placeholders if import fails to prevent server from crashing on ALL_TOOLS_MAP access + def _gcs_placeholder(*args, **kwargs): raise NotImplementedError("GCS tool not loaded due to import error") + gcs_list_buckets = gcs_list_objects = gcs_get_read_signed_url = _gcs_placeholder + gcs_get_write_signed_url = gcs_write_string_object = _gcs_placeholder -BQ_TOOLS_SCHEMAS = [ - mcp_types.Tool( - name="bq_set_context_dataset", - description="Sets the default BigQuery project and dataset for subsequent commands.", - arguments={ - "project_id": mcp_types.ToolArgument(type="string", description="The Google Cloud project ID.", is_required=True), - "dataset_id": mcp_types.ToolArgument(type="string", description="The BigQuery dataset ID.", is_required=True), - }, - ), - mcp_types.Tool( - name="bq_clear_context_dataset", - description="Clears the default BigQuery project/dataset context.", - arguments={}, - ), - mcp_types.Tool( - name="bq_list_datasets", - description="Lists accessible BigQuery datasets within a project.", - arguments={ - "project_id": mcp_types.ToolArgument(type="string", description="Specific project ID (defaults to server's default project if omitted).", is_required=False), - }, - ), - mcp_types.Tool( - name="bq_list_tables", - description="Lists tables within a BigQuery dataset. Uses context if project/dataset IDs are omitted.", - arguments={ - "project_id": mcp_types.ToolArgument(type="string", description="Specific project ID (overrides context).", is_required=False), - "dataset_id": mcp_types.ToolArgument(type="string", description="Specific dataset ID (overrides context).", is_required=False), - }, - ), - mcp_types.Tool( - name="bq_get_table_schema", - description="Gets the schema of a BigQuery table. Uses context project/dataset if IDs are omitted.", - arguments={ - "table_id": mcp_types.ToolArgument(type="string", description="Table ID (e.g., 'my_table' or 'dataset.my_table').", is_required=True), - "project_id": mcp_types.ToolArgument(type="string", description="Specific project ID (overrides context).", is_required=False), - "dataset_id": mcp_types.ToolArgument(type="string", description="Specific dataset ID (overrides context).", is_required=False), - }, - ), - mcp_types.Tool( - name="bq_query", - description="Executes a SQL query in BigQuery. 
Uses context project/dataset for unqualified table names.", - arguments={ - "query": mcp_types.ToolArgument(type="string", description="The SQL query string.", is_required=True), - "project_id": mcp_types.ToolArgument(type="string", description="Project ID to run the query in (overrides context default project).", is_required=False), - "dataset_id": mcp_types.ToolArgument(type="string", description="Default dataset ID for unqualified table names (overrides context default dataset).", is_required=False), - "max_results": mcp_types.ToolArgument(type="integer", description="Maximum rows to return in the first page.", is_required=False, default_value=1000), - "page_token": mcp_types.ToolArgument(type="string", description="Page token from a previous query result to fetch the next page.", is_required=False), - }, - ), -] -# Map tool names to functions -ALL_TOOLS_MAP = { +try: + from .bigquery import ( + bq_list_datasets, + bq_list_tables, + bq_get_table_schema, + bq_submit_query, + bq_get_job_status, + bq_get_query_results, + # get_bq_client # Typically not a tool itself + ) + logger.debug("Successfully imported BigQuery tools from .bigquery") +except ImportError as e: + logger.error(f"Error importing BQ tools from .bigquery: {e}. Some BQ tools may not be available.", exc_info=True) + def _bq_placeholder(*args, **kwargs): raise NotImplementedError("BQ tool not loaded due to import error") + bq_list_datasets = bq_list_tables = bq_get_table_schema = _bq_placeholder + bq_submit_query = bq_get_job_status = bq_get_query_results = _bq_placeholder + + +# ALL_TOOLS_MAP: Maps tool names (as called by MCP client) to their async function implementations. +# Each function is expected to take `arguments: Dict[str, Any], conn_id: str, **kwargs` +# and return `McpToolReturnType` (which is `List[mcp_types.Content]`). +# The `bq_job_store` will be passed in kwargs by the dispatcher if available. +ALL_TOOLS_MAP: Dict[str, Callable[..., Awaitable[List[mcp_types.Content]]]] = { + # GCS Tools "gcs_list_buckets": gcs_list_buckets, - "gcs_set_context_bucket": gcs_set_context_bucket, - "gcs_clear_context_bucket": gcs_clear_context_bucket, "gcs_list_objects": gcs_list_objects, - "gcs_read_object": gcs_read_object, - "gcs_write_object": gcs_write_object, - "bq_set_context_dataset": bq_set_context_dataset, - "bq_clear_context_dataset": bq_clear_context_dataset, + "gcs_get_read_signed_url": gcs_get_read_signed_url, + "gcs_get_write_signed_url": gcs_get_write_signed_url, + "gcs_write_string_object": gcs_write_string_object, + + # BigQuery Tools "bq_list_datasets": bq_list_datasets, "bq_list_tables": bq_list_tables, "bq_get_table_schema": bq_get_table_schema, - "bq_query": bq_query, + "bq_submit_query": bq_submit_query, + "bq_get_job_status": bq_get_job_status, + "bq_get_query_results": bq_get_query_results, } + +logger.info(f"ALL_TOOLS_MAP initialized with {len(ALL_TOOLS_MAP)} tools: {list(ALL_TOOLS_MAP.keys())}") + +# TODO: Define MCP Tool Schemas for advertisement (Priority 4 - Documentation) +# These schemas describe the tools, their arguments, and descriptions for MCP clients. +# Example structure: +# from mcp import types as mcp_types +# TOOL_SCHEMAS: List[mcp_types.Tool] = [ +# mcp_types.Tool( +# name="gcs_list_buckets", +# description="Lists accessible Google Cloud Storage buckets.", +# arguments={ +# "project_id": mcp_types.ToolArgument(type="string", description="Optional GCP project ID.", is_required=False) +# } +# ), +# # ... other tool schemas ... +# ] +# For now, this part is deferred. 
The server will function using ALL_TOOLS_MAP for dispatch. +# Client-side tool discovery and argument validation will be limited until schemas are provided. + +# Ensure base.py is not directly exporting tools unless intended. +# It might contain base classes or shared utilities for tools. +from . import base +# If base.py has tool definitions, they should be explicitly imported and added to map. +# For now, assuming tools are in storage.py and bigquery.py. + +# Clean up namespace if placeholders were defined due to import errors, +# though a better approach is to let the server fail on startup if core tools can't load. +# This cleanup is more for robustness if some non-critical tools failed to import. +_placeholders_to_remove = [] +for tool_name, func in ALL_TOOLS_MAP.items(): + if hasattr(func, '__name__') and func.__name__ in ["_gcs_placeholder", "_bq_placeholder"]: + _placeholders_to_remove.append(tool_name) + +if _placeholders_to_remove: + logger.warning(f"Removing placeholder tools due to import errors: {_placeholders_to_remove}") + for tool_name in _placeholders_to_remove: + del ALL_TOOLS_MAP[tool_name] + +logger.info(f"Final ALL_TOOLS_MAP contains {len(ALL_TOOLS_MAP)} actively loaded tools.") diff --git a/mcp_agent/gcp_tools/bigquery.py b/mcp_agent/gcp_tools/bigquery.py new file mode 100644 index 0000000..869ff79 --- /dev/null +++ b/mcp_agent/gcp_tools/bigquery.py @@ -0,0 +1,227 @@ +import asyncio +import logging +from typing import Any, Dict, List, Optional, Tuple, Sequence + +from google.cloud import bigquery +from google.api_core import exceptions as google_exceptions, page_iterator +from mcp import types as mcp_types + +# REMOVED: ConnectionContextManager import +from ..job_store import FirestoreBqJobStore # Import Firestore store +from ..utils import format_success, format_error, format_info, handle_gcp_error, McpToolReturnType +# Import retry decorator +from ..utils import retry_on_gcp_transient_error + +logger = logging.getLogger("mcp_agent.gcp_tools.bigquery") + +_bq_client: Optional[bigquery.Client] = None # Keep client cache + +def get_bq_client() -> bigquery.Client: + """Initializes and returns a cached BigQuery client using Application Default Credentials.""" + global _bq_client + if _bq_client is None: + logger.info("Initializing Google Cloud BigQuery client.") + try: + _bq_client = bigquery.Client() + logger.info("Google Cloud BigQuery client initialized successfully.") + except Exception as e: + logger.critical(f"Failed to initialize BigQuery client: {e}", exc_info=True) + raise RuntimeError(f"BigQuery client initialization failed: {e}") from e + return _bq_client + +# --- Apply Retry Decorator Sync Helpers --- +@retry_on_gcp_transient_error +def _get_dataset_sync(client: bigquery.Client, dataset_ref: bigquery.DatasetReference): + logger.debug(f"Running client get dataset thread {dataset_ref} retry") + client.get_dataset(dataset_ref) + +@retry_on_gcp_transient_error +def _list_datasets_sync(client: bigquery.Client, project_id: Optional[str]): + logger.debug(f"Running client list datasets thread project {project_id or 'default'} retry") + return [ds.dataset_id for ds in client.list_datasets(project=project_id)], (project_id or client.project) + +@retry_on_gcp_transient_error +def _list_tables_sync(client: bigquery.Client, dataset_ref: bigquery.DatasetReference): + logger.debug(f"Running client list tables thread {dataset_ref} retry") + return [table.table_id for table in client.list_tables(dataset_ref)] + +@retry_on_gcp_transient_error +def _get_table_sync(client: 
bigquery.Client, table_ref: bigquery.TableReference): + logger.debug(f"Running client get table thread {table_ref} retry") + return client.get_table(table_ref) + +@retry_on_gcp_transient_error +def _submit_job_sync(client: bigquery.Client, query_str: str, job_config: bigquery.QueryJobConfig, project: str): + logger.debug(f"Running client query thread project {project} retry") + return client.query(query=query_str, job_config=job_config, project=project) + +@retry_on_gcp_transient_error +def _get_job_sync(client: bigquery.Client, job_id: str, location: Optional[str]): + logger.debug(f"Running client get job thread {job_id} retry") + return client.get_job(job_id, location=location) + +@retry_on_gcp_transient_error +def _list_rows_sync(client: bigquery.Client, job_id: str, location: Optional[str], page_token: Optional[str], max_results: int): + logger.debug(f"Running client list rows thread page job {job_id} retry") + rows_iterator = client.list_rows(job_id, location=location, page_token=page_token, max_results=max_results) + page_rows = list(rows_iterator) # Consume page + return rows_iterator.schema, page_rows, rows_iterator.next_page_token, rows_iterator.total_rows + +# --- Tool Implementations Require explicit args --- + +async def bq_list_datasets( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: + """Lists accessible BQ datasets""" + project_id = arguments.get("project_id"); # ... type validation ... + if project_id is not None and not isinstance(project_id, str): return format_error("Invalid project id must be string") + try: + client = get_bq_client() + dataset_list, used_project = await asyncio.to_thread(_list_datasets_sync, client, project_id) + return format_success("Datasets listed", data={"project_id": used_project, "datasets": dataset_list}) + except Exception as e: return handle_gcp_error(e, f"listing BQ datasets project {project_id or 'default'}") + +async def bq_list_tables( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: + """Lists tables within required BQ dataset""" + project_id = arguments.get("project_id"); dataset_id = arguments.get("dataset_id") + if not project_id or not isinstance(project_id, str): return format_error("Missing invalid project id") + if not dataset_id or not isinstance(dataset_id, str): return format_error("Missing invalid dataset id") + try: + client = get_bq_client(); dataset_ref = bigquery.DatasetReference(project_id, dataset_id) + table_list = await asyncio.to_thread(_list_tables_sync, client, dataset_ref) + return format_success("Tables listed", data={"project_id": project_id, "dataset_id": dataset_id, "tables": table_list}) + except Exception as e: return handle_gcp_error(e, f"listing BQ tables dataset {project_id}:{dataset_id}") + +async def bq_get_table_schema( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: + """Gets schema required BQ table""" + project_id = arguments.get("project_id"); dataset_id = arguments.get("dataset_id"); table_id = arguments.get("table_id") + # Simplified validation assumes IDs are mandatory args now + if not project_id or not isinstance(project_id, str): return format_error("Missing invalid project id") + if not dataset_id or not isinstance(dataset_id, str): return format_error("Missing invalid dataset id") + if not table_id or not isinstance(table_id, str): return format_error("Missing invalid table id") + # Table ID parsing logic 
removed; a simple table name is assumed, with project and dataset supplied as separate required arguments
+    if '.' in table_id: return format_error("'table_id' should be a simple table name; provide 'project_id' and 'dataset_id' separately.")
+
+    try:
+        client = get_bq_client(); table_ref_str = f"{project_id}.{dataset_id}.{table_id}"
+        table_ref = bigquery.TableReference.from_string(table_ref_str)
+        table = await asyncio.to_thread(_get_table_sync, client, table_ref)
+        schema_list = [{"name": f.name, "type": f.field_type, "mode": f.mode} for f in table.schema]
+        return format_success("Schema retrieved.", data={"project_id": table.project, "dataset_id": table.dataset_id, "table_id": table.table_id, "schema": schema_list})
+    except google_exceptions.NotFound: return format_error(f"Table {table_ref_str} not found.")
+    except Exception as e: return handle_gcp_error(e, f"getting schema for table {table_ref_str}")
+
+async def bq_submit_query( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType:
+    """Submits a BigQuery query job asynchronously and returns its job ID; job state is tracked in Firestore."""
+    query_str = arguments.get("query"); # ... validation ...
+    if not query_str or not isinstance(query_str, str): return format_error("Missing or invalid 'query' string.")
+    project_id_arg = arguments.get("project_id"); # ... validation ...
+    default_project_id_arg = arguments.get("default_dataset_project_id"); # ... validation ...
+    default_dataset_id_arg = arguments.get("default_dataset_id"); # ... validation ...
+
+    target_project = project_id_arg # Project to run the job in
+    job_default_dataset_ref: Optional[bigquery.DatasetReference] = None
+    if default_project_id_arg and default_dataset_id_arg:
+        job_default_dataset_ref = bigquery.DatasetReference(default_project_id_arg, default_dataset_id_arg)
+
+    try:
+        client = get_bq_client();
+        if not target_project: target_project = client.project
+        job_config = bigquery.QueryJobConfig(use_legacy_sql=False);
+        if job_default_dataset_ref: job_config.default_dataset = job_default_dataset_ref
+        logger.info(f"Submitting BQ job. Project: {target_project}. Query: {query_str[:50]}...", extra={"conn_id": conn_id})
+        query_job = await asyncio.to_thread(_submit_job_sync, client, query_str, job_config, target_project)
+        job_id = query_job.job_id; location = query_job.location; initial_state = query_job.state
+        logger.info(f"BQ job submitted: {job_id}. Location: {location}. State: {initial_state}", extra={"conn_id": conn_id})
+        # --- Store job info in Firestore ---
+        job_info = BqJobInfo(job_id=job_id, location=location, conn_id=conn_id, status=initial_state)
+        await bq_job_store.add_job(job_info) # Uses the Firestore-backed store
+        # -----------------------------------
+        return format_success("Query submitted. Use 'bq_get_job_status' to poll for completion.", data={"job_id": job_id, "location": location, "state": initial_state})
+    except google_exceptions.BadRequest as e: return handle_gcp_error(e, "submitting BQ query (BadRequest)")
+    except google_exceptions.Forbidden as e: return handle_gcp_error(e, "submitting BQ query (Forbidden)")
+    except Exception as e: return handle_gcp_error(e, "submitting BQ query")
+
+async def bq_get_job_status( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType:
+    """Checks job status via Firestore; if the job is DONE and succeeded, fetches the first page of results."""
+    job_id = arguments.get("job_id"); # ... validation ...
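+    # NOTE: bq_job_store.get_job() below is assumed to return a BqJobInfo record
+    # (its definition is tracked separately in todo.md). Based on how it is used in
+    # this module, a minimal sketch is a dataclass with fields along the lines of:
+    #     job_id: str, location: Optional[str], conn_id: str,
+    #     status: str                        # e.g. "PENDING" | "RUNNING" | "DONE"
+    #     error_result: Optional[Dict[str, Any]]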
+    if not job_id or not isinstance(job_id, str): return format_error("Missing or invalid 'job_id'.")
+    # The 'location' argument is mainly useful for an optional fallback lookup against the BQ API.
+    location_arg = arguments.get("location"); # ... validation ...
+
+    logger.debug(f"Getting job status from Firestore for {job_id}", extra={"conn_id": conn_id})
+    job_info = await bq_job_store.get_job(job_id) # Reads from Firestore
+
+    if not job_info:
+        # Optional fallback: check the BQ API directly if the job is not found in Firestore
+        # logger.warning(f"Job {job_id} not found in Firestore; trying direct API lookup", extra={"conn_id": conn_id})
+        # try: client=get_bq_client(); job = await asyncio.to_thread(_get_job_sync, client, job_id, location_arg); job_info = BqJobInfo(...) # Reconstruct if needed
+        # except google_exceptions.NotFound: return format_error(...)
+        # except Exception as e: return handle_gcp_error(...)
+        # If still not found after the fallback
+        return format_error(f"Job {job_id} is not tracked (not found in Firestore).")
+
+    status_data = {"job_id": job_info.job_id, "location": job_info.location, "state": job_info.status, "error_result": job_info.error_result}
+
+    if job_info.status == 'DONE':
+        if job_info.error_result: return format_error(f"Job {job_id} finished with errors.", data=status_data)
+        else:
+            # --- Job finished successfully: fetch the FIRST page of results ---
+            logger.info(f"Job {job_id} is DONE. Fetching first page of results.", extra={"conn_id": conn_id})
+            try:
+                client = get_bq_client(); max_results_first_page = 1000
+                # Use the retry-wrapped helper to fetch the page
+                schema, rows, token, total = await asyncio.to_thread(
+                    _list_rows_sync, client, job_id, job_info.location, None, max_results_first_page # page_token is None
+                )
+                schema_list = [{"name": f.name, "type": f.field_type} for f in schema]; rows_list = [_serialize_row(r) for r in rows]
+                status_data["schema"] = schema_list; status_data["rows"] = rows_list; status_data["next_page_token"] = token; status_data["total_rows"] = total
+                return format_success(f"Job {job_id} completed. Returning first page of results.", data=status_data)
+            except Exception as e:
+                logger.error(f"Error fetching first page of results for completed job {job_id}: {e}", exc_info=True, extra={"conn_id": conn_id})
+                # Return DONE status but indicate the result-fetch error
+                return format_error(f"Job {job_id} completed, but fetching the first page of results failed: {e}", data = {**status_data, "rows": None, "schema": None, "next_page_token": None}) # type: ignore
+    else:
+        # Job is PENDING or RUNNING
+        logger.info(f"Job {job_id} is still {job_info.status}", extra={"conn_id": conn_id})
+        return format_info(f"Job {job_id} is currently {job_info.status}.", data=status_data)
+
+
+async def bq_get_query_results( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType:
+    """Fetches a specific page of results for a completed BQ query job; requires a page token."""
+    job_id = arguments.get("job_id"); page_token = arguments.get("page_token"); location_arg = arguments.get("location")
+    # Validation
+    if not job_id or not isinstance(job_id, str): return format_error("Missing or invalid 'job_id'.")
+    if not page_token or not isinstance(page_token, str): return format_error("Missing or invalid 'page_token'; it is required to fetch subsequent result pages.")
+    if location_arg is not None and not isinstance(location_arg, str): return format_error("Invalid 'location', must be a string.")
+    try: max_results = int(arguments.get("max_results", 1000)); # ... range check ...
+    except (ValueError, TypeError): max_results = 1000
+
+    # Determine location: explicit argument > stored job info > error
+    location = location_arg
+    if not location:
+        job_info = await bq_job_store.get_job(job_id) # Read from Firestore
+        if job_info: location = job_info.location
+        else: return format_error(f"Cannot fetch results page: location for job {job_id} is unknown. Please provide 'location'.")
+    if not location: return format_error(f"Cannot fetch results page: location for job {job_id} could not be determined.")
+
+    try:
+        client = get_bq_client()
+        logger.debug(f"Getting results page for BQ job {job_id}. Location: {location}. PageToken: {page_token[:10]}...", extra={"conn_id": conn_id})
+        # Fetch the requested page via the retry-wrapped helper
+        schema, rows, token, total = await asyncio.to_thread(
+            _list_rows_sync, client, job_id, location, page_token, max_results
+        )
+        schema_list = [{"name": f.name, "type": f.field_type} for f in schema]; rows_list = [_serialize_row(r) for r in rows]
+        return format_success("Query results page retrieved.", data={"job_id": job_id, "location": location, "schema": schema_list, "rows": rows_list, "next_page_token": token, "total_rows": total})
+    except google_exceptions.NotFound: return format_error(f"Job {job_id} not found, or the page token is invalid.")
+    except Exception as e: return handle_gcp_error(e, f"getting results page for job {job_id}")
+
+
+def _serialize_row(row: bigquery.table.Row) -> Dict[str, Any]:
+    """Helper: converts a BQ Row into a JSON-serializable dict."""
+    row_dict = {}; # ... implementation unchanged ...
+    for key, value in row.items():
+        if isinstance(value, bytes):
+            try: row_dict[key] = value.decode('utf-8')
+            except UnicodeDecodeError: row_dict[key] = f"<non-UTF-8 bytes: {len(value)} bytes>"
+        else: row_dict[key] = value
+    return row_dict
diff --git a/mcp_agent/gcp_tools/storage.py b/mcp_agent/gcp_tools/storage.py
index 4e30e2a..7125a2a 100644
--- a/mcp_agent/gcp_tools/storage.py
+++ b/mcp_agent/gcp_tools/storage.py
@@ -1,219 +1,254 @@
 import asyncio
 import logging
-from typing import Any, Dict, List, Optional, Tuple, Sequence
+from typing import Any, Dict, List, Optional
+from datetime import timedelta, datetime, timezone
 
-from google.cloud import bigquery
-from google.api_core import exceptions as google_exceptions, page_iterator
+from google.cloud import storage
+from google.api_core import exceptions as google_exceptions
 from mcp import types as mcp_types
 
-# REMOVED: ConnectionContextManager import
-from ..job_store import FirestoreBqJobStore # Import Firestore store
-from ..utils import format_success, format_error, format_info, handle_gcp_error, McpToolReturnType
-# Import retry decorator
-from ..utils import retry_on_gcp_transient_error
-
-logger = logging.getLogger("mcp_agent.gcp_tools.bigquery")
-
-_bq_client: Optional[bigquery.Client] = None # Keep client cache
-
-def get_bq_client() -> bigquery.Client:
-    """Initializes returns cached BQ client uses ADC"""
-    # ... (implementation unchanged) ...
-    global _bq_client; # ... init logic ...; return _bq_client
-
-# --- Apply Retry Decorator Sync Helpers ---
+try:
+    from ..utils import (
+        format_success, format_error, handle_gcp_error,
+        McpToolReturnType, retry_on_gcp_transient_error
+    )
+except ImportError:
+    logging.critical("Failed to import utils in gcp_tools/storage.py. 
Ensure PYTHONPATH is correct.") + # Define dummy decorators/formatters if utils not found, to allow basic loading + def retry_on_gcp_transient_error(func): return func + def format_success(msg, data=None): return [mcp_types.TextContent(type="text", text=f'{{"status": "success", "message": "{msg}", "data": {data or {}}}}')] + def format_error(msg, data=None): return [mcp_types.TextContent(type="text", text=f'{{"status": "error", "message": "{msg}", "data": {data or {}}}}')] + def handle_gcp_error(e, desc): return [mcp_types.TextContent(type="text", text=f'{{"status": "error", "message": "GCP Error in {desc}: {e}"}}')] + McpToolReturnType = List[mcp_types.Content] + + +logger = logging.getLogger("mcp_agent.gcp_tools.storage") # Target logger name + +_storage_client: Optional[storage.Client] = None + +def get_storage_client() -> storage.Client: + """Initializes and returns a cached GCS client using Application Default Credentials.""" + global _storage_client + if _storage_client is None: + logger.info("Initializing Google Cloud Storage client.") + try: + _storage_client = storage.Client() + logger.info("Google Cloud Storage client initialized successfully.") + except Exception as e: + logger.critical(f"Failed to initialize GCS client: {e}", exc_info=True) + raise RuntimeError(f"GCS client initialization failed: {e}") from e + return _storage_client + +# --- Synchronous helpers with retry --- @retry_on_gcp_transient_error -def _get_dataset_sync(client: bigquery.Client, dataset_ref: bigquery.DatasetReference): - logger.debug(f"Running client get dataset thread {dataset_ref} retry") - client.get_dataset(dataset_ref) +def _list_buckets_sync(client: storage.Client, project_id: Optional[str]) -> List[str]: + logger.debug(f"Running GCS list buckets for project {project_id or 'default client project'}") + buckets = client.list_buckets(project=project_id) + return [bucket.name for bucket in buckets] @retry_on_gcp_transient_error -def _list_datasets_sync(client: bigquery.Client, project_id: Optional[str]): - logger.debug(f"Running client list datasets thread project {project_id or 'default'} retry") - return [ds.dataset_id for ds in client.list_datasets(project=project_id)], (project_id or client.project) +def _list_blobs_sync(client: storage.Client, bucket_name: str, prefix: Optional[str], delimiter: Optional[str]) -> List[Dict[str, Any]]: + logger.debug(f"Running GCS list blobs for bucket '{bucket_name}', prefix '{prefix}', delimiter '{delimiter}'") + bucket = client.bucket(bucket_name) + blobs_iterator = bucket.list_blobs(prefix=prefix, delimiter=delimiter) + results = [] + for blob_item in blobs_iterator: + results.append({ + "name": blob_item.name, + "size": blob_item.size, + "updated": blob_item.updated.isoformat() if blob_item.updated else None, + "content_type": blob_item.content_type + }) + # If delimiter is used, prefixes (virtual folders) are also in blobs_iterator.prefixes + if delimiter and hasattr(blobs_iterator, 'prefixes') and blobs_iterator.prefixes: + for p in blobs_iterator.prefixes: + results.append({"name": p, "type": "prefix"}) # Indicate it's a prefix + return results @retry_on_gcp_transient_error -def _list_tables_sync(client: bigquery.Client, dataset_ref: bigquery.DatasetReference): - logger.debug(f"Running client list tables thread {dataset_ref} retry") - return [table.table_id for table in client.list_tables(dataset_ref)] +def _upload_string_sync(client: storage.Client, bucket_name: str, object_name: str, content: str, content_type: Optional[str]): + logger.debug(f"Running 
GCS upload string to '{bucket_name}/{object_name}'") + bucket = client.bucket(bucket_name) + blob = bucket.blob(object_name) + blob.upload_from_string(content, content_type=content_type or 'text/plain') + return {"name": blob.name, "bucket": blob.bucket.name, "size": blob.size, "content_type": blob.content_type} @retry_on_gcp_transient_error -def _get_table_sync(client: bigquery.Client, table_ref: bigquery.TableReference): - logger.debug(f"Running client get table thread {table_ref} retry") - return client.get_table(table_ref) - -@retry_on_gcp_transient_error -def _submit_job_sync(client: bigquery.Client, query_str: str, job_config: bigquery.QueryJobConfig, project: str): - logger.debug(f"Running client query thread project {project} retry") - return client.query(query=query_str, job_config=job_config, project=project) - -@retry_on_gcp_transient_error -def _get_job_sync(client: bigquery.Client, job_id: str, location: Optional[str]): - logger.debug(f"Running client get job thread {job_id} retry") - return client.get_job(job_id, location=location) +def _generate_signed_url_sync( + client: storage.Client, + bucket_name: str, + object_name: str, + method: str, + expiration_delta: timedelta, + content_type: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + version: str = "v4" +) -> str: + logger.debug(f"Generating GCS {method} signed URL for '{bucket_name}/{object_name}', version '{version}'") + bucket = client.bucket(bucket_name) + blob = bucket.blob(object_name) + + # Ensure expiration is timezone-aware for v4, though timedelta handles it. + # The library now prefers timedelta directly for expiration. + + url = blob.generate_signed_url( + version=version, + expiration=expiration_delta, + method=method, + content_type=content_type, + headers=headers + ) + return url + +# --- Async Tool Implementations --- + +async def gcs_list_buckets(arguments: Dict[str, Any], conn_id: str, **kwargs) -> McpToolReturnType: + project_id = arguments.get("project_id") + if project_id is not None and not isinstance(project_id, str): + return format_error("Invalid 'project_id', must be a string.") + try: + client = get_storage_client() + bucket_names = await asyncio.to_thread(_list_buckets_sync, client, project_id) + return format_success("Buckets listed successfully.", data={"buckets": bucket_names, "project_id": project_id or client.project}) + except Exception as e: + return handle_gcp_error(e, f"listing GCS buckets for project '{project_id or 'default'}'") + +async def gcs_list_objects(arguments: Dict[str, Any], conn_id: str, **kwargs) -> McpToolReturnType: + bucket_name = arguments.get("bucket_name") + prefix = arguments.get("prefix") + delimiter = arguments.get("delimiter") + + if not bucket_name or not isinstance(bucket_name, str): + return format_error("Missing or invalid 'bucket_name' argument.") + if prefix is not None and not isinstance(prefix, str): + return format_error("Invalid 'prefix' argument, must be a string.") + if delimiter is not None and not isinstance(delimiter, str): + return format_error("Invalid 'delimiter' argument, must be a string.") -@retry_on_gcp_transient_error -def _list_rows_sync(client: bigquery.Client, job_id: str, location: Optional[str], page_token: Optional[str], max_results: int): - logger.debug(f"Running client list rows thread page job {job_id} retry") - rows_iterator = client.list_rows(job_id, location=location, page_token=page_token, max_results=max_results) - page_rows = list(rows_iterator) # Consume page - return rows_iterator.schema, page_rows, 
rows_iterator.next_page_token, rows_iterator.total_rows - -# --- Tool Implementations Require explicit args --- - -async def bq_list_datasets( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Lists accessible BQ datasets""" - project_id = arguments.get("project_id"); # ... type validation ... - if project_id is not None and not isinstance(project_id, str): return format_error("Invalid project id must be string") try: - client = get_bq_client() - dataset_list, used_project = await asyncio.to_thread(_list_datasets_sync, client, project_id) - return format_success("Datasets listed", data={"project_id": used_project, "datasets": dataset_list}) - except Exception as e: return handle_gcp_error(e, f"listing BQ datasets project {project_id or 'default'}") - -async def bq_list_tables( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Lists tables within required BQ dataset""" - project_id = arguments.get("project_id"); dataset_id = arguments.get("dataset_id") - if not project_id or not isinstance(project_id, str): return format_error("Missing invalid project id") - if not dataset_id or not isinstance(dataset_id, str): return format_error("Missing invalid dataset id") + client = get_storage_client() + objects = await asyncio.to_thread(_list_blobs_sync, client, bucket_name, prefix, delimiter) + return format_success(f"Objects listed for bucket '{bucket_name}'.", data={"bucket_name": bucket_name, "objects": objects}) + except google_exceptions.NotFound: + return format_error(f"Bucket '{bucket_name}' not found.") + except Exception as e: + return handle_gcp_error(e, f"listing objects in GCS bucket '{bucket_name}'") + +async def gcs_get_read_signed_url(arguments: Dict[str, Any], conn_id: str, **kwargs) -> McpToolReturnType: + bucket_name = arguments.get("bucket_name") + object_name = arguments.get("object_name") + expiration_minutes = arguments.get("expiration_minutes", 60) # Default to 60 minutes + + if not bucket_name or not isinstance(bucket_name, str): + return format_error("Missing or invalid 'bucket_name'.") + if not object_name or not isinstance(object_name, str): + return format_error("Missing or invalid 'object_name'.") + try: + expiration_minutes = int(expiration_minutes) + if expiration_minutes <= 0 or expiration_minutes > 7 * 24 * 60: # Max 7 days for v4 + raise ValueError("Expiration must be between 1 minute and 7 days (10080 minutes).") + except ValueError: + return format_error("Invalid 'expiration_minutes', must be a positive integer (max 10080).") + try: - client = get_bq_client(); dataset_ref = bigquery.DatasetReference(project_id, dataset_id) - table_list = await asyncio.to_thread(_list_tables_sync, client, dataset_ref) - return format_success("Tables listed", data={"project_id": project_id, "dataset_id": dataset_id, "tables": table_list}) - except Exception as e: return handle_gcp_error(e, f"listing BQ tables dataset {project_id}:{dataset_id}") - -async def bq_get_table_schema( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Gets schema required BQ table""" - project_id = arguments.get("project_id"); dataset_id = arguments.get("dataset_id"); table_id = arguments.get("table_id") - # Simplified validation assumes IDs are mandatory args now - if not project_id or not isinstance(project_id, str): return format_error("Missing invalid project id") - if not dataset_id or not isinstance(dataset_id, 
str): return format_error("Missing invalid dataset id") - if not table_id or not isinstance(table_id, str): return format_error("Missing invalid table id") - # Table ID parsing logic removed assumes simple table ID with required project/dataset args - if '.' in table_id: return format_error("Table id should be simple name project dataset provided separately") + client = get_storage_client() + expiration_delta = timedelta(minutes=expiration_minutes) + + # For GET, content_type and headers are usually not needed for the URL itself + # unless specific response headers are desired from GCS (response_disposition, etc.) + # which can be passed via `response_disposition` or `response_type` to generate_signed_url. + # For simplicity, we are not including them here unless a specific need arises. + + signed_url = await asyncio.to_thread( + _generate_signed_url_sync, + client, bucket_name, object_name, "GET", expiration_delta, version="v4" + ) + return format_success("Read signed URL generated.", data={ + "bucket_name": bucket_name, + "object_name": object_name, + "signed_url": signed_url, + "method": "GET", + "expires_at": (datetime.now(timezone.utc) + expiration_delta).isoformat() + }) + except google_exceptions.NotFound: # Bucket or blob might not exist, though URL can still be generated + logger.warning(f"GCS Read Signed URL: Bucket '{bucket_name}' or object '{object_name}' may not exist, but URL generated.", exc_info=False) + # Depending on strictness, one might error here or let the client discover the 404. + # For now, let the URL be generated; the client will get a 404 if the object doesn't exist. + # Re-raising to be caught by handle_gcp_error if that's the desired behavior for non-existence. + # However, generate_signed_url itself doesn't check existence. + # Let's assume the primary error source would be IAM for token creation. + return format_error(f"Could not generate read signed URL. 
Ensure bucket '{bucket_name}' and object '{object_name}' exist if access fails, or check IAM permissions for the service account (needs Service Account Token Creator on itself).") + + except Exception as e: + return handle_gcp_error(e, f"generating read signed URL for '{bucket_name}/{object_name}'") + + +async def gcs_get_write_signed_url(arguments: Dict[str, Any], conn_id: str, **kwargs) -> McpToolReturnType: + bucket_name = arguments.get("bucket_name") + object_name = arguments.get("object_name") + expiration_minutes = arguments.get("expiration_minutes", 60) # Default to 60 minutes + content_type = arguments.get("content_type") # Optional, e.g., 'application/octet-stream' or 'image/jpeg' + custom_headers = arguments.get("headers") # Optional custom headers like 'x-goog-meta-*' + + if not bucket_name or not isinstance(bucket_name, str): + return format_error("Missing or invalid 'bucket_name'.") + if not object_name or not isinstance(object_name, str): + return format_error("Missing or invalid 'object_name'.") + if content_type is not None and not isinstance(content_type, str): + return format_error("Invalid 'content_type', must be a string if provided.") + if custom_headers is not None and not isinstance(custom_headers, dict): + return format_error("Invalid 'headers', must be a dictionary if provided.") try: - client = get_bq_client(); table_ref_str = f"{project_id}.{dataset_id}.{table_id}" - table_ref = bigquery.TableReference.from_string(table_ref_str) - table = await asyncio.to_thread(_get_table_sync, client, table_ref) - schema_list = [{"name": f.name, "type": f.field_type, "mode": f.mode} for f in table.schema] - return format_success("Schema retrieved", data={"project_id": table.project, "dataset_id": table.dataset_id, "table_id": table.table_id, "schema": schema_list}) - except google_exceptions.NotFound: return format_error(f"Table {table_ref_str} not found") - except Exception as e: return handle_gcp_error(e, f"getting schema table {table_ref_str}") - -async def bq_submit_query( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Submits BQ query job asynchronously returns job ID uses Firestore""" - query_str = arguments.get("query"); # ... validation ... - if not query_str or not isinstance(query_str, str): return format_error("Missing invalid query string") - project_id_arg = arguments.get("project_id"); # ... validation ... - default_project_id_arg = arguments.get("default_dataset_project_id"); # ... validation ... - default_dataset_id_arg = arguments.get("default_dataset_id"); # ... validation ... 
- - target_project = project_id_arg # Project run job in - job_default_dataset_ref: Optional[bigquery.DatasetReference] = None - if default_project_id_arg and default_dataset_id_arg: - job_default_dataset_ref = bigquery.DatasetReference(default_project_id_arg, default_dataset_id_arg) + expiration_minutes = int(expiration_minutes) + if expiration_minutes <= 0 or expiration_minutes > 7 * 24 * 60: + raise ValueError("Expiration must be between 1 minute and 7 days (10080 minutes).") + except ValueError: + return format_error("Invalid 'expiration_minutes', must be a positive integer (max 10080).") try: - client = get_bq_client(); - if not target_project: target_project = client.project - job_config = bigquery.QueryJobConfig(use_legacy_sql=False); - if job_default_dataset_ref: job_config.default_dataset = job_default_dataset_ref - logger.info(f"Submitting BQ Job Project {target_project} Query {query_str[:50]}", extra={"conn_id": conn_id}) - query_job = await asyncio.to_thread(_submit_job_sync, client, query_str, job_config, target_project) - job_id = query_job.job_id; location = query_job.location; initial_state = query_job.state - logger.info(f"BQ Job submitted {job_id} Location {location} State {initial_state}", extra={"conn_id": conn_id}) - # --- Store Job Info Firestore --- - job_info = BqJobInfo(job_id=job_id, location=location, conn_id=conn_id, status=initial_state) - await bq_job_store.add_job(job_info) # Uses Firestore store now - # -------------------------------- - return format_success("Query submitted Use bq get job status poll", data={"job_id": job_id, "location": location, "state": initial_state}) - except google_exceptions.BadRequest as e: return handle_gcp_error(e, "submitting query BadRequest") - except google_exceptions.Forbidden as e: return handle_gcp_error(e, "submitting query Forbidden") - except Exception as e: return handle_gcp_error(e, f"submitting BQ query") - -async def bq_get_job_status( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Checks job status via Firestore if DONE Success fetches first page results""" - job_id = arguments.get("job_id"); # ... validation ... - if not job_id or not isinstance(job_id, str): return format_error("Missing invalid job id") - # Location arg is mainly for fallback API call if needed now - location_arg = arguments.get("location"); # ... validation ... - - logger.debug(f"Getting job status from Firestore {job_id}", extra={"conn_id": conn_id}) - job_info = await bq_job_store.get_job(job_id) # Reads from Firestore - - if not job_info: - # Optional Fallback check BQ API directly if not found in Firestore - # logger.warning(f"Job {job_id} not found Firestore trying direct API lookup", extra={"conn_id": conn_id}) - # try: client=get_bq_client(); job = await asyncio.to_thread(_get_job_sync, client, job_id, location_arg); job_info = BqJobInfo(...) # Reconstruct if needed - # except google_exceptions.NotFound: return format_error(...) - # except Exception as e: return handle_gcp_error(...) 
- # If still not found after fallback - return format_error(f"Job {job_id} not found tracked") - - status_data = {"job_id": job_info.job_id, "location": job_info.location, "state": job_info.status, "error_result": job_info.error_result} - - if job_info.status == 'DONE': - if job_info.error_result: return format_error(f"Job {job_id} finished errors", data=status_data) - else: - # --- Job Done Successfully Fetch FIRST page results --- - logger.info(f"Job {job_id} DONE Fetching first page results", extra={"conn_id": conn_id}) - try: - client = get_bq_client(); max_results_first_page = 1000 - # Use retry wrapped helper fetch page - schema, rows, token, total = await asyncio.to_thread( - _list_rows_sync, client, job_id, job_info.location, None, max_results_first_page # page token None - ) - schema_list = [{"name": f.name, "type": f.field_type} for f in schema]; rows_list = [_serialize_row(r) for r in rows] - status_data["schema"] = schema_list; status_data["rows"] = rows_list; status_data["next_page_token"] = token; status_data["total_rows"] = total - return format_success(f"Job {job_id} completed Returning first page results", data=status_data) - except Exception as e: - logger.error(f"Error fetching first page results completed job {job_id} {e}", exc_info=True, extra={"conn_id": conn_id}) - # Return DONE status indicate result fetch error - return format_error(f"Job {job_id} completed but failed fetch first page results {e}", data = {**status_data, "rows": None, "schema": None, "next_page_token": None}) # type ignore - else: - # Job PENDING or RUNNING - logger.info(f"Job {job_id} still {job_info.status}", extra={"conn_id": conn_id}) - return format_info(f"Job {job_id} currently {job_info.status}", data=status_data) - - -async def bq_get_query_results( arguments: Dict[str, Any], conn_id: str, bq_job_store: FirestoreBqJobStore, **kwargs ) -> McpToolReturnType: - """Fetches specific page results completed BQ query job requires page token""" - job_id = arguments.get("job_id"); page_token = arguments.get("page_token"); location_arg = arguments.get("location") - # Validation - if not job_id or not isinstance(job_id, str): return format_error("Missing invalid job id") - if not page_token or not isinstance(page_token, str): return format_error("Missing invalid required argument page token fetch subsequent pages") - if location_arg is not None and not isinstance(location_arg, str): return format_error("Invalid location") - try: max_results = int(arguments.get("max_results", 1000)); # ... range check ... 
- except (ValueError, TypeError): max_results = 1000 - - # Determine location argument > stored job info > error - location = location_arg - if not location: - job_info = await bq_job_store.get_job(job_id) # Read from Firestore - if job_info: location = job_info.location - else: return format_error(f"Cannot fetch results page Location job {job_id} unknown Please provide location") - if not location: return format_error(f"Cannot fetch results page Location job {job_id} could not be determined") + client = get_storage_client() + expiration_delta = timedelta(minutes=expiration_minutes) + + signed_url = await asyncio.to_thread( + _generate_signed_url_sync, + client, bucket_name, object_name, "PUT", expiration_delta, + content_type=content_type, headers=custom_headers, version="v4" + ) + return format_success("Write signed URL generated.", data={ + "bucket_name": bucket_name, + "object_name": object_name, + "signed_url": signed_url, + "method": "PUT", + "content_type_expected": content_type, # Client should use this Content-Type header + "custom_headers_expected": custom_headers, # Client should include these headers + "expires_at": (datetime.now(timezone.utc) + expiration_delta).isoformat() + }) + except Exception as e: # Catch broad exceptions, including potential IAM issues for SA Token Creator + return handle_gcp_error(e, f"generating write signed URL for '{bucket_name}/{object_name}'") + +async def gcs_write_string_object(arguments: Dict[str, Any], conn_id: str, **kwargs) -> McpToolReturnType: + bucket_name = arguments.get("bucket_name") + object_name = arguments.get("object_name") + content = arguments.get("content") + content_type = arguments.get("content_type") # Optional + + if not bucket_name or not isinstance(bucket_name, str): + return format_error("Missing or invalid 'bucket_name'.") + if not object_name or not isinstance(object_name, str): + return format_error("Missing or invalid 'object_name'.") + if content is None or not isinstance(content, str): # Content must be string for this tool + return format_error("Missing or invalid 'content', must be a string.") + if content_type is not None and not isinstance(content_type, str): + return format_error("Invalid 'content_type', must be a string if provided.") try: - client = get_bq_client() - logger.debug(f"Getting results page BQ job {job_id} Loc {location} PageToken {page_token[:10]}", extra={"conn_id": conn_id}) - # Fetch requested page retry wrapped helper - schema, rows, token, total = await asyncio.to_thread( - _list_rows_sync, client, job_id, location, page_token, max_results + client = get_storage_client() + upload_result = await asyncio.to_thread( + _upload_string_sync, client, bucket_name, object_name, content, content_type ) - schema_list = [{"name": f.name, "type": f.field_type} for f in schema]; rows_list = [_serialize_row(r) for r in rows] - return format_success("Query results page retrieved", data={"job_id": job_id, "location": location, "schema": schema_list, "rows": rows_list, "next_page_token": token, "total_rows": total}) - except google_exceptions.NotFound: return format_error(f"Job {job_id} not found or invalid page token") - except Exception as e: return handle_gcp_error(e, f"getting results page job {job_id}") - - -def _serialize_row(row: bigquery.table.Row) -> Dict[str, Any]: - """Helper convert BQ Row JSON serializable dict""" - row_dict = {}; # ... implementation unchanged ... 
- for key, value in row.items(): - if isinstance(value, bytes): - try: row_dict[key] = value.decode('utf-8') - except UnicodeDecodeError: row_dict[key] = f"" - else: row_dict[key] = value - return row_dict + return format_success(f"String content written to '{bucket_name}/{object_name}'.", data=upload_result) + except google_exceptions.NotFound: + return format_error(f"Bucket '{bucket_name}' not found.") + except Exception as e: + return handle_gcp_error(e, f"writing string to GCS object '{bucket_name}/{object_name}'") diff --git a/mcp_agent/server.py b/mcp_agent/server.py index 46e79b1..cb60a11 100644 --- a/mcp_agent/server.py +++ b/mcp_agent/server.py @@ -1,133 +1,255 @@ -import argparse import asyncio +import json import logging import sys -import os -from typing import Set -from dotenv import load_dotenv -from pythonjsonlogger import jsonlogger +from typing import Set, Optional, Any, Dict, List, Callable, Awaitable -# Attempt relative import first +from aiohttp import web +from aiohttp_sse import sse_response +from mcp import types as mcp_types + +# Attempt to import ALL_TOOLS_MAP and FirestoreBqJobStore try: - from .server import run_stdio_server, run_sse_server - from .gcp_tools.storage import get_storage_client - from .gcp_tools.bigquery import get_bq_client - from .utils import get_secret_manager_client, fetch_secret - from .job_store import FirestoreBqJobStore # Use Firestore store + from .gcp_tools import ALL_TOOLS_MAP # To be populated later + from .job_store import FirestoreBqJobStore # Now available except ImportError: - # Fallback running script directly - import os; sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) - from mcp_agent.server import run_stdio_server, run_sse_server - from mcp_agent.gcp_tools.storage import get_storage_client - from mcp_agent.gcp_tools.bigquery import get_bq_client - from mcp_agent.utils import get_secret_manager_client, fetch_secret - from mcp_agent.job_store import FirestoreBqJobStore - - -# --- Configure Logging JSON Formatter --- -root_logger = logging.getLogger(); logHandler = logging.StreamHandler(sys.stderr) -formatter = jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(name)s %(message)s %(pathname)s %(lineno)d') -logHandler.setFormatter(formatter); root_logger.handlers.clear(); root_logger.addHandler(logHandler) -root_logger.setLevel(logging.INFO); logger = logging.getLogger("mcp_agent.cli") -# --- End Logging Setup --- - -# Create instance Firestore store enable pre flight check -# Note This assumes default Firestore database project -_firestore_job_store = FirestoreBqJobStore() - -def parse_args(): - """Parses command line arguments MCP agent server""" - parser = argparse.ArgumentParser( - description="Run MCP Agent server GCS BQ v1 0 0 Stateless Firestore BQ Jobs", # Updated - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--tools", type=str, required=True, help="Comma separated storage bigquery") - parser.add_argument("--port", type=str, required=True, help="Connection mode stdio or SSE port number") - parser.add_argument("--host", type=str, default="127.0.0.1", help="SSE Only Host address Use 0 0 0 0 network access") - parser.add_argument("--require-api-key", action='store_true', help="SSE Only Enable API key auth Reads MCP AGENT API KEY SECRET NAME then MCP AGENT API KEY") - parser.add_argument("--debug", action="store_true", help="Enable verbose debug logging") - return parser.parse_args() - -def main() -> None: - """Main entry point mcp agent command line 
tool""" - dotenv_path = load_dotenv() # Load env file early - if dotenv_path: logger.info("Loaded env vars from env file", extra={'dotenv_path': dotenv_path}) - else: logger.info("No env file found") - args = parse_args() - # --- Setup Logging Level --- - log_level = logging.DEBUG if args.debug else logging.INFO # ... set levels ... - logging.getLogger("mcp_agent").setLevel(log_level); # ... other logger levels ... - if args.debug: logger.debug("Debug logging enabled") - else: logging.getLogger("google").setLevel(logging.WARNING); logging.getLogger("mcp").setLevel(logging.INFO) - - # --- Validate Tools --- - try: enabled_tools: Set[str] = set(t.strip().lower() for t in args.tools.split(',') if t.strip()) - except Exception: logger.critical("Invalid tools format"); sys.exit(1) - valid_tools = {"storage", "bigquery"}; invalid_tools = enabled_tools - valid_tools - if invalid_tools: logger.critical(f"Invalid tools {invalid_tools} Allowed {valid_tools}"); sys.exit(1) - if not enabled_tools: logger.critical("No tools specified"); sys.exit(1) - logger.info(f"Configuring mcp agent server v1 0 0", extra={"enabled_tools": list(enabled_tools)}) - - # --- Determine API Key Secret Manager integration --- - api_key_to_use: Optional[str] = None; secret_source: str = "None" - if args.port.lower() != "stdio" and args.require_api_key: - secret_name_var = os.getenv('MCP_AGENT_API_KEY_SECRET_NAME'); direct_key_var = os.getenv('MCP_AGENT_API_KEY') - if secret_name_var: - logger.info("Attempting fetch API key Secret Manager", extra={"secret_name": secret_name_var}) - try: - # Fetch secret uses retry internally now - api_key_to_use = fetch_secret(secret_name_var) - if api_key_to_use: secret_source = "Secret Manager"; logger.info("Successfully fetched API key Secret Manager") - else: logger.critical(f"FATAL Failed fetch API key Secret Manager {secret_name_var}"); sys.exit(1) - except Exception as sm_err: logger.critical(f"FATAL Error Secret Manager access {sm_err}", exc_info=args.debug); sys.exit(1) - elif direct_key_var: - logger.info("Using API key MCP AGENT API KEY environment variable"); api_key_to_use = direct_key_var; secret_source = "Environment Variable" - else: logger.critical("FATAL require api key flag set but neither secret name nor direct key env var set"); sys.exit(1) - logger.info(f"API Key Authentication Enabled Source {secret_source}") - elif args.port.lower() != "stdio": logger.info("API Key Authentication Disabled") - # --- End API Key Handling --- - - # --- Pre flight Check GCP Clients Add Firestore --- + # This implies a packaging or setup issue if job_store is not found. + logger = logging.getLogger("mcp_agent.server_prelim") # Use a distinct logger name for this specific warning + logger.critical("Could not import ALL_TOOLS_MAP or FirestoreBqJobStore. 
Tool dispatch will be severely limited or non-functional.", exc_info=True) + ALL_TOOLS_MAP: Dict[str, Callable[..., Awaitable[List[mcp_types.Content]]]] = {} + FirestoreBqJobStore = None # Make it None if not imported, so type hints don't break entirely below + +logger = logging.getLogger("mcp_agent.server") # Main logger for this module + +# --- Tool Dispatcher --- +async def dispatch_tool( + message: Dict[str, Any], + enabled_tools: Set[str], # Currently unused here, logic might be in ALL_TOOLS_MAP population + conn_id: str, + bq_job_store: Optional[FirestoreBqJobStore] = None, + **kwargs: Any # To catch any other args passed from server handlers +) -> List[mcp_types.Content]: + """ + Dispatches an MCP tool call to the appropriate implementation. + """ + tool_name = message.get("tool_name") + arguments = message.get("arguments", {}) + + if not tool_name: + logger.warning("Request missing 'tool_name'.", extra={"conn_id": conn_id, "request_message": message}) + return [mcp_types.TextContent(type="text", text=json.dumps({"status": "error", "message": "Missing tool_name in request."}))] + + if not ALL_TOOLS_MAP: # Check if tool map is empty (e.g. import failed) + logger.error("ALL_TOOLS_MAP is empty. Cannot dispatch any tools.", extra={"conn_id": conn_id}) + return [mcp_types.TextContent(type="text", text=json.dumps({"status": "error", "message": "Tool dispatch mechanism not available."}))] + + if tool_name not in ALL_TOOLS_MAP: + logger.warning(f"Tool '{tool_name}' not recognized or not enabled.", extra={"conn_id": conn_id, "tool_name": tool_name}) + return [mcp_types.TextContent(type="text", text=json.dumps({"status": "error", "message": f"Tool '{tool_name}' not recognized or not enabled."}))] + + tool_function = ALL_TOOLS_MAP[tool_name] + logger.info(f"Dispatching to tool: '{tool_name}'", extra={"conn_id": conn_id, "tool_name": tool_name, "arguments": arguments}) + try: - logger.info("Performing pre flight GCP client initialization check") - clients_to_init = [] - if "storage" in enabled_tools: clients_to_init.append(get_storage_client) - if "bigquery" in enabled_tools: - clients_to_init.append(get_bq_client) - # Add Firestore check if BQ enabled - clients_to_init.append(_firestore_job_store._get_db) # Use internal method force init - if args.port.lower() != "stdio" and args.require_api_key and os.getenv('MCP_AGENT_API_KEY_SECRET_NAME'): - clients_to_init.append(get_secret_manager_client) - # Run initializations sequentially allow easier debug failure - for init_func in clients_to_init: - if asyncio.iscoroutinefunction(init_func): asyncio.run(init_func()) # Run async init checks synchronously startup - else: init_func() - logger.info("GCP client pre flight check successful") + # Pass bq_job_store to the tool function if it's available and the tool might need it. + # Individual tools will need to be designed to accept bq_job_store in their **kwargs or explicitly. + # For now, we pass it if it's not None. 
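+        # Calling convention assumed for every entry in ALL_TOOLS_MAP (see the notes in
+        # gcp_tools/__init__.py): an async callable taking (arguments: Dict[str, Any],
+        # conn_id: str, **kwargs) and returning List[mcp_types.Content]. For example, a
+        # request message of
+        #     {"tool_name": "bq_list_tables", "arguments": {"project_id": "my-proj", "dataset_id": "my_ds"}}
+        # is dispatched roughly as:
+        #     await bq_list_tables(arguments={...}, conn_id=conn_id, bq_job_store=bq_job_store)
+        # where "my-proj" / "my_ds" are placeholder values.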
+ if bq_job_store: + response_contents = await tool_function(arguments=arguments, conn_id=conn_id, bq_job_store=bq_job_store, **kwargs) + else: + response_contents = await tool_function(arguments=arguments, conn_id=conn_id, **kwargs) + return response_contents except Exception as e: - logger.critical("FATAL GCP client check failed", extra={"error": str(e)}, exc_info=args.debug) - logger.critical("Check ADC credentials API enablement GCS BQ Firestore SecretManager network IAM roles Firestore User Admin") - sys.exit(1) - # --- End Pre flight Check --- - - # --- Start Server --- - loop = asyncio.get_event_loop(); main_task = None - server_mode = "STDIO" if args.port.lower() == "stdio" else f"SSE on {args.host}:{args.port}" + logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True, extra={"conn_id": conn_id, "tool_name": tool_name}) + # Consider using handle_gcp_error from utils if it's a GCP exception + return [mcp_types.TextContent(type="text", text=json.dumps({"status": "error", "message": f"Error executing tool '{tool_name}': {str(e)}"}))] + + +# --- STDIO Server Implementation --- +async def run_stdio_server( + enabled_tools: Set[str], + bq_job_store: Optional[FirestoreBqJobStore] = None, + **kwargs: Any # Catch-all for future parameters +) -> None: + logger.info("Starting STDIO server. Listening on stdin...") + # For STDIO, conn_id might be less dynamic unless specified by client in messages. + # Using a fixed one for the session. + conn_id = "stdio_session_main" + + while True: + try: + line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline) + if not line: + logger.info("STDIN closed. Shutting down STDIO server.") + break + line = line.strip() + if not line: + continue + + logger.debug(f"Received STDIO message: {line}", extra={"conn_id": conn_id}) + try: + message = json.loads(line) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from STDIO: {line}. 
Error: {e}", exc_info=True, extra={"conn_id": conn_id}) + error_response_obj = mcp_types.TextContent(type="text", text=json.dumps({"status": "error", "message": "Invalid JSON message received."})) + sys.stdout.write(json.dumps(error_response_obj.model_dump()) + "\n") + sys.stdout.flush() + continue + + response_contents = await dispatch_tool( + message=message, + enabled_tools=enabled_tools, + conn_id=conn_id, + bq_job_store=bq_job_store + ) + + for content in response_contents: + if isinstance(content, mcp_types.Content): + sys.stdout.write(json.dumps(content.model_dump()) + "\n") + else: # Should not happen if dispatch_tool returns List[mcp_types.Content] + logger.error(f"Invalid content type from dispatch_tool: {type(content)}", extra={"conn_id": conn_id}) + sys.stdout.write(json.dumps(str(content)) + "\n") # Best effort + sys.stdout.flush() + + except KeyboardInterrupt: + logger.info("STDIO server interrupted by user (Ctrl+C).") + break + except Exception as e: + logger.critical(f"Unexpected error in STDIO server loop: {e}", exc_info=True, extra={"conn_id": conn_id}) + await asyncio.sleep(1) + + logger.info("STDIO server has shut down.") + + +# --- SSE Server Implementation --- +@web.middleware +async def api_key_middleware(request: web.Request, handler: Callable): + api_key_required = request.app.get("api_key") + # This middleware is only active if api_key_required is not None in app state + if api_key_required: + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + logger.warning("SSE request missing or malformed Authorization header.", extra={"remote": request.remote}) + raise web.HTTPUnauthorized( + text=json.dumps({"status": "error", "message": "Missing or malformed Authorization header. 
Expected 'Bearer '."}), + content_type="application/json" + ) + + token = auth_header.split("Bearer ")[1] + if token != api_key_required: + logger.warning("SSE request with invalid API key.", extra={"remote": request.remote}) + raise web.HTTPForbidden( + text=json.dumps({"status": "error", "message": "Invalid API key."}), + content_type="application/json" + ) + return await handler(request) + +async def handle_mcp_sse_request(request: web.Request): + # Generate a somewhat unique connection ID for logging/tracing this request + conn_id = request.headers.get("X-Connection-ID") # Allow client to specify + if not conn_id: + conn_id = f"sse_{request.remote}_{int(asyncio.get_running_loop().time())}" + + logger.info(f"SSE request received from {request.remote}", extra={"conn_id": conn_id, "path": request.path, "headers": dict(request.headers)}) + + if request.content_type != 'application/json': + logger.warning(f"Invalid content type: {request.content_type}", extra={"conn_id": conn_id}) + raise web.HTTPUnsupportedMediaType( + text=json.dumps({"status": "error", "message": "Expected application/json content type."}), + content_type="application/json" + ) + try: - if args.port.lower() == "stdio": - if args.require_api_key: logger.warning("API key requirement ignored stdio mode") - logger.info(f"Starting server {server_mode} mode") - main_task = loop.create_task(run_stdio_server(enabled_tools)) - else: # SSE Mode + message = await request.json() + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from SSE request: {e}", exc_info=True, extra={"conn_id": conn_id}) + raise web.HTTPBadRequest( + text=json.dumps({"status": "error", "message": f"Invalid JSON in request body: {e}"}), + content_type="application/json" + ) + + logger.debug(f"Parsed SSE message: {message}", extra={"conn_id": conn_id}) + + enabled_tools = request.app["enabled_tools"] + bq_job_store = request.app.get("bq_job_store") # Will be None if BQ not enabled + + response_contents = await dispatch_tool( + message=message, + enabled_tools=enabled_tools, + conn_id=conn_id, + bq_job_store=bq_job_store + ) + + # Prepare SSE response. Each Content object becomes a separate event. + # The MCP spec usually expects a stream of JSON objects. + # aiohttp-sse sends events in the format: + # event: + # data: + # id: + # We'll use the default event name 'message'. + async with sse_response(request) as sse_resp: + for content_idx, content_obj in enumerate(response_contents): try: - port_num = int(args.port); # ... validate port range ... 
- if not (1024 <= port_num <= 65535): raise ValueError("Port out range") - logger.info(f"Starting server {server_mode} mode") - main_task = loop.create_task(run_sse_server(enabled_tools, args.host, port_num, api_key_to_use)) - except ValueError as e: logger.critical(f"Invalid port {args.port} {e}"); sys.exit(1) - if main_task: loop.run_until_complete(main_task) - except KeyboardInterrupt: logger.info("Server interrupted Ctrl C Shutting down") - except Exception as e: logger.critical(f"Unexpected error running server {e}", exc_info=args.debug) - finally: logger.info("Server shutdown process complete"); sys.exit(0) # Explicit exit - -if __name__ == "__main__": - main() + if isinstance(content_obj, mcp_types.Content): + payload_str = json.dumps(content_obj.model_dump()) + else: # Should ideally not happen + logger.error(f"dispatch_tool returned non-Content item: {type(content_obj)}", extra={"conn_id": conn_id}) + payload_str = json.dumps(str(content_obj)) # Best effort + + await sse_resp.send(payload_str) # Default event name is 'message' + logger.debug(f"Sent SSE event {content_idx + 1}/{len(response_contents)}: {payload_str[:100]}...", extra={"conn_id": conn_id}) + except Exception as e: + logger.error(f"Error serializing or sending SSE event {content_idx}: {e}", exc_info=True, extra={"conn_id": conn_id}) + # Attempt to send an error event back to the client if the stream is still open + try: + error_event_payload = json.dumps({"status": "error", "message": f"Internal error during SSE event generation for event {content_idx}: {str(e)}"}) + await sse_resp.send(error_event_payload) + except Exception as send_err_exc: + logger.error(f"Failed to send error event back to client: {send_err_exc}", extra={"conn_id": conn_id}) + # At this point, the connection might be too broken to continue. + # The sse_response context manager will handle closing. + break + return sse_resp # Return the response object + + +async def run_sse_server( + enabled_tools: Set[str], + host: str, + port: int, + api_key: Optional[str], # This is the API key value itself, if configured + bq_job_store: Optional[FirestoreBqJobStore] = None, + **kwargs: Any # Catch-all for future parameters +) -> None: + # If api_key is provided, authentication is active. + # The middleware will use app["api_key"] to check against. + app = web.Application(middlewares=[api_key_middleware] if api_key else []) + + app["enabled_tools"] = enabled_tools + app["api_key"] = api_key # Store the actual API key string for the middleware to use. Could be None. 
+ app["bq_job_store"] = bq_job_store + + if api_key: + logger.info(f"SSE Server configured WITH API Key Authentication.") + else: + logger.info(f"SSE Server configured WITHOUT API Key Authentication (no API key provided or --require-api-key not used).") + + app.router.add_post("/mcp", handle_mcp_sse_request) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, host, port) + + logger.info(f"Starting SSE server on http://{host}:{port}/mcp") + try: + await site.start() + # Keep server running until interrupted + while True: + await asyncio.Future() # Wait indefinitely until cancelled + except KeyboardInterrupt: + logger.info("SSE server interrupted by user (Ctrl+C).") + except Exception as e: + logger.critical(f"SSE server failed: {e}", exc_info=True) + finally: + logger.info("Shutting down SSE server...") + await runner.cleanup() + logger.info("SSE server has shut down.") diff --git a/requirements.txt b/requirements.txt index d5ebcbb..3b9cf19 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,8 @@ google-cloud-secret-manager >= 2.10.0 python-dotenv >= 1.0.0 python-json-logger >= 2.0.0 tenacity >= 8.0.0 +aiohttp >= 3.8.0 # For SSE server +aiohttp-sse >= 2.1.0 # For SSE event streaming # Notes: # - Specify exact versions if needed for strict reproducibility (e.g., ==2.5.0). diff --git a/todo.md b/todo.md index 50e924b..b85fa32 100644 --- a/todo.md +++ b/todo.md @@ -2,87 +2,94 @@ This document outlines tasks to improve the `mcp-agent` codebase, fix critical issues, and align it with the v1.0.0 specification in the `README.md`. +**Legend:** +* `[x]` - Completed +* `[~]` - Partially Completed / In Progress +* `[ ]` - Not Started + ## priority-1 Critical Bugs & Missing Features These items prevent the agent from functioning as described in `README.md v1.0.0`. -* **[ ] Implement `FirestoreBqJobStore` Class:** - * Create the `FirestoreBqJobStore` class responsible for all Firestore interactions related to BigQuery job persistence (add, get, update job status). - * Define methods as implied by `README.md` and `bq_submit_query`/`bq_get_job_status` usage: - * `add_job(job_info: BqJobInfo)` - * `get_job(job_id: str) -> Optional[BqJobInfo]` - * `update_job_status(job_id: str, status: str, error_result: Optional[Dict] = None)` - * Potentially a method for the background poller to query pending jobs. - * **Location:** Consider placing this in `mcp_agent/job_store.py` (singular, as per README's implication for this class) or a new `mcp_agent/firestore_job_store.py`. Ensure imports are updated accordingly. -* **[ ] Define `BqJobInfo` Dataclass/TypedDict:** - * Create a `BqJobInfo` structure (e.g., a dataclass or TypedDict) to represent BigQuery job details stored in Firestore. - * Fields should include `job_id`, `location`, `conn_id`, `status`, `error_result`, `created_time`, `updated_time`, etc. - * Used by `FirestoreBqJobStore` and BigQuery tool functions. -* **[ ] Implement GCS Signed URL Tools:** - * Create the actual GCS tool implementations for generating signed URLs. - * Functions like `gcs_get_read_signed_url`, `gcs_get_write_signed_url` need to use `bucket.generate_signed_url()`. - * Other GCS tools (`gcs_list_buckets`, `gcs_list_objects`, `gcs_write_string_object`) also need their correct stateless implementations. - * **Location:** These should be in `mcp_agent/gcp_tools/storage.py`. -* **[ ] Implement Background BQ Job Poller:** - * The `README.md` mentions a "background BQ poller task (reading from/writing to Firestore)". 
This logic needs to be implemented. - * It should periodically query Firestore for jobs not in a terminal state (DONE, ERROR), check their actual status via BQ API, and update Firestore via `FirestoreBqJobStore`. - * This poller should be started as an asyncio task when the server initializes (likely in `cli.py` or the actual server run functions). +* **[x] Implement `FirestoreBqJobStore` Class:** + * Created the `FirestoreBqJobStore` class in `mcp_agent/job_store.py`. + * Defined methods: `add_job`, `get_job`, `update_job_status`, `query_pending_jobs`. +* **[x] Define `BqJobInfo` Dataclass/TypedDict:** + * Created `BqJobInfo` dataclass in `mcp_agent/job_store.py`. + * Fields include `job_id`, `location`, `conn_id`, `status`, `error_result`, `created_time`, `updated_time`, `project_id`, `query`. +* **[x] Implement GCS Signed URL Tools:** + * Implemented GCS tools (`gcs_list_buckets`, `gcs_list_objects`, `gcs_get_read_signed_url`, `gcs_get_write_signed_url`, `gcs_write_string_object`) in `mcp_agent/gcp_tools/storage.py`. +* **[x] Implement Background BQ Job Poller:** + * Implemented in `mcp_agent/bq_poller.py`. + * Started as an asyncio task in `mcp_agent/cli.py`. + * Periodically queries Firestore, checks BQ API, and updates Firestore. +* **[x] Implement Core Server Functions (`run_stdio_server`, `run_sse_server`):** (Moved from "Unknowns" as it was a critical missing piece) + * Implemented basic STDIO server loop in `mcp_agent/server.py`. + * Implemented SSE server using `aiohttp` in `mcp_agent/server.py`. + * Includes basic `dispatch_tool` logic. ## priority-2 Refactoring & Cleanup Improve code structure, clarity, and remove inconsistencies. -* **[ ] Rename BigQuery Tools File:** - * Rename `mcp_agent/gcp_tools/storage.py` (which currently contains BigQuery logic) to `mcp_agent/gcp_tools/bigquery.py`. -* **[ ] Consolidate Tool Definitions:** - * The content of `mcp_agent/jobstore.py` (plural, defining tool schemas and `ALL_TOOLS_MAP` for v1.0.0) should become the content of `mcp_agent/gcp_tools/__init__.py`. - * This makes `mcp_agent/gcp_tools/` a proper Python package for all tool implementations and their definitions. - * Delete the now-redundant `mcp_agent/jobstore.py` (plural) after moving its content. -* **[ ] Correct Import Paths:** - * Update all import statements affected by file renaming and restructuring. Examples: - * In `mcp_agent/cli.py`: Imports for `get_storage_client` (from the new `gcp_tools.storage`) and `get_bq_client` (from the new `gcp_tools.bigquery`). - * In `mcp_agent/gcp_tools/bigquery.py` (formerly `storage.py`): Import for `FirestoreBqJobStore` from its new correct location. - * In the new `mcp_agent/gcp_tools/__init__.py`: Imports for GCS functions from `.storage` and BQ functions from `.bigquery`. -* **[ ] Remove Legacy Code:** - * Delete `mcp_agent/context.py` (`ConnectionContextManager`). - * Delete the old (stateful) content of `mcp_agent/gcp_tools/__init__.py` (before replacing it as per above). -* **[ ] Clarify/Refactor `mcp_agent/server.py`:** - * Determine the true purpose of `mcp_agent/server.py`. - * If it's a redundant entry point to `cli.py`, remove it. - * If it's meant to contain core server helper functions used by `cli.py` (but not `run_stdio_server`/`run_sse_server` themselves if they are from an external lib), refactor it to only contain that logic and remove CLI parsing. 
- * The module-level instantiation of `FirestoreBqJobStore()` and pre-flight checks in this file are problematic and likely belong in `cli.py` or the server startup sequence. -* **[ ] Ensure Consistent Client Initialization:** - * The `get_storage_client()` and `get_bq_client()` functions should be correctly defined in their respective modules (`gcp_tools/storage.py` and `gcp_tools/bigquery.py`). - * `cli.py` should call these for pre-flight checks. +* **[x] Rename BigQuery Tools File:** + * Renamed `mcp_agent/gcp_tools/storage.py` (old BQ logic) to `mcp_agent/gcp_tools/bigquery.py`. +* **[x] Rename GCS Tools File:** (New task based on implementation) + * Renamed `mcp_agent/gcp_tools/gcs_tools_temp.py` to `mcp_agent/gcp_tools/storage.py`. +* **[x] Consolidate Tool Definitions:** + * Updated `mcp_agent/gcp_tools/__init__.py` to import new stateless tools and define `ALL_TOOLS_MAP` pointing to them. + * Schemas for advertisement are TBD (moved to a new P4 task). +* **[x] Correct Import Paths:** + * Updated import statements in `cli.py`, `server.py`, and `gcp_tools/__init__.py`. +* **[x] Remove Legacy Code:** + * Deleted `mcp_agent/context.py`. + * Old content of `mcp_agent/gcp_tools/__init__.py` was overwritten. +* **[x] Clarify/Refactor `mcp_agent/server.py`:** + * `cli.py` is now the sole entry point. + * `server.py` now contains the core server logic (`run_stdio_server`, `run_sse_server`, `dispatch_tool`). + * Redundant CLI parsing and pre-flight checks removed from `server.py`. +* **[x] Ensure Consistent Client Initialization:** + * `get_storage_client()` is in `mcp_agent/gcp_tools/storage.py`. + * `get_bq_client()` is in `mcp_agent/gcp_tools/bigquery.py`. + * `cli.py` calls these for pre-flight checks. ## priority-3 Code Quality & Best Practices General improvements for maintainability. -* **[ ] Consistent Logging Names:** - * Ensure loggers use consistent naming conventions, e.g., `logging.getLogger("mcp_agent.gcp_tools.storage")` in `storage.py` and `logging.getLogger("mcp_agent.gcp_tools.bigquery")` in `bigquery.py`. -* **[ ] Type Hinting:** - * Review and enhance type hints across the codebase for better clarity and static analysis. Particularly for `BqJobInfo` and complex dictionary structures. -* **[ ] Error Handling:** - * Review error handling in tool implementations to ensure appropriate error messages are returned to the MCP client. -* **[ ] Configuration Management:** - * Verify that all configurable parts (e.g., Firestore project/database if not default) are handled cleanly, possibly via environment variables or CLI arguments if necessary. +* **[x] Consistent Logging Names:** + * Verified and ensured consistent logger names across modules. +* **[~] Type Hinting:** + * Added and reviewed type hints in new and modified code. Further enhancements for complex dicts can be a future task. +* **[x] Error Handling:** + * Reviewed error handling in new tool implementations; uses `format_error` and `handle_gcp_error`. +* **[x] Configuration Management:** + * Verified handling of GCP project, API key. + * Made BQ poller interval configurable via CLI. ## priority-4 Documentation Updates to reflect the fixed and intended state. -* **[ ] Update `README.md`:** - * Clarify the roles of `cli.py` and the actual MCP server core logic (e.g., `run_stdio_server`, `run_sse_server` - specify if they are from an external library or should be part of this project's codebase). - * Correct file path references if they change significantly (e.g., `job_store.py` singular vs. 
plural, location of tool definitions). - * Ensure the "How it Works" section accurately reflects the (corrected) codebase. -* **[ ] Inline Code Comments:** - * Add/improve comments in complex sections of the code, especially around Firestore interactions and the BQ polling logic. +* **[x] Update `README.md`:** + * Updated "How it Works", "Tool Reference", "Installation", and "Usage" sections to reflect current architecture and implemented tools. +* **[x] Inline Code Comments:** + * Added/improved comments in `server.py`, `bq_poller.py`, `job_store.py`, and `cli.py`. +* **[ ] Define MCP Tool Schemas:** (New task, moved from P2 "Consolidate Tool Definitions") + * Populate `mcp_agent/gcp_tools/__init__.py` (or a separate schemas file) with `mcp_types.Tool` definitions for all implemented stateless tools. This is needed for proper client-side tool advertisement and argument understanding. ## Unknowns / Needs Clarification -* **Origin of `run_stdio_server` and `run_sse_server`:** - * `cli.py` imports these from `.server`. Are these functions expected to be defined within `mcp_agent/server.py`, or are they provided by an external `mcp.server` library that this project uses? If they are meant to be in this project, their definitions are missing. This needs clarification to understand the full scope of the server implementation. - * The `README.md`'s description of `server.py` as the "MCP Server Core" adds to this confusion. +* **[x] Origin of `run_stdio_server` and `run_sse_server`:** + * These have been implemented in `mcp_agent/server.py`. + +## New Potential Tasks (Post-MVP) + +* **[ ] Thorough Testing:** Implement unit and integration tests for tools, job store, poller, and server logic. +* **[ ] Advanced Error Handling:** More granular error codes/types in MCP responses. +* **[ ] Tool Schema Generation:** Automate or more systematically define MCP tool schemas in `gcp_tools/__init__.py`. +* **[ ] Configuration for Firestore Collection:** Allow `mcp_agent_bq_jobs` collection name to be configurable. +* **[ ] More Robust Pre-flight Checks:** Deeper checks for IAM permissions if possible. +* **[ ] Scalability Testing for Poller:** Evaluate poller performance under high load. This TODO list should guide the process of making `mcp-agent` a functional and maintainable tool.
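
## Appendix: Illustrative Sketches

The completed priority-1 items above name concrete interfaces (`BqJobInfo`, `FirestoreBqJobStore` with `add_job` / `get_job` / `update_job_status` / `query_pending_jobs`, and the `mcp_agent_bq_jobs` collection) without showing them. The following is a minimal, hypothetical sketch of what `mcp_agent/job_store.py` could look like: the field names and the default collection come from the TODO items, while defaults and exact types are assumed rather than taken from the actual code.

```python
# Hypothetical sketch; the real module may differ in naming and defaults.
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from google.cloud import firestore  # google-cloud-firestore

DEFAULT_COLLECTION = "mcp_agent_bq_jobs"  # making this configurable is a listed post-MVP task


@dataclass
class BqJobInfo:
    """BigQuery job metadata persisted in Firestore (fields per the priority-1 TODO item)."""
    job_id: str
    location: str
    project_id: str
    conn_id: str
    query: str
    status: str = "PENDING"
    error_result: Optional[Dict[str, Any]] = None
    created_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class FirestoreBqJobStore:
    """Persists job state in Firestore so any server instance (or restart) can resume tracking."""

    def __init__(self, project: Optional[str] = None, collection: str = DEFAULT_COLLECTION):
        self._db = firestore.AsyncClient(project=project)
        self._collection = collection

    def _doc(self, job_id: str):
        return self._db.collection(self._collection).document(job_id)

    async def add_job(self, job: BqJobInfo) -> None:
        await self._doc(job.job_id).set(asdict(job))

    async def get_job(self, job_id: str) -> Optional[BqJobInfo]:
        snap = await self._doc(job_id).get()
        return BqJobInfo(**snap.to_dict()) if snap.exists else None

    async def update_job_status(
        self, job_id: str, status: str, error_result: Optional[Dict[str, Any]] = None
    ) -> None:
        await self._doc(job_id).update({
            "status": status,
            "error_result": error_result,
            "updated_time": datetime.now(timezone.utc),
        })

    async def query_pending_jobs(self) -> List[BqJobInfo]:
        # Non-terminal BigQuery states; DONE/ERROR jobs are no longer polled.
        query = self._db.collection(self._collection).where("status", "in", ["PENDING", "RUNNING"])
        return [BqJobInfo(**doc.to_dict()) async for doc in query.stream()]
```

Keying each document on the BigQuery `job_id` keeps `bq_get_job_status` lookups to a single document read.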
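The background poller (`mcp_agent/bq_poller.py`) is described only as "periodically queries Firestore, checks BQ API, and updates Firestore." Here is a minimal sketch of that reconciliation loop, assuming the store interface above and the configurable interval mentioned under priority-3; it is not the actual implementation.

```python
# Hypothetical sketch of the poller loop; the real bq_poller.py may structure this differently.
import asyncio
import logging

from google.cloud import bigquery

logger = logging.getLogger("mcp_agent.bq_poller")


async def poll_bq_jobs(job_store, interval_seconds: float = 10.0) -> None:
    """Reconcile non-terminal jobs in Firestore with their actual BigQuery state, forever."""
    bq_client = bigquery.Client()
    while True:
        try:
            for job_info in await job_store.query_pending_jobs():
                # The BigQuery client is synchronous; keep the event loop free while we poll.
                bq_job = await asyncio.to_thread(
                    bq_client.get_job, job_info.job_id, location=job_info.location
                )
                if bq_job.state == "DONE":
                    new_status = "ERROR" if bq_job.error_result else "DONE"
                    await job_store.update_job_status(
                        job_info.job_id, new_status, error_result=bq_job.error_result
                    )
                elif bq_job.state != job_info.status:
                    await job_store.update_job_status(job_info.job_id, bq_job.state)
        except Exception:
            logger.exception("BQ poller iteration failed; retrying on the next interval.")
        await asyncio.sleep(interval_seconds)
```

`cli.py` would start this once at startup, e.g. `asyncio.create_task(poll_bq_jobs(job_store, poll_interval))` (the argument name is assumed), and cancel the task during shutdown.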
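The GCS tools in `mcp_agent/gcp_tools/storage.py` are likewise marked complete but not shown. Below is a sketch of the read-URL variant only, assuming the `mcp_types` alias used in `server.py`; the write variant is the same call with `method="PUT"` (plus an optional `content_type`). Note that V4 signing needs signing-capable credentials: a service-account key locally, or IAM-based signing when running on GCP with plain ADC.

```python
# Hypothetical sketch of a stateless signed-URL tool; names mirror the TODO items, details are assumed.
import datetime
import json
from typing import List, Optional

from google.cloud import storage
from mcp import types as mcp_types  # assumed import, mirroring the mcp_types alias in server.py

_storage_client: Optional[storage.Client] = None


def get_storage_client() -> storage.Client:
    """Lazily build one shared client using Application Default Credentials (ADC)."""
    global _storage_client
    if _storage_client is None:
        _storage_client = storage.Client()
    return _storage_client


async def gcs_get_read_signed_url(
    bucket_name: str, object_name: str, expiration_minutes: int = 15
) -> List[mcp_types.TextContent]:
    """Return a V4 signed URL the caller can GET directly, so object bytes never transit the MCP server."""
    blob = get_storage_client().bucket(bucket_name).blob(object_name)
    url = blob.generate_signed_url(
        version="v4",
        expiration=datetime.timedelta(minutes=expiration_minutes),
        method="GET",
    )
    payload = {
        "status": "success",
        "bucket": bucket_name,
        "object": object_name,
        "signed_url": url,
        "expires_in_minutes": expiration_minutes,
    }
    return [mcp_types.TextContent(type="text", text=json.dumps(payload))]
```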
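Finally, for manual testing of the SSE transport implemented in `server.py`: the endpoint is `POST /mcp`, it requires `application/json` content, accepts an optional `Authorization: Bearer <key>` header, and streams each result back as an SSE `data:` event. The exact MCP message schema consumed by `dispatch_tool` is not visible in this diff, so the request body, host, and port below are purely illustrative.

```python
# Hypothetical client sketch for exercising the SSE endpoint; message shape, host, and port are assumed.
import asyncio
import json

import aiohttp


async def call_tool_over_sse(api_key: str = "") -> None:
    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    message = {"type": "call_tool", "name": "gcs_list_buckets", "arguments": {}}  # illustrative only

    async with aiohttp.ClientSession() as session:
        # json= sets the Content-Type: application/json header the server checks for.
        async with session.post("http://127.0.0.1:8080/mcp", json=message, headers=headers) as resp:
            resp.raise_for_status()
            # aiohttp-sse frames each event as "data: <payload>" lines separated by blank lines.
            async for raw_line in resp.content:
                line = raw_line.decode("utf-8").strip()
                if line.startswith("data:"):
                    print(json.loads(line[len("data:"):].strip()))


if __name__ == "__main__":
    asyncio.run(call_tool_over_sse())
```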