Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions src/ros2_medkit_mcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,118 @@ async def delete_all_configurations(
"""
return await self._request("DELETE", f"/{entity_type}/{entity_id}/configurations")

# ==================== Bulk Data ====================

async def list_bulk_data_categories(
self, entity_id: str, entity_type: str = "apps"
) -> list[str]:
"""List available bulk-data categories for an entity.

Args:
entity_id: The entity identifier.
entity_type: Entity type ('components', 'apps', 'areas', 'functions').

Returns:
List of category names (e.g., ['rosbags', 'logs']).
"""
result = await self._request("GET", f"/{entity_type}/{entity_id}/bulk-data")
if isinstance(result, dict) and "items" in result:
return result["items"]
if isinstance(result, list):
return result
return []

async def list_bulk_data(
self, entity_id: str, category: str, entity_type: str = "apps"
) -> list[dict[str, Any]]:
"""List bulk-data items in a category.

Args:
entity_id: The entity identifier.
category: Category name (e.g., 'rosbags').
entity_type: Entity type ('components', 'apps', 'areas', 'functions').

Returns:
List of bulk data item dictionaries.
"""
result = await self._request("GET", f"/{entity_type}/{entity_id}/bulk-data/{category}")
if isinstance(result, dict) and "items" in result:
return result["items"]
if isinstance(result, list):
return result
return []

async def get_bulk_data_info(self, bulk_data_uri: str) -> dict[str, Any]:
"""Get metadata about a bulk-data item via HEAD request.

Args:
bulk_data_uri: Full bulk-data URI path.

Returns:
Dictionary with Content-Type, Content-Length, filename.
"""
client = await self._ensure_client()
response = await client.head(bulk_data_uri)

if not response.is_success:
raise SovdClientError(
message=f"Bulk data not found: {bulk_data_uri} (HTTP {response.status_code})",
status_code=response.status_code,
)

headers = response.headers
content_disposition = headers.get("Content-Disposition", "")
filename = None
if "filename=" in content_disposition:
import re

match = re.search(r'filename="?([^"]+)"?', content_disposition)
if match:
filename = match.group(1)

return {
"content_type": headers.get("Content-Type", "application/octet-stream"),
"content_length": headers.get("Content-Length"),
"filename": filename,
"uri": bulk_data_uri,
}

async def download_bulk_data(self, bulk_data_uri: str) -> tuple[bytes, str | None]:
"""Download a bulk-data file.

Args:
bulk_data_uri: Relative bulk-data URI path (must start with /).

Returns:
Tuple of (file_content, filename).

Raises:
ValueError: If the URI is an absolute URL (SSRF protection).
"""
# SSRF protection: reject absolute URLs - only allow relative paths
if bulk_data_uri.startswith(("http://", "https://", "//")):
raise ValueError(f"Absolute URLs not allowed for bulk data download: {bulk_data_uri}")

client = await self._ensure_client()
response = await client.get(bulk_data_uri, timeout=httpx.Timeout(300.0))

if not response.is_success:
raise SovdClientError(
message=f"Download failed: {response.status_code}",
status_code=response.status_code,
)

content_disposition = response.headers.get("Content-Disposition", "")
filename = None
if "filename=" in content_disposition:
import re

match = re.search(r'filename="?([^"]+)"?', content_disposition)
if match:
filename = match.group(1)

return response.content, filename


@asynccontextmanager
async def create_client(settings: Settings) -> AsyncIterator[SovdClient]:
Expand Down
Loading