From 90f436644868039d8a5fa4711ce050221de708f3 Mon Sep 17 00:00:00 2001 From: Matte Noble Date: Thu, 11 Dec 2025 15:17:46 -0800 Subject: [PATCH] feat(keycardai-mcp): Add greater control over OAuth metadata location - Refactors `auth_metadata_mount` into it's component parts - Exposes mounts for individual metadata - Allows the user to specify exactly where their OAuth metadata is exposed - NOTE: This is only for advanced use cases where you know you need something non-standard. Otherwise, follow the OAuth spec. --- README.md | 149 ++++++- .../keycardai/mcp/server/routers/metadata.py | 219 +++++++++-- .../mcp/tests/integration/test_metadata.py | 362 ++++++++++++++++++ .../keycardai/mcp/server/routers/__init__.py | 0 .../mcp/server/routers/test_metadata.py | 350 +++++++++++++++++ 5 files changed, 1038 insertions(+), 42 deletions(-) create mode 100644 packages/mcp/tests/integration/test_metadata.py create mode 100644 packages/mcp/tests/keycardai/mcp/server/routers/__init__.py create mode 100644 packages/mcp/tests/keycardai/mcp/server/routers/test_metadata.py diff --git a/README.md b/README.md index bf4dc54..6ca36bd 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ if __name__ == "__main__": import os from fastmcp import FastMCP, Context from keycardai.mcp.integrations.fastmcp import ( - AuthProvider, + AuthProvider, AccessContext, ClientSecret ) @@ -209,11 +209,11 @@ mcp = FastMCP("My Secure FastMCP Server", auth=auth) def call_external_api(ctx: Context, query: str) -> str: # Get access context to check token exchange status access_context: AccessContext = ctx.get_state("keycardai") - + # Check for errors before accessing token if access_context.has_errors(): return f"Error: Failed to obtain access token - {access_context.get_errors()}" - + # Access delegated token through context namespace token = access_context.access("https://api.example.com").access_token # Use token to call external API @@ -239,6 +239,143 @@ Configure the remote MCP in your AI client, like [Cursor](https://cursor.com/?fr ### 🎉 Your MCP server is now protected with Keycard authentication! 🎉 +## Using FastAPI + +Mounting a FastMCP server into a larger FastAPI service introduces a few +gotchas, particularly related to the various OAuth metadata endpoints. + +### Standards Compliant Approach + +> [!NOTE] +> Most MCP clients expect standards-compliance. Follow this approach if you're +> using those clients or the official MCP SDKs. + +The OAuth spec declares that your metadata must be exposed at the root of your +service. + +``` +/.well-known/oauth-protected-resource +``` + +This causes a problem when you're mounting multiple APIs or MCP servers to a +common FastAPI service. Each API or MCP Server will potentially have their own +OAuth metadata. + +The OAuth spec defines that the metadata for each individual service should be +exposed as an extension to the base `well-known` URI. For example: + +``` +/.well-known/oauth-protected-resource/api +/.well-known/oauth-protected-resource/mcp-server/mcp +``` + +To ensure FastMCP and FastAPI produce this, you need to ensure your routing is +defined in a specific way: + +```python +from fastmcp import FastMCP +from fastapi import FastAPI + +mcp = FastMCP("MCP Server") +mcp_app = mcp.http_app() # DO NOT specify a path here + +app = FastAPI(title="API", lifespan=mcp_app.lifespan) + +# You MUST mount the MCP's `http_app` to the full path for FastMCP to expose the +# OAuth metadata correctly. +app.mount("/mcp-server/mcp", mcp_app) +``` + +### Custom, Non Standards Compliant, Approach + +> ![WARNING] +> **This is not advised.** Only follow this if you know for sure you need +> flexibility outside of what the spec requires. + +If you've built custom clients or need to mount the metadata at a different, non +standards compliant, location, you can do that manually. + +#### Mounting at a Custom Root + +```python +from fastmcp import FastMCP +from fastapi import FastAPI +from keycardai.mcp.server.routers.metadata import well_known_metadata_mount + +auth_provider = AuthProvider( + zone_id="your-zone-id", # Get this from keycard.ai + mcp_server_name="My Secure FastMCP Server", + mcp_base_url="http://127.0.0.1:8000/" +) + +auth = auth_provider.get_remote_auth_provider() + +mcp = FastMCP("MCP Server", auth=auth) +mcp_app = mcp.http_app() + +app = FastAPI(title="API", lifespan=mcp_app.lifespan) + +app.mount( + "/custom-well-known", + well_known_metadata_mount(issuer=auth.zone_url), +) +``` + +which will produce the following endpoints + +``` +/custom-well-known/oauth-protected-resource +/custom-well-known/oauth-authorization-server +``` + +#### Mounting at a Specific URI + +If you need even more control, you can mount the individual routes at a specific +URI. + +```python +from fastmcp import FastMCP +from fastapi import FastAPI +from keycardai.mcp.server.routers.metadata import ( + well_known_authorization_server_route, + well_known_protected_resource_route, +) + +auth_provider = AuthProvider( + zone_id="your-zone-id", # Get this from keycard.ai + mcp_server_name="My Secure FastMCP Server", + mcp_base_url="http://127.0.0.1:8000/" +) + +auth = auth_provider.get_remote_auth_provider() + +mcp = FastMCP("MCP Server", auth=auth) +mcp_app = mcp.http_app() + +app = FastAPI(title="API", lifespan=mcp_app.lifespan) + +app.router.routes.append( + well_known_protected_resource_route( + path="/my/custom/path/to/well-known/oauth-protected-resource", + issuer=auth.zone_url, + ) +) + +app.router.routes.append( + well_known_authorization_server_route( + path="/my/custom/path/to/well-known/oauth-authorization-server", + issuer=auth.zone_url, + ) +) +``` + +which will produce the following endpoints + +``` +/my/custom/path/to/well-known/oauth-protected-resource +/my/custom/path/to/well-known/oauth-authorization-server +``` + ## Features ### Delegated Access @@ -248,11 +385,10 @@ Keycard allows MCP servers to access other resources on behalf of users with aut #### Setup Protected Resources 1. **Configure credential provider** (e.g., Google Workspace) -2. **Configure protected resource** (e.g., Google Drive API) +2. **Configure protected resource** (e.g., Google Drive API) 3. **Set MCP server dependencies** to allow delegated access 4. **Create client secret identity** for secure authentication - ## Overview This workspace contains multiple Python packages that provide various Keycard functionality: @@ -314,6 +450,7 @@ pip install ./packages/mcp-fastmcp ## Documentation Comprehensive documentation is available at our [documentation site](https://docs.keycard.ai), including: + - API reference for all packages - Usage examples and tutorials - Integration guides @@ -381,4 +518,4 @@ For questions, issues, or support: - GitHub Issues: [https://github.com/keycardai/python-sdk/issues](https://github.com/keycardai/python-sdk/issues) - Documentation: [https://docs.keycardai.com](https://docs.keycard.ai/) -- Email: support@keycard.ai \ No newline at end of file +- Email: support@keycard.ai diff --git a/packages/mcp/src/keycardai/mcp/server/routers/metadata.py b/packages/mcp/src/keycardai/mcp/server/routers/metadata.py index ab62f45..3e58bbf 100644 --- a/packages/mcp/src/keycardai/mcp/server/routers/metadata.py +++ b/packages/mcp/src/keycardai/mcp/server/routers/metadata.py @@ -16,41 +16,176 @@ from ..middleware import BearerAuthMiddleware -def auth_metadata_mount(issuer: str, enable_multi_zone: bool = False, jwks: JsonWebKeySet | None = None) -> Mount: - inferred_metadata = InferredProtectedResourceMetadata( - authorization_servers=[issuer], +def auth_metadata_mount( + issuer: str, + enable_multi_zone: bool = False, + jwks: JsonWebKeySet | None = None, +) -> Mount: + """Create a Starlette Mount for OAuth metadata endpoints at the standard /.well-known path. + + Args: + issuer: The OAuth issuer URL used for authorization server metadata. + enable_multi_zone: Whether to enable multi-zone support for metadata endpoints. + When enabled, metadata responses may include zone-specific information. + jwks: Optional JSON Web Key Set to expose at the /.well-known/jwks.json endpoint. + If not provided, no JWKS route will be created. + + Returns: + A Starlette Mount containing the well-known metadata routes. + """ + return well_known_metadata_mount( + path="/.well-known", + issuer=issuer, + resource="{resource_path:path}", + enable_multi_zone=enable_multi_zone, + jwks=jwks, ) - routes = [ - Route( - "/oauth-protected-resource{resource_path:path}", - protected_resource_metadata( - inferred_metadata, - enable_multi_zone=enable_multi_zone, - ), - name="oauth-protected-resource", + + +def well_known_metadata_mount( + issuer: str, + path: str, + resource: str = "", + enable_multi_zone: bool = False, + jwks: JsonWebKeySet | None = None, +) -> Mount: + """Create a Starlette Mount for OAuth metadata endpoints at a custom path. + + Args: + issuer: The OAuth issuer URL used for authorization server metadata. + path: The base path where the mount will be attached (e.g., "/.well-known"). + resource: Optional resource path suffix for metadata routes. + enable_multi_zone: Whether to enable multi-zone support for metadata endpoints. + jwks: Optional JSON Web Key Set to expose at the jwks.json endpoint. + + Returns: + A Starlette Mount containing the well-known metadata routes. + """ + return Mount( + path=path, + routes=well_known_metadata_routes( + issuer=issuer, + enable_multi_zone=enable_multi_zone, + jwks=jwks, + resource=resource, ), - Route( - "/oauth-authorization-server{resource_path:path}", - authorization_server_metadata( - issuer, enable_multi_zone=enable_multi_zone - ), - name="oauth-authorization-server", - ), + ) + + +def well_known_metadata_routes( + issuer: str, + enable_multi_zone: bool = False, + jwks: JsonWebKeySet | None = None, + resource: str = "", +) -> list[Route]: + """Create a list of Starlette Routes for OAuth well-known metadata endpoints. + + Args: + issuer: The OAuth issuer URL used for authorization server metadata. + enable_multi_zone: Whether to enable multi-zone support for metadata endpoints. + jwks: Optional JSON Web Key Set to expose. If provided, adds a JWKS route. + resource: Optional resource path prefix (currently unused in route creation). + + Returns: + A list of Starlette Route objects for the well-known endpoints. + """ + routes = [ + well_known_protected_resource_route(issuer, enable_multi_zone), + well_known_authorization_server_route(issuer, enable_multi_zone), ] if jwks: - routes.append( - Route( - "/jwks.json", - jwks_endpoint(jwks), - name="jwks", - ) - ) + routes.append(well_known_jwks_route(jwks)) - return Mount( - path="/.well-known", - routes=routes, - name="well-known", + return routes + + +def well_known_protected_resource_route( + issuer: str, + enable_multi_zone: bool = False, + resource: str = "/oauth-protected-resource", +) -> Route: + """Create a Starlette Route for the OAuth Protected Resource Metadata endpoint. + + This endpoint follows RFC 9728 and exposes metadata about the protected resource, + including which authorization servers can be used to obtain access tokens. + + Args: + issuer: The OAuth issuer URL, added to the authorization_servers list + in the protected resource metadata response. + enable_multi_zone: Whether to enable multi-zone support. When enabled, + the metadata response may include zone-specific information. + resource: The path for this route. Defaults to "/oauth-protected-resource" + as per the well-known URI convention. + + Returns: + A Starlette Route for the protected resource metadata endpoint. + """ + inferred_metadata = InferredProtectedResourceMetadata( + authorization_servers=[issuer], + ) + + return Route( + resource, + protected_resource_metadata( + inferred_metadata, + enable_multi_zone=enable_multi_zone, + ), + name="oauth-protected-resource", + ) + + +def well_known_authorization_server_route( + issuer: str, + + enable_multi_zone: bool = False, + resource: str = "/oauth-authorization-server", +) -> Route: + """Create a Starlette Route for the OAuth Authorization Server Metadata endpoint. + + This endpoint follows RFC 8414 and exposes metadata about the authorization server, + enabling clients to discover OAuth endpoints and capabilities dynamically. + + Args: + issuer: The OAuth issuer URL, used as the issuer identifier in the + authorization server metadata response. + enable_multi_zone: Whether to enable multi-zone support. When enabled, + the metadata response may include zone-specific information. + resource: The path for this route. Defaults to "/oauth-authorization-server" + as per the well-known URI convention. + + Returns: + A Starlette Route for the authorization server metadata endpoint. + """ + return Route( + resource, + authorization_server_metadata( + issuer, + enable_multi_zone=enable_multi_zone, + ), + name="oauth-authorization-server", + ) + + +def well_known_jwks_route(jwks: JsonWebKeySet) -> Route: + """Create a Starlette Route for the JSON Web Key Set (JWKS) endpoint. + + This endpoint exposes the public keys used for token verification, + allowing clients to validate JWT signatures. The endpoint is typically + served at /.well-known/jwks.json. + + Args: + jwks: The JSON Web Key Set containing public keys to expose. + This should contain the public keys corresponding to the + private keys used for signing tokens. + + Returns: + A Starlette Route for the JWKS endpoint at "/jwks.json". + """ + return Route( + "/jwks.json", + jwks_endpoint(jwks), + name="jwks", ) @@ -63,17 +198,29 @@ def protected_mcp_router( ) -> Sequence[Route]: """Create a protected MCP router with authentication middleware. - This function creates the routing structure needed for a protected MCP server, - including OAuth metadata endpoints and the main MCP application with authentication. + This function creates the complete routing structure needed for a protected + MCP server, including OAuth metadata endpoints and the main MCP application + wrapped with bearer token authentication middleware. + + The router includes: + - OAuth well-known metadata endpoints (protected resource, authorization server) + - Optional JWKS endpoint for token verification + - The MCP application protected by BearerAuthMiddleware Args: - issuer: The OAuth issuer URL (zone URL) - mcp_app: The MCP FastMCP streamable HTTP application - verifier: Token verifier for authentication middleware - enable_multi_zone: Whether to enable multi-zone support + issuer: The OAuth issuer URL (zone URL) used for metadata endpoints. + mcp_app: The MCP application (typically a FastMCP streamable HTTP app) + to be protected with authentication. + verifier: Token verifier instance used by the authentication middleware + to validate incoming bearer tokens. + enable_multi_zone: Whether to enable multi-zone support. When True, + the MCP app is mounted at "/{zone_id:str}" instead of "/". + jwks: Optional JSON Web Key Set to expose at the JWKS endpoint. + If provided, clients can fetch public keys for token verification. Returns: - Sequence of routes including metadata mount and protected MCP mount + A sequence of routes including the metadata mount and the protected + MCP application mount. """ routes = [ auth_metadata_mount(issuer, enable_multi_zone=enable_multi_zone, jwks=jwks), diff --git a/packages/mcp/tests/integration/test_metadata.py b/packages/mcp/tests/integration/test_metadata.py new file mode 100644 index 0000000..5788f97 --- /dev/null +++ b/packages/mcp/tests/integration/test_metadata.py @@ -0,0 +1,362 @@ +"""Integration tests for metadata router endpoints. + +These tests use Starlette's TestClient to verify actual HTTP responses +from the OAuth metadata endpoints. +""" + +from unittest.mock import Mock, patch + +import pytest +from starlette.applications import Starlette +from starlette.testclient import TestClient + +from keycardai.mcp.server.routers.metadata import ( + auth_metadata_mount, + well_known_metadata_mount, +) +from keycardai.oauth.types import JsonWebKey, JsonWebKeySet + + +class TestProtectedResourceMetadata: + """Integration tests for protected resource metadata endpoint.""" + + @pytest.fixture + def issuer(self): + return "https://auth.localdev.keycard.sh" + + @pytest.fixture + def app(self, issuer): + return Starlette( + routes=[ + well_known_metadata_mount(issuer=issuer, path="/.well-known"), + ] + ) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + def test_returns_200(self, client): + """Test that protected resource endpoint returns 200 OK.""" + response = client.get("/.well-known/oauth-protected-resource") + assert response.status_code == 200 + + def test_contains_authorization_servers(self, issuer, client): + """Test that response contains authorization_servers with issuer.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "authorization_servers" in data + assert isinstance(data["authorization_servers"], list) + assert len(data["authorization_servers"]) == 1 + assert f"{issuer}/" in data["authorization_servers"] + + def test_contains_resource_url(self, client): + """Test that response contains resource field derived from request.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "resource" in data + assert "testserver" in data["resource"] + + def test_contains_jwks_uri(self, client): + """Test that response contains jwks_uri field.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "jwks_uri" in data + assert "/.well-known/jwks.json" in data["jwks_uri"] + + def test_contains_client_id(self, client): + """Test that response contains client_id matching resource.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "client_id" in data + assert data["client_id"] == data["resource"] + + def test_contains_grant_types(self, client): + """Test that response contains grant_types with client_credentials.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "grant_types" in data + assert "client_credentials" in data["grant_types"] + + def test_contains_token_endpoint_auth_method(self, client): + """Test that token_endpoint_auth_method is private_key_jwt.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert "token_endpoint_auth_method" in data + assert data["token_endpoint_auth_method"] == "private_key_jwt" + + +class TestAuthorizationServerMetadata: + """Integration tests for authorization server metadata endpoint.""" + + @pytest.fixture + def issuer(self): + return "https://auth.localdev.keycard.sh" + + @pytest.fixture + def mock_upstream_response(self, issuer): + return { + "issuer": issuer, + "authorization_endpoint": f"{issuer}/oauth/authorize", + "token_endpoint": f"{issuer}/oauth/token", + "jwks_uri": f"{issuer}/.well-known/jwks.json", + "response_types_supported": ["code"], + "grant_types_supported": ["authorization_code", "client_credentials"], + } + + @pytest.fixture + def app(self, issuer): + return Starlette( + routes=[ + well_known_metadata_mount(issuer=issuer, path="/.well-known"), + ] + ) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + @patch("httpx.Client") + def test_returns_200_on_success( + self, mock_client_class, client, mock_upstream_response + ): + """Test that endpoint returns 200 when upstream responds successfully.""" + mock_response = Mock() + mock_response.json.return_value = mock_upstream_response + mock_response.raise_for_status.return_value = None + + mock_client = Mock() + mock_client.get.return_value = mock_response + mock_client_class.return_value.__enter__.return_value = mock_client + + response = client.get("/.well-known/oauth-authorization-server") + assert response.status_code == 200 + + @patch("httpx.Client") + def test_proxies_upstream_metadata( + self, mock_client_class, client, issuer, mock_upstream_response + ): + """Test that endpoint returns metadata from upstream server.""" + mock_response = Mock() + mock_response.json.return_value = mock_upstream_response + mock_response.raise_for_status.return_value = None + + mock_client = Mock() + mock_client.get.return_value = mock_response + mock_client_class.return_value.__enter__.return_value = mock_client + + response = client.get("/.well-known/oauth-authorization-server") + data = response.json() + + assert data["issuer"] == issuer + assert data["authorization_endpoint"] == f"{issuer}/oauth/authorize" + assert data["token_endpoint"] == f"{issuer}/oauth/token" + + @patch("httpx.Client") + def test_preserves_all_upstream_fields( + self, mock_client_class, client, mock_upstream_response + ): + """Test that all metadata fields from upstream are preserved.""" + mock_response = Mock() + mock_response.json.return_value = mock_upstream_response + mock_response.raise_for_status.return_value = None + + mock_client = Mock() + mock_client.get.return_value = mock_response + mock_client_class.return_value.__enter__.return_value = mock_client + + response = client.get("/.well-known/oauth-authorization-server") + data = response.json() + + assert "jwks_uri" in data + assert "response_types_supported" in data + assert "grant_types_supported" in data + + +class TestJwksEndpoint: + """Integration tests for JWKS endpoint.""" + + @pytest.fixture + def issuer(self): + return "https://auth.localdev.keycard.sh" + + @pytest.fixture + def jwks(self): + return JsonWebKeySet( + keys=[ + JsonWebKey( + kty="RSA", + kid="test-key-1", + use="sig", + n="0vx7agoebGcQSuuPiLJXZptN9nndrQmbXEps2aiAFbWhM78LhWx4cbbfAAtVT86zwu1RK7aPFFxuhDR1L6tSoc_BJECPebWKRXjBZCiFV4n3oknjhMstn64tZ_2W-5JsGY4Hc5n9yBXArwl93lqt7_RN5w6Cf0h4QyQ5v-65YGjQR0_FDW2QvzqY368QQMicAtaSqzs8KJZgnYb9c7d0zgdAZHzu6qMQvRL5hajrn1n91CbOpbISD08qNLyrdkt-bFTWhAI4vMQFh6WeZu0fM4lFd2NcRwr3XPksINHaQ-G_xBniIqbw0Ls1jF44-csFCur-kEgU8awapJzKnqDKgw", + e="AQAB", + ) + ] + ) + + @pytest.fixture + def app(self, issuer, jwks): + return Starlette( + routes=[ + well_known_metadata_mount( + issuer=issuer, path="/.well-known", jwks=jwks + ), + ] + ) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + def test_returns_200(self, client): + """Test that JWKS endpoint returns 200 OK.""" + response = client.get("/.well-known/jwks.json") + assert response.status_code == 200 + + def test_returns_json_content_type(self, client): + """Test that response has JSON content type.""" + response = client.get("/.well-known/jwks.json") + assert response.headers["content-type"] == "application/json" + + def test_contains_keys_array(self, client): + """Test that response contains keys array.""" + response = client.get("/.well-known/jwks.json") + data = response.json() + + assert "keys" in data + assert isinstance(data["keys"], list) + assert len(data["keys"]) == 1 + + def test_key_has_required_fields(self, client): + """Test that key contains required JWK fields.""" + response = client.get("/.well-known/jwks.json") + data = response.json() + key = data["keys"][0] + + assert key["kty"] == "RSA" + assert key["kid"] == "test-key-1" + assert key["use"] == "sig" + + def test_rsa_key_has_public_components(self, client): + """Test that RSA key has n and e components.""" + response = client.get("/.well-known/jwks.json") + data = response.json() + key = data["keys"][0] + + assert "n" in key + assert "e" in key + assert key["e"] == "AQAB" + + +class TestJwksEndpointEmpty: + """Test JWKS endpoint with empty keys.""" + + @pytest.fixture + def app(self): + return Starlette( + routes=[ + well_known_metadata_mount( + issuer="https://auth.localdev.keycard.sh", + path="/.well-known", + jwks=JsonWebKeySet(keys=[]), + ), + ] + ) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + def test_returns_empty_keys_array(self, client): + """Test that empty JWKS returns empty keys array.""" + response = client.get("/.well-known/jwks.json") + + assert response.status_code == 200 + data = response.json() + assert data["keys"] == [] + + +class TestAuthMetadataMount: + """Integration tests for auth_metadata_mount convenience function.""" + + @pytest.fixture + def issuer(self): + return "https://auth.localdev.keycard.sh" + + @pytest.fixture + def jwks(self): + return JsonWebKeySet( + keys=[ + JsonWebKey( + kty="RSA", + kid="mount-test-key", + use="sig", + n="test-modulus", + e="AQAB", + ) + ] + ) + + @pytest.fixture + def app(self, issuer, jwks): + mount = auth_metadata_mount(issuer=issuer, jwks=jwks) + return Starlette(routes=[mount]) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + def test_protected_resource_accessible(self, client): + """Test that protected resource endpoint is accessible via mount.""" + response = client.get("/.well-known/oauth-protected-resource") + assert response.status_code == 200 + + def test_jwks_accessible(self, client): + """Test that JWKS endpoint is accessible via mount.""" + response = client.get("/.well-known/jwks.json") + assert response.status_code == 200 + + def test_jwks_returns_configured_key(self, client): + """Test that JWKS returns the configured key.""" + response = client.get("/.well-known/jwks.json") + data = response.json() + + assert data["keys"][0]["kid"] == "mount-test-key" + + def test_protected_resource_has_correct_issuer(self, issuer, client): + """Test that protected resource points to correct authorization server.""" + response = client.get("/.well-known/oauth-protected-resource") + data = response.json() + + assert f"{issuer}/" in data["authorization_servers"] + + +class TestMultiZone: + """Integration tests for multi-zone functionality.""" + + @pytest.fixture + def issuer(self): + return "https://keycard.cloud" + + @pytest.fixture + def app(self, issuer): + mount = auth_metadata_mount(issuer=issuer, enable_multi_zone=True) + return Starlette(routes=[mount]) + + @pytest.fixture + def client(self, app): + return TestClient(app) + + def test_without_zone_uses_base_issuer(self, issuer, client): + """Test that request without zone ID uses base issuer.""" + response = client.get("/.well-known/oauth-protected-resource") + + assert response.status_code == 200 + data = response.json() + assert f"{issuer}/" in data["authorization_servers"] diff --git a/packages/mcp/tests/keycardai/mcp/server/routers/__init__.py b/packages/mcp/tests/keycardai/mcp/server/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/mcp/tests/keycardai/mcp/server/routers/test_metadata.py b/packages/mcp/tests/keycardai/mcp/server/routers/test_metadata.py new file mode 100644 index 0000000..bb820ac --- /dev/null +++ b/packages/mcp/tests/keycardai/mcp/server/routers/test_metadata.py @@ -0,0 +1,350 @@ +"""Unit tests for metadata router functions. + +These tests verify the route and mount creation for OAuth metadata endpoints. +""" + +from unittest.mock import Mock + +from starlette.routing import Mount, Route + +from keycardai.mcp.server.routers.metadata import ( + auth_metadata_mount, + protected_mcp_router, + well_known_authorization_server_route, + well_known_jwks_route, + well_known_metadata_mount, + well_known_metadata_routes, + well_known_protected_resource_route, +) + + +class TestAuthMetadataMount: + """Tests for auth_metadata_mount function.""" + + def test_returns_mount_instance(self): + """Test that the function returns a Starlette Mount.""" + issuer = "https://auth.example.com" + result = auth_metadata_mount(issuer) + assert isinstance(result, Mount) + + def test_mount_path_is_well_known(self): + """Test that mount is at /.well-known path.""" + issuer = "https://auth.example.com" + result = auth_metadata_mount(issuer) + assert result.path == "/.well-known" + + def test_contains_protected_resource_route(self): + """Test that mount contains protected resource route.""" + issuer = "https://auth.example.com" + result = auth_metadata_mount(issuer) + route_names = [r.name for r in result.routes if hasattr(r, "name")] + assert "oauth-protected-resource" in route_names + + def test_contains_authorization_server_route(self): + """Test that mount contains authorization server route.""" + issuer = "https://auth.example.com" + result = auth_metadata_mount(issuer) + route_names = [r.name for r in result.routes if hasattr(r, "name")] + assert "oauth-authorization-server" in route_names + + def test_without_jwks_no_jwks_route(self): + """Test that JWKS route is not included when jwks is None.""" + issuer = "https://auth.example.com" + result = auth_metadata_mount(issuer, jwks=None) + route_names = [r.name for r in result.routes if hasattr(r, "name")] + assert "jwks" not in route_names + + def test_with_jwks_includes_jwks_route(self): + """Test that JWKS route is included when jwks is provided.""" + issuer = "https://auth.example.com" + jwks = {"keys": [{"kty": "RSA", "kid": "test-key"}]} + result = auth_metadata_mount(issuer, jwks=jwks) + route_names = [r.name for r in result.routes if hasattr(r, "name")] + assert "jwks" in route_names + + def test_enable_multi_zone_parameter_passed(self): + """Test that enable_multi_zone parameter is accepted.""" + issuer = "https://auth.example.com" + # Should not raise + result = auth_metadata_mount(issuer, enable_multi_zone=True) + assert isinstance(result, Mount) + + +class TestWellKnownMetadataMount: + """Tests for well_known_metadata_mount function.""" + + def test_returns_mount_instance(self): + """Test that the function returns a Starlette Mount.""" + issuer = "https://auth.example.com" + result = well_known_metadata_mount(issuer, path="/custom-path") + assert isinstance(result, Mount) + + def test_custom_path(self): + """Test that mount uses the provided custom path.""" + issuer = "https://auth.example.com" + custom_path = "/custom/.well-known" + result = well_known_metadata_mount(issuer, path=custom_path) + assert result.path == custom_path + + def test_default_resource_empty_string(self): + """Test that default resource parameter is empty string.""" + issuer = "https://auth.example.com" + result = well_known_metadata_mount(issuer, path="/.well-known") + assert isinstance(result, Mount) + # Routes should still be present + assert len(result.routes) >= 2 + + def test_with_resource_parameter(self): + """Test with custom resource parameter.""" + issuer = "https://auth.example.com" + result = well_known_metadata_mount( + issuer, path="/.well-known", resource="/custom-resource" + ) + assert isinstance(result, Mount) + + def test_with_all_parameters(self): + """Test with all parameters provided.""" + issuer = "https://auth.example.com" + jwks = {"keys": []} + result = well_known_metadata_mount( + issuer=issuer, + path="/api/.well-known", + resource="/resource", + enable_multi_zone=True, + jwks=jwks, + ) + assert isinstance(result, Mount) + assert result.path == "/api/.well-known" + + +class TestWellKnownMetadataRoutes: + """Tests for well_known_metadata_routes function.""" + + def test_returns_list_of_routes(self): + """Test that the function returns a list of Route objects.""" + issuer = "https://auth.example.com" + result = well_known_metadata_routes(issuer) + assert isinstance(result, list) + assert all(isinstance(r, Route) for r in result) + + def test_contains_two_routes_without_jwks(self): + """Test that two routes are returned when no JWKS is provided.""" + issuer = "https://auth.example.com" + result = well_known_metadata_routes(issuer) + assert len(result) == 2 + + def test_contains_three_routes_with_jwks(self): + """Test that three routes are returned when JWKS is provided.""" + issuer = "https://auth.example.com" + jwks = {"keys": []} + result = well_known_metadata_routes(issuer, jwks=jwks) + assert len(result) == 3 + + def test_route_names(self): + """Test that routes have correct names.""" + issuer = "https://auth.example.com" + result = well_known_metadata_routes(issuer) + route_names = {r.name for r in result} + assert "oauth-protected-resource" in route_names + assert "oauth-authorization-server" in route_names + + def test_jwks_route_name_when_provided(self): + """Test that JWKS route has correct name when provided.""" + issuer = "https://auth.example.com" + jwks = {"keys": []} + result = well_known_metadata_routes(issuer, jwks=jwks) + route_names = {r.name for r in result} + assert "jwks" in route_names + + def test_enable_multi_zone_parameter(self): + """Test that enable_multi_zone parameter is accepted.""" + issuer = "https://auth.example.com" + result = well_known_metadata_routes(issuer, enable_multi_zone=True) + assert len(result) == 2 + + +class TestWellKnownProtectedResourceRoute: + """Tests for well_known_protected_resource_route function.""" + + def test_returns_route_instance(self): + """Test that the function returns a Starlette Route.""" + issuer = "https://auth.example.com" + result = well_known_protected_resource_route(issuer) + assert isinstance(result, Route) + + def test_default_path(self): + """Test that default path is /oauth-protected-resource.""" + issuer = "https://auth.example.com" + result = well_known_protected_resource_route(issuer) + assert result.path == "/oauth-protected-resource" + + def test_custom_path(self): + """Test with custom resource path.""" + issuer = "https://auth.example.com" + custom_path = "/custom-protected-resource" + result = well_known_protected_resource_route(issuer, resource=custom_path) + assert result.path == custom_path + + def test_route_name(self): + """Test that route has correct name.""" + issuer = "https://auth.example.com" + result = well_known_protected_resource_route(issuer) + assert result.name == "oauth-protected-resource" + + def test_enable_multi_zone_parameter(self): + """Test that enable_multi_zone parameter is accepted.""" + issuer = "https://auth.example.com" + result = well_known_protected_resource_route(issuer, enable_multi_zone=True) + assert isinstance(result, Route) + + def test_route_has_endpoint(self): + """Test that route has an endpoint function.""" + issuer = "https://auth.example.com" + result = well_known_protected_resource_route(issuer) + assert result.endpoint is not None + assert callable(result.endpoint) + + +class TestWellKnownAuthorizationServerRoute: + """Tests for well_known_authorization_server_route function.""" + + def test_returns_route_instance(self): + """Test that the function returns a Starlette Route.""" + issuer = "https://auth.example.com" + result = well_known_authorization_server_route(issuer) + assert isinstance(result, Route) + + def test_default_path(self): + """Test that default path is /oauth-authorization-server.""" + issuer = "https://auth.example.com" + result = well_known_authorization_server_route(issuer) + assert result.path == "/oauth-authorization-server" + + def test_custom_path(self): + """Test with custom resource path.""" + issuer = "https://auth.example.com" + custom_path = "/custom-auth-server" + result = well_known_authorization_server_route(issuer, resource=custom_path) + assert result.path == custom_path + + def test_route_name(self): + """Test that route has correct name.""" + issuer = "https://auth.example.com" + result = well_known_authorization_server_route(issuer) + assert result.name == "oauth-authorization-server" + + def test_enable_multi_zone_parameter(self): + """Test that enable_multi_zone parameter is accepted.""" + issuer = "https://auth.example.com" + result = well_known_authorization_server_route(issuer, enable_multi_zone=True) + assert isinstance(result, Route) + + def test_route_has_endpoint(self): + """Test that route has an endpoint function.""" + issuer = "https://auth.example.com" + result = well_known_authorization_server_route(issuer) + assert result.endpoint is not None + assert callable(result.endpoint) + + +class TestWellKnownJwksRoute: + """Tests for well_known_jwks_route function.""" + + def test_returns_route_instance(self): + """Test that the function returns a Starlette Route.""" + jwks = {"keys": []} + result = well_known_jwks_route(jwks) + assert isinstance(result, Route) + + def test_path_is_jwks_json(self): + """Test that path is /jwks.json.""" + jwks = {"keys": []} + result = well_known_jwks_route(jwks) + assert result.path == "/jwks.json" + + def test_route_name(self): + """Test that route has correct name.""" + jwks = {"keys": []} + result = well_known_jwks_route(jwks) + assert result.name == "jwks" + + def test_route_has_endpoint(self): + """Test that route has an endpoint function.""" + jwks = {"keys": []} + result = well_known_jwks_route(jwks) + assert result.endpoint is not None + assert callable(result.endpoint) + + def test_with_populated_jwks(self): + """Test with a populated JWKS.""" + jwks = { + "keys": [ + { + "kty": "RSA", + "kid": "test-key-1", + "use": "sig", + "n": "0vx7agoebGcQSuuPiLJXZptN9nndrQmbXEps2aiAFbWhM78LhWx4cbbfAAtVT86zwu1RK7aPFFxuhDR1L6tSoc_BJECPebWKRXjBZCiFV4n3oknjhMstn64tZ_2W-5JsGY4Hc5n9yBXArwl93lqt7_RN5w6Cf0h4QyQ5v-65YGjQR0_FDW2QvzqY368QQMicAtaSqzs8KJZgnYb9c7d0zgdAZHzu6qMQvRL5hajrn1n91CbOpbISD08qNLyrdkt-bFTWhAI4vMQFh6WeZu0fM4lFd2NcRwr3XPksINHaQ-G_xBniIqbw0Ls1jF44-csFCur-kEgU8awapJzKnqDKgw", + "e": "AQAB", + } + ] + } + result = well_known_jwks_route(jwks) + assert isinstance(result, Route) + + +class TestEdgeCases: + """Test edge cases and parameter combinations.""" + + def test_issuer_with_trailing_slash(self): + """Test issuer URL with trailing slash.""" + issuer = "https://auth.example.com/" + result = auth_metadata_mount(issuer) + assert isinstance(result, Mount) + + def test_issuer_with_path(self): + """Test issuer URL with path.""" + issuer = "https://auth.example.com/oauth" + result = auth_metadata_mount(issuer) + assert isinstance(result, Mount) + + def test_issuer_with_port(self): + """Test issuer URL with port.""" + issuer = "https://auth.example.com:8443" + result = auth_metadata_mount(issuer) + assert isinstance(result, Mount) + + def test_http_issuer(self): + """Test with HTTP issuer (development scenario).""" + issuer = "http://localhost:8000" + result = auth_metadata_mount(issuer) + assert isinstance(result, Mount) + + def test_empty_jwks_keys(self): + """Test with empty JWKS keys array.""" + issuer = "https://auth.example.com" + jwks = {"keys": []} + result = well_known_metadata_routes(issuer, jwks=jwks) + assert len(result) == 3 # Should still include JWKS route + + def test_all_parameters_combined(self): + """Test with all parameters provided.""" + mock_app = Mock() + mock_verifier = Mock() + jwks = {"keys": [{"kty": "RSA", "kid": "test"}]} + + result = protected_mcp_router( + issuer="https://auth.example.com", + mcp_app=mock_app, + verifier=mock_verifier, + enable_multi_zone=True, + jwks=jwks, + ) + + assert len(result) == 2 + # Metadata mount with JWKS + metadata_mount = result[0] + route_names = [r.name for r in metadata_mount.routes if hasattr(r, "name")] + assert "jwks" in route_names + # Multi-zone MCP mount + mcp_mount = result[1] + assert mcp_mount.path == "/{zone_id:str}"