From f343f4babdae8e20115ca822f79248891c4f1a55 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Thu, 5 Feb 2026 18:20:15 +0000 Subject: [PATCH 1/3] feat(fault-tools): Add Pydantic response models and LLM-friendly formatting - Add FaultItem, FaultStatus, FreezeFrameSnapshot, RosbagSnapshot models - Add ExtendedDataRecords, EnvironmentData, FaultResponse models - Update fault tools to return formatted output instead of raw JSON - Add format_fault_list, format_fault_response, format_snapshots_response - Add comprehensive tests for models and formatting functions --- src/ros2_medkit_mcp/mcp_app.py | 210 +++++++++++++- src/ros2_medkit_mcp/models.py | 223 +++++++++++++++ tests/test_fault_formatting.py | 481 +++++++++++++++++++++++++++++++++ 3 files changed, 909 insertions(+), 5 deletions(-) create mode 100644 tests/test_fault_formatting.py diff --git a/src/ros2_medkit_mcp/mcp_app.py b/src/ros2_medkit_mcp/mcp_app.py index 7e2fe89..9a8ec65 100644 --- a/src/ros2_medkit_mcp/mcp_app.py +++ b/src/ros2_medkit_mcp/mcp_app.py @@ -27,10 +27,14 @@ EntityDataArgs, EntityGetArgs, EntityTopicDataArgs, + EnvironmentData, ExecutionArgs, + ExtendedDataRecords, FaultGetArgs, + FaultItem, FaultsListArgs, FaultSnapshotsArgs, + FreezeFrameSnapshot, FunctionIdArgs, GetConfigurationArgs, GetOperationArgs, @@ -38,6 +42,7 @@ ListExecutionsArgs, ListOperationsArgs, PublishTopicArgs, + RosbagSnapshot, SetConfigurationArgs, SubareasArgs, SubcomponentsArgs, @@ -108,6 +113,201 @@ def format_error(error: str) -> list[TextContent]: return format_result(ToolResult.fail(error)) +# ==================== Fault Formatting Helpers ==================== + + +def format_fault_item(item: FaultItem) -> str: + """Format a single fault item for LLM readability. + + Args: + item: The fault item to format. + + Returns: + Formatted string with fault details. + """ + lines = [f"Fault: {item.code}"] + if item.fault_name: + lines[0] += f" - {item.fault_name}" + if item.severity: + lines.append(f" Severity: {item.severity}") + if item.status: + lines.append( + f" Status: {item.status.value if hasattr(item.status, 'value') else item.status}" + ) + if item.is_confirmed is not None: + lines.append(f" Confirmed: {item.is_confirmed}") + if item.is_current is not None: + lines.append(f" Current: {item.is_current}") + if item.counter is not None: + lines.append(f" Occurrences: {item.counter}") + if item.first_occurrence: + lines.append(f" First Seen: {item.first_occurrence}") + if item.last_occurrence: + lines.append(f" Last Seen: {item.last_occurrence}") + return "\n".join(lines) + + +def format_fault_list(faults: list[dict[str, Any]]) -> list[TextContent]: + """Format a list of faults for LLM readability. + + Args: + faults: List of fault dictionaries from the API. + + Returns: + Formatted TextContent list. + """ + if not faults: + return [TextContent(type="text", text="No faults found.")] + + lines = [f"Found {len(faults)} fault(s):\n"] + for fault_dict in faults: + try: + item = FaultItem.model_validate(fault_dict) + lines.append(format_fault_item(item)) + lines.append("") + except Exception: + # Fallback to basic formatting if model validation fails + code = fault_dict.get("code", "unknown") + name = fault_dict.get("faultName", "") + severity = fault_dict.get("severity", "") + status = fault_dict.get("status", "") + lines.append(f"Fault: {code}" + (f" - {name}" if name else "")) + if severity: + lines.append(f" Severity: {severity}") + if status: + lines.append(f" Status: {status}") + lines.append("") + + return [TextContent(type="text", text="\n".join(lines))] + + +def format_snapshot(snapshot: FreezeFrameSnapshot | RosbagSnapshot) -> str: + """Format a snapshot for display. + + Args: + snapshot: A freeze frame or rosbag snapshot. + + Returns: + Formatted string describing the snapshot. + """ + lines = [f" Snapshot: {snapshot.snapshot_id}"] + lines.append(f" Timestamp: {snapshot.timestamp}") + if snapshot.data_source: + lines.append(f" Source: {snapshot.data_source}") + + if isinstance(snapshot, RosbagSnapshot): + lines.append(f" Download URI: {snapshot.bulk_data_uri}") + if snapshot.file_size: + size_mb = snapshot.file_size / (1024 * 1024) + lines.append(f" File Size: {size_mb:.2f} MB") + lines.append(f" Available: {snapshot.is_available}") + elif isinstance(snapshot, FreezeFrameSnapshot) and snapshot.data: + lines.append(f" Data: {json.dumps(snapshot.data, indent=6, default=str)}") + + return "\n".join(lines) + + +def format_environment_data(env_data: EnvironmentData) -> str: + """Format environment data for LLM readability. + + Args: + env_data: Environment data with snapshots. + + Returns: + Formatted string describing the environment data. + """ + lines = ["\nEnvironment Data:"] + + if env_data.extended_data_records: + records = env_data.extended_data_records + + if records.freeze_frame_snapshots: + lines.append(f" Freeze Frame Snapshots ({len(records.freeze_frame_snapshots)}):") + for snap in records.freeze_frame_snapshots: + lines.append(format_snapshot(snap)) + + if records.rosbag_snapshots: + lines.append(f" Rosbag Snapshots ({len(records.rosbag_snapshots)}):") + for snap in records.rosbag_snapshots: + lines.append(format_snapshot(snap)) + + return "\n".join(lines) + + +def format_fault_response(fault_data: dict[str, Any]) -> list[TextContent]: + """Format a fault response with environment data for LLM readability. + + Args: + fault_data: Fault response dictionary from the API. + + Returns: + Formatted TextContent list. + """ + lines = [] + + # Parse the fault item + item_data = fault_data.get("item", fault_data) + try: + item = FaultItem.model_validate(item_data) + lines.append(format_fault_item(item)) + except Exception: + # Fallback to basic formatting + code = item_data.get("code", "unknown") + lines.append(f"Fault: {code}") + + # Parse environment data if present + env_data_dict = fault_data.get("environmentData") or fault_data.get("environment_data") + if env_data_dict: + try: + env_data = EnvironmentData.model_validate(env_data_dict) + lines.append(format_environment_data(env_data)) + except Exception: + # Fallback: just show raw JSON for environment data + lines.append(f"\nEnvironment Data: {json.dumps(env_data_dict, indent=2, default=str)}") + + # Include x-medkit extensions if present + x_medkit = fault_data.get("x-medkit") or item_data.get("x-medkit") + if x_medkit: + lines.append(f"\nROS 2 MedKit Extensions: {json.dumps(x_medkit, indent=2, default=str)}") + + return [TextContent(type="text", text="\n".join(lines))] + + +def format_snapshots_response(snapshots_data: dict[str, Any]) -> list[TextContent]: + """Format a snapshots response for LLM readability. + + Args: + snapshots_data: Snapshots response dictionary from the API. + + Returns: + Formatted TextContent list. + """ + lines = ["Diagnostic Snapshots:"] + + # Try to validate as ExtendedDataRecords + try: + records = ExtendedDataRecords.model_validate(snapshots_data) + + if records.freeze_frame_snapshots: + lines.append(f"\nFreeze Frame Snapshots ({len(records.freeze_frame_snapshots)}):") + for snap in records.freeze_frame_snapshots: + lines.append(format_snapshot(snap)) + + if records.rosbag_snapshots: + lines.append(f"\nRosbag Snapshots ({len(records.rosbag_snapshots)}):") + for snap in records.rosbag_snapshots: + lines.append(format_snapshot(snap)) + + if not records.freeze_frame_snapshots and not records.rosbag_snapshots: + lines.append(" No snapshots available.") + + except Exception: + # Fallback to raw JSON + lines.append(json.dumps(snapshots_data, indent=2, default=str)) + + return [TextContent(type="text", text="\n".join(lines))] + + # Map dotted names (from docs) to valid underscore names TOOL_ALIASES: dict[str, str] = { "sovd.version": "sovd_version", @@ -980,12 +1180,12 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: elif normalized_name == "sovd_faults_list": args = FaultsListArgs(**arguments) faults = await client.list_faults(args.entity_id, args.entity_type) - return format_json_response(faults) + return format_fault_list(faults) elif normalized_name == "sovd_faults_get": args = FaultGetArgs(**arguments) fault = await client.get_fault(args.entity_id, args.fault_id, args.entity_type) - return format_json_response(fault) + return format_fault_response(fault) elif normalized_name == "sovd_faults_clear": args = FaultGetArgs(**arguments) @@ -1060,7 +1260,7 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: elif normalized_name == "sovd_all_faults_list": faults = await client.list_all_faults() - return format_json_response(faults) + return format_fault_list(faults) elif normalized_name == "sovd_clear_all_faults": args = ClearAllFaultsArgs(**arguments) @@ -1072,12 +1272,12 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: snapshots = await client.get_fault_snapshots( args.entity_id, args.fault_code, args.entity_type ) - return format_json_response(snapshots) + return format_snapshots_response(snapshots) elif normalized_name == "sovd_system_fault_snapshots": args = SystemFaultSnapshotsArgs(**arguments) snapshots = await client.get_system_fault_snapshots(args.fault_code) - return format_json_response(snapshots) + return format_snapshots_response(snapshots) # ==================== Entity Data ==================== diff --git a/src/ros2_medkit_mcp/models.py b/src/ros2_medkit_mcp/models.py index 9b7a3b8..17c0a31 100644 --- a/src/ros2_medkit_mcp/models.py +++ b/src/ros2_medkit_mcp/models.py @@ -4,6 +4,7 @@ They validate input arguments while allowing flexible output from the API. """ +from enum import Enum from typing import Any from pydantic import BaseModel, Field @@ -418,6 +419,228 @@ class SetConfigurationArgs(BaseModel): ) +# ==================== Fault Response Models ==================== + + +class FaultStatus(str, Enum): + """Fault status values per SOVD specification.""" + + PENDING = "PENDING" + ACTIVE = "ACTIVE" + CLEARED = "CLEARED" + INACTIVE = "INACTIVE" + + +class FaultItem(BaseModel): + """Fault item model per SOVD specification.""" + + code: str = Field(..., description="Fault code (DTC)") + fault_name: str | None = Field( + default=None, + alias="faultName", + description="Human-readable fault name", + ) + severity: str | None = Field( + default=None, + description="Fault severity (e.g., 'critical', 'warning', 'info')", + ) + status: FaultStatus | None = Field( + default=None, + description="Current fault status", + ) + is_confirmed: bool | None = Field( + default=None, + alias="isConfirmed", + description="Whether the fault is confirmed", + ) + is_current: bool | None = Field( + default=None, + alias="isCurrent", + description="Whether the fault is currently active", + ) + is_test_failed: bool | None = Field( + default=None, + alias="isTestFailed", + description="Whether the related test failed", + ) + counter: int | None = Field( + default=None, + description="Occurrence counter", + ) + aging_counter: int | None = Field( + default=None, + alias="agingCounter", + description="Aging counter for fault maturation", + ) + first_occurrence: str | None = Field( + default=None, + alias="firstOccurrence", + description="ISO 8601 timestamp of first occurrence", + ) + last_occurrence: str | None = Field( + default=None, + alias="lastOccurrence", + description="ISO 8601 timestamp of last occurrence", + ) + healing_cycles: int | None = Field( + default=None, + alias="healingCycles", + description="Number of healing cycles", + ) + x_medkit: dict[str, Any] | None = Field( + default=None, + alias="x-medkit", + description="ROS 2 MedKit specific extensions", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class FreezeFrameSnapshot(BaseModel): + """Freeze frame snapshot with captured diagnostic data.""" + + snapshot_id: str = Field( + ..., + alias="snapshotId", + description="Unique identifier for the snapshot", + ) + timestamp: str = Field( + ..., + description="ISO 8601 timestamp when snapshot was captured", + ) + data_source: str | None = Field( + default=None, + alias="dataSource", + description="Source of the snapshot data (e.g., topic name)", + ) + data: dict[str, Any] = Field( + default_factory=dict, + description="Captured diagnostic data", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class BulkDataDescriptor(BaseModel): + """Descriptor for bulk data (rosbag) with download URI.""" + + id: str = Field(..., description="Bulk data identifier") + category: str | None = Field( + default=None, + description="Data category (e.g., 'rosbag', 'snapshot')", + ) + bulk_data_uri: str = Field( + ..., + alias="bulkDataUri", + description="URI to download the bulk data file", + ) + file_size: int | None = Field( + default=None, + alias="fileSize", + description="File size in bytes", + ) + is_available: bool = Field( + default=True, + alias="isAvailable", + description="Whether the file is available for download", + ) + timestamp: str | None = Field( + default=None, + description="ISO 8601 timestamp when the data was captured", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class RosbagSnapshot(BaseModel): + """Rosbag snapshot with bulk data download URI.""" + + snapshot_id: str = Field( + ..., + alias="snapshotId", + description="Unique identifier for the snapshot", + ) + timestamp: str = Field( + ..., + description="ISO 8601 timestamp when snapshot was captured", + ) + bulk_data_uri: str = Field( + ..., + alias="bulkDataUri", + description="URI to download the rosbag file", + ) + file_size: int | None = Field( + default=None, + alias="fileSize", + description="File size in bytes", + ) + is_available: bool = Field( + default=True, + alias="isAvailable", + description="Whether the rosbag file is available for download", + ) + data_source: str | None = Field( + default=None, + alias="dataSource", + description="Source description for the snapshot", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class ExtendedDataRecords(BaseModel): + """Extended data records containing diagnostic snapshots.""" + + freeze_frame_snapshots: list[FreezeFrameSnapshot] = Field( + default_factory=list, + alias="freezeFrameSnapshots", + description="List of freeze frame snapshots with captured data", + ) + rosbag_snapshots: list[RosbagSnapshot] = Field( + default_factory=list, + alias="rosbagSnapshots", + description="List of rosbag snapshots with download URIs", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class EnvironmentData(BaseModel): + """Environment data captured at fault occurrence.""" + + extended_data_records: ExtendedDataRecords | None = Field( + default=None, + alias="extendedDataRecords", + description="Snapshot data including freeze frames and rosbags", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class FaultResponse(BaseModel): + """Complete fault response with item and environment data.""" + + item: FaultItem = Field(..., description="The fault item details") + environment_data: EnvironmentData | None = Field( + default=None, + alias="environmentData", + description="Environment data captured at fault occurrence", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class FaultListResponse(BaseModel): + """Response containing a list of fault items.""" + + items: list[FaultItem] = Field( + default_factory=list, + description="List of fault items", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + class ToolResult(BaseModel): """Standard result wrapper for tool responses.""" diff --git a/tests/test_fault_formatting.py b/tests/test_fault_formatting.py new file mode 100644 index 0000000..f471f8d --- /dev/null +++ b/tests/test_fault_formatting.py @@ -0,0 +1,481 @@ +"""Tests for fault formatting functions and response models.""" + +from mcp.types import TextContent + +from ros2_medkit_mcp.mcp_app import ( + format_environment_data, + format_fault_item, + format_fault_list, + format_fault_response, + format_snapshot, + format_snapshots_response, +) +from ros2_medkit_mcp.models import ( + EnvironmentData, + ExtendedDataRecords, + FaultItem, + FaultResponse, + FaultStatus, + FreezeFrameSnapshot, + RosbagSnapshot, +) + + +class TestFaultItemModel: + """Tests for FaultItem Pydantic model.""" + + def test_minimal_fault_item(self) -> None: + """Test FaultItem with only required fields.""" + item = FaultItem(code="P0123") + assert item.code == "P0123" + assert item.fault_name is None + assert item.severity is None + + def test_full_fault_item(self) -> None: + """Test FaultItem with all fields.""" + item = FaultItem( + code="P0123", + fault_name="Engine Overtemp", + severity="critical", + status=FaultStatus.ACTIVE, + is_confirmed=True, + is_current=True, + counter=3, + first_occurrence="2025-01-01T00:00:00Z", + last_occurrence="2025-01-02T00:00:00Z", + ) + assert item.code == "P0123" + assert item.fault_name == "Engine Overtemp" + assert item.severity == "critical" + assert item.status == FaultStatus.ACTIVE + assert item.is_confirmed is True + assert item.counter == 3 + + def test_fault_item_from_api_response(self) -> None: + """Test FaultItem validation from API response with camelCase.""" + api_data = { + "code": "P0456", + "faultName": "Sensor Failure", + "severity": "warning", + "status": "ACTIVE", + "isConfirmed": True, + "isCurrent": False, + "counter": 5, + "firstOccurrence": "2025-01-01T10:00:00Z", + "lastOccurrence": "2025-01-01T12:00:00Z", + } + item = FaultItem.model_validate(api_data) + assert item.code == "P0456" + assert item.fault_name == "Sensor Failure" + assert item.is_confirmed is True + assert item.is_current is False + + +class TestSnapshotModels: + """Tests for snapshot Pydantic models.""" + + def test_freeze_frame_snapshot(self) -> None: + """Test FreezeFrameSnapshot model.""" + snap = FreezeFrameSnapshot( + snapshot_id="snap-001", + timestamp="2025-01-01T00:00:00Z", + data_source="/temperature", + data={"value": 85.5, "unit": "celsius"}, + ) + assert snap.snapshot_id == "snap-001" + assert snap.data["value"] == 85.5 + + def test_freeze_frame_from_api(self) -> None: + """Test FreezeFrameSnapshot from API response.""" + api_data = { + "snapshotId": "snap-002", + "timestamp": "2025-01-01T00:00:00Z", + "dataSource": "/sensor/data", + "data": {"temp": 100}, + } + snap = FreezeFrameSnapshot.model_validate(api_data) + assert snap.snapshot_id == "snap-002" + assert snap.data_source == "/sensor/data" + + def test_rosbag_snapshot(self) -> None: + """Test RosbagSnapshot model.""" + snap = RosbagSnapshot( + snapshot_id="rosbag-001", + timestamp="2025-01-01T00:00:00Z", + bulk_data_uri="/api/v1/bulk-data/rosbag-001.db3", + file_size=1024000, + is_available=True, + ) + assert snap.snapshot_id == "rosbag-001" + assert snap.bulk_data_uri == "/api/v1/bulk-data/rosbag-001.db3" + assert snap.file_size == 1024000 + + def test_rosbag_from_api(self) -> None: + """Test RosbagSnapshot from API response.""" + api_data = { + "snapshotId": "rosbag-002", + "timestamp": "2025-01-01T00:00:00Z", + "bulkDataUri": "/api/v1/bulk-data/file.db3", + "fileSize": 2048000, + "isAvailable": False, + } + snap = RosbagSnapshot.model_validate(api_data) + assert snap.snapshot_id == "rosbag-002" + assert snap.bulk_data_uri == "/api/v1/bulk-data/file.db3" + assert snap.is_available is False + + +class TestExtendedDataRecords: + """Tests for ExtendedDataRecords model.""" + + def test_empty_records(self) -> None: + """Test ExtendedDataRecords with empty lists.""" + records = ExtendedDataRecords() + assert records.freeze_frame_snapshots == [] + assert records.rosbag_snapshots == [] + + def test_records_with_snapshots(self) -> None: + """Test ExtendedDataRecords with both snapshot types.""" + freeze = FreezeFrameSnapshot( + snapshot_id="ff-1", + timestamp="2025-01-01T00:00:00Z", + data={"key": "value"}, + ) + rosbag = RosbagSnapshot( + snapshot_id="rb-1", + timestamp="2025-01-01T00:00:00Z", + bulk_data_uri="/bulk-data/file.db3", + ) + records = ExtendedDataRecords( + freeze_frame_snapshots=[freeze], + rosbag_snapshots=[rosbag], + ) + assert len(records.freeze_frame_snapshots) == 1 + assert len(records.rosbag_snapshots) == 1 + + def test_records_from_api(self) -> None: + """Test ExtendedDataRecords from API response.""" + api_data = { + "freezeFrameSnapshots": [ + {"snapshotId": "ff-1", "timestamp": "2025-01-01T00:00:00Z", "data": {}} + ], + "rosbagSnapshots": [ + { + "snapshotId": "rb-1", + "timestamp": "2025-01-01T00:00:00Z", + "bulkDataUri": "/bulk-data/test.db3", + } + ], + } + records = ExtendedDataRecords.model_validate(api_data) + assert len(records.freeze_frame_snapshots) == 1 + assert len(records.rosbag_snapshots) == 1 + assert records.rosbag_snapshots[0].bulk_data_uri == "/bulk-data/test.db3" + + +class TestEnvironmentData: + """Tests for EnvironmentData model.""" + + def test_empty_environment_data(self) -> None: + """Test EnvironmentData without records.""" + env = EnvironmentData() + assert env.extended_data_records is None + + def test_environment_data_with_records(self) -> None: + """Test EnvironmentData with ExtendedDataRecords.""" + records = ExtendedDataRecords( + freeze_frame_snapshots=[ + FreezeFrameSnapshot( + snapshot_id="ff-1", + timestamp="2025-01-01T00:00:00Z", + data={"sensor": "value"}, + ) + ] + ) + env = EnvironmentData(extended_data_records=records) + assert env.extended_data_records is not None + assert len(env.extended_data_records.freeze_frame_snapshots) == 1 + + def test_environment_from_api(self) -> None: + """Test EnvironmentData from API response.""" + api_data = { + "extendedDataRecords": { + "freezeFrameSnapshots": [ + {"snapshotId": "ff-1", "timestamp": "2025-01-01T00:00:00Z", "data": {}} + ], + "rosbagSnapshots": [], + } + } + env = EnvironmentData.model_validate(api_data) + assert env.extended_data_records is not None + assert len(env.extended_data_records.freeze_frame_snapshots) == 1 + + +class TestFaultResponse: + """Tests for FaultResponse model.""" + + def test_fault_response_minimal(self) -> None: + """Test FaultResponse with minimal data.""" + item = FaultItem(code="P0123") + response = FaultResponse(item=item) + assert response.item.code == "P0123" + assert response.environment_data is None + + def test_fault_response_with_env_data(self) -> None: + """Test FaultResponse with environment data.""" + item = FaultItem(code="P0123", fault_name="Test Fault") + env = EnvironmentData( + extended_data_records=ExtendedDataRecords( + rosbag_snapshots=[ + RosbagSnapshot( + snapshot_id="rb-1", + timestamp="2025-01-01T00:00:00Z", + bulk_data_uri="/bulk-data/test.db3", + ) + ] + ) + ) + response = FaultResponse(item=item, environment_data=env) + assert response.item.code == "P0123" + assert response.environment_data is not None + assert len(response.environment_data.extended_data_records.rosbag_snapshots) == 1 + + +class TestFormatFaultItem: + """Tests for format_fault_item function.""" + + def test_format_minimal_fault(self) -> None: + """Test formatting fault with only code.""" + item = FaultItem(code="P0123") + output = format_fault_item(item) + assert "Fault: P0123" in output + + def test_format_full_fault(self) -> None: + """Test formatting fault with all details.""" + item = FaultItem( + code="P0123", + fault_name="Engine Overtemp", + severity="critical", + status=FaultStatus.ACTIVE, + is_confirmed=True, + counter=5, + first_occurrence="2025-01-01T00:00:00Z", + ) + output = format_fault_item(item) + assert "Fault: P0123 - Engine Overtemp" in output + assert "Severity: critical" in output + assert "Status: ACTIVE" in output + assert "Confirmed: True" in output + assert "Occurrences: 5" in output + assert "First Seen: 2025-01-01" in output + + +class TestFormatFaultList: + """Tests for format_fault_list function.""" + + def test_format_empty_list(self) -> None: + """Test formatting empty fault list.""" + result = format_fault_list([]) + assert len(result) == 1 + assert isinstance(result[0], TextContent) + assert "No faults found" in result[0].text + + def test_format_single_fault(self) -> None: + """Test formatting list with one fault.""" + faults = [{"code": "P0123", "faultName": "Test Fault", "severity": "warning"}] + result = format_fault_list(faults) + assert len(result) == 1 + assert "Found 1 fault(s)" in result[0].text + assert "P0123" in result[0].text + assert "Test Fault" in result[0].text + + def test_format_multiple_faults(self) -> None: + """Test formatting list with multiple faults.""" + faults = [ + {"code": "P0123", "faultName": "Fault 1"}, + {"code": "P0456", "faultName": "Fault 2"}, + {"code": "P0789", "faultName": "Fault 3"}, + ] + result = format_fault_list(faults) + assert "Found 3 fault(s)" in result[0].text + assert "P0123" in result[0].text + assert "P0456" in result[0].text + assert "P0789" in result[0].text + + def test_format_fallback_on_invalid_data(self) -> None: + """Test that formatting falls back gracefully on invalid data.""" + # Missing required 'code' field - should fall back to basic formatting + faults = [{"unknown_field": "value"}] + result = format_fault_list(faults) + # Should not raise, should return something + assert len(result) == 1 + + +class TestFormatSnapshot: + """Tests for format_snapshot function.""" + + def test_format_freeze_frame(self) -> None: + """Test formatting freeze frame snapshot.""" + snap = FreezeFrameSnapshot( + snapshot_id="ff-001", + timestamp="2025-01-01T00:00:00Z", + data_source="/temperature", + data={"value": 85.5}, + ) + output = format_snapshot(snap) + assert "Snapshot: ff-001" in output + assert "Timestamp: 2025-01-01" in output + assert "Source: /temperature" in output + assert "Data:" in output + assert "85.5" in output + + def test_format_rosbag_snapshot(self) -> None: + """Test formatting rosbag snapshot.""" + snap = RosbagSnapshot( + snapshot_id="rb-001", + timestamp="2025-01-01T00:00:00Z", + bulk_data_uri="/api/v1/bulk-data/rb-001.db3", + file_size=1048576, # 1 MB + is_available=True, + ) + output = format_snapshot(snap) + assert "Snapshot: rb-001" in output + assert "Download URI: /api/v1/bulk-data/rb-001.db3" in output + assert "File Size: 1.00 MB" in output + assert "Available: True" in output + + +class TestFormatEnvironmentData: + """Tests for format_environment_data function.""" + + def test_format_with_freeze_frames(self) -> None: + """Test formatting environment data with freeze frames.""" + env = EnvironmentData( + extended_data_records=ExtendedDataRecords( + freeze_frame_snapshots=[ + FreezeFrameSnapshot( + snapshot_id="ff-1", + timestamp="2025-01-01T00:00:00Z", + data={"temp": 100}, + ) + ] + ) + ) + output = format_environment_data(env) + assert "Environment Data:" in output + assert "Freeze Frame Snapshots (1):" in output + assert "ff-1" in output + + def test_format_with_rosbags(self) -> None: + """Test formatting environment data with rosbags.""" + env = EnvironmentData( + extended_data_records=ExtendedDataRecords( + rosbag_snapshots=[ + RosbagSnapshot( + snapshot_id="rb-1", + timestamp="2025-01-01T00:00:00Z", + bulk_data_uri="/bulk-data/file.db3", + ) + ] + ) + ) + output = format_environment_data(env) + assert "Environment Data:" in output + assert "Rosbag Snapshots (1):" in output + assert "rb-1" in output + assert "/bulk-data/file.db3" in output + + +class TestFormatFaultResponse: + """Tests for format_fault_response function.""" + + def test_format_response_minimal(self) -> None: + """Test formatting fault response with minimal data.""" + response_data = {"item": {"code": "P0123"}} + result = format_fault_response(response_data) + assert len(result) == 1 + assert "P0123" in result[0].text + + def test_format_response_with_env_data(self) -> None: + """Test formatting fault response with environment data.""" + response_data = { + "item": {"code": "P0123", "faultName": "Test Fault", "severity": "critical"}, + "environmentData": { + "extendedDataRecords": { + "freezeFrameSnapshots": [ + {"snapshotId": "ff-1", "timestamp": "2025-01-01T00:00:00Z", "data": {}} + ], + "rosbagSnapshots": [ + { + "snapshotId": "rb-1", + "timestamp": "2025-01-01T00:00:00Z", + "bulkDataUri": "/bulk-data/test.db3", + } + ], + } + }, + } + result = format_fault_response(response_data) + text = result[0].text + assert "P0123" in text + assert "Test Fault" in text + assert "Environment Data:" in text + assert "Freeze Frame Snapshots" in text + assert "Rosbag Snapshots" in text + assert "/bulk-data/test.db3" in text + + def test_format_response_snake_case_env_data(self) -> None: + """Test formatting fault response with snake_case environment_data key.""" + response_data = { + "item": {"code": "P0456"}, + "environment_data": { + "extendedDataRecords": { + "freezeFrameSnapshots": [], + "rosbagSnapshots": [ + { + "snapshotId": "rb-1", + "timestamp": "2025-01-01T00:00:00Z", + "bulkDataUri": "/bulk-data/test.db3", + } + ], + } + }, + } + result = format_fault_response(response_data) + assert "/bulk-data/test.db3" in result[0].text + + +class TestFormatSnapshotsResponse: + """Tests for format_snapshots_response function.""" + + def test_format_empty_snapshots(self) -> None: + """Test formatting response with no snapshots.""" + response_data = {"freezeFrameSnapshots": [], "rosbagSnapshots": []} + result = format_snapshots_response(response_data) + assert "Diagnostic Snapshots:" in result[0].text + assert "No snapshots available" in result[0].text + + def test_format_with_snapshots(self) -> None: + """Test formatting response with both snapshot types.""" + response_data = { + "freezeFrameSnapshots": [ + {"snapshotId": "ff-1", "timestamp": "2025-01-01T00:00:00Z", "data": {"val": 1}} + ], + "rosbagSnapshots": [ + { + "snapshotId": "rb-1", + "timestamp": "2025-01-01T00:00:00Z", + "bulkDataUri": "/bulk-data/test.db3", + "fileSize": 2097152, + } + ], + } + result = format_snapshots_response(response_data) + text = result[0].text + assert "Diagnostic Snapshots:" in text + assert "Freeze Frame Snapshots (1):" in text + assert "Rosbag Snapshots (1):" in text + assert "ff-1" in text + assert "rb-1" in text + assert "/bulk-data/test.db3" in text + assert "2.00 MB" in text From e3a9babdc1c92215b4779437191c79175be11d6a Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Thu, 5 Feb 2026 18:33:23 +0000 Subject: [PATCH 2/3] feat(tools): add bulk-data MCP tools for rosbag downloads Add five new MCP tools for bulk-data operations: - sovd_bulkdata_categories: List bulk-data categories (rosbags, logs) - sovd_bulkdata_list: List items in a category with sizes and dates - sovd_bulkdata_info: Get metadata via HEAD request before download - sovd_bulkdata_download: Download single bulk-data file to disk - sovd_bulkdata_download_for_fault: Download all rosbags for a fault Implementation: - BulkDataItem, BulkDataCategoryResponse, BulkDataListResponse models - SovdClient methods: list_bulk_data_categories, list_bulk_data, get_bulk_data_info, download_bulk_data (with 300s timeout) - Human-readable formatting with file sizes and timestamps - Creates output_dir if not exists, extracts filename from headers/URI Tests: - TestBulkDataModels: Pydantic model validation - TestBulkDataArgModels: Argument model validation - TestFormatFunctions: Formatting function coverage - TestSaveBulkDataFile: File saving with temp directories - TestClientBulkDataMethods: HTTP client with respx mocks - TestDownloadRosbagsForFault: Multi-file download scenarios --- src/ros2_medkit_mcp/client.py | 105 +++++++ src/ros2_medkit_mcp/mcp_app.py | 383 +++++++++++++++++++++++ src/ros2_medkit_mcp/models.py | 126 ++++++++ tests/test_bulkdata_tools.py | 544 +++++++++++++++++++++++++++++++++ 4 files changed, 1158 insertions(+) create mode 100644 tests/test_bulkdata_tools.py diff --git a/src/ros2_medkit_mcp/client.py b/src/ros2_medkit_mcp/client.py index 82be8e0..2bd4017 100644 --- a/src/ros2_medkit_mcp/client.py +++ b/src/ros2_medkit_mcp/client.py @@ -947,6 +947,111 @@ async def delete_all_configurations( """ return await self._request("DELETE", f"/{entity_type}/{entity_id}/configurations") + # ==================== Bulk Data ==================== + + async def list_bulk_data_categories( + self, entity_id: str, entity_type: str = "apps" + ) -> list[str]: + """List available bulk-data categories for an entity. + + Args: + entity_id: The entity identifier. + entity_type: Entity type ('components', 'apps', 'areas', 'functions'). + + Returns: + List of category names (e.g., ['rosbags', 'logs']). + """ + result = await self._request("GET", f"/{entity_type}/{entity_id}/bulk-data") + if isinstance(result, dict) and "items" in result: + return result["items"] + if isinstance(result, list): + return result + return [] + + async def list_bulk_data( + self, entity_id: str, category: str, entity_type: str = "apps" + ) -> list[dict[str, Any]]: + """List bulk-data items in a category. + + Args: + entity_id: The entity identifier. + category: Category name (e.g., 'rosbags'). + entity_type: Entity type ('components', 'apps', 'areas', 'functions'). + + Returns: + List of bulk data item dictionaries. + """ + result = await self._request("GET", f"/{entity_type}/{entity_id}/bulk-data/{category}") + if isinstance(result, dict) and "items" in result: + return result["items"] + if isinstance(result, list): + return result + return [] + + async def get_bulk_data_info(self, bulk_data_uri: str) -> dict[str, Any]: + """Get metadata about a bulk-data item via HEAD request. + + Args: + bulk_data_uri: Full bulk-data URI path. + + Returns: + Dictionary with Content-Type, Content-Length, filename. + """ + client = await self._ensure_client() + response = await client.head(bulk_data_uri) + + if response.status_code == 404: + raise SovdClientError( + message=f"Bulk data not found: {bulk_data_uri}", + status_code=404, + ) + + headers = response.headers + content_disposition = headers.get("Content-Disposition", "") + filename = None + if "filename=" in content_disposition: + import re + + match = re.search(r'filename="?([^"]+)"?', content_disposition) + if match: + filename = match.group(1) + + return { + "content_type": headers.get("Content-Type", "application/octet-stream"), + "content_length": headers.get("Content-Length"), + "filename": filename, + "uri": bulk_data_uri, + } + + async def download_bulk_data(self, bulk_data_uri: str) -> tuple[bytes, str | None]: + """Download a bulk-data file. + + Args: + bulk_data_uri: Full bulk-data URI path. + + Returns: + Tuple of (file_content, filename). + """ + client = await self._ensure_client() + response = await client.get(bulk_data_uri, timeout=httpx.Timeout(300.0)) + + if not response.is_success: + raise SovdClientError( + message=f"Download failed: {response.status_code}", + status_code=response.status_code, + ) + + content_disposition = response.headers.get("Content-Disposition", "") + filename = None + if "filename=" in content_disposition: + import re + + match = re.search(r'filename="?([^"]+)"?', content_disposition) + if match: + filename = match.group(1) + + return response.content, filename + @asynccontextmanager async def create_client(settings: Settings) -> AsyncIterator[SovdClient]: diff --git a/src/ros2_medkit_mcp/mcp_app.py b/src/ros2_medkit_mcp/mcp_app.py index 9a8ec65..1c6086c 100644 --- a/src/ros2_medkit_mcp/mcp_app.py +++ b/src/ros2_medkit_mcp/mcp_app.py @@ -6,6 +6,7 @@ import json import logging +from pathlib import Path from typing import Any from mcp.server import Server @@ -18,6 +19,12 @@ AreaComponentsArgs, AreaContainsArgs, AreaIdArgs, + BulkDataCategoriesArgs, + BulkDataDownloadArgs, + BulkDataDownloadForFaultArgs, + BulkDataInfoArgs, + BulkDataItem, + BulkDataListArgs, ClearAllFaultsArgs, ComponentHostsArgs, ComponentIdArgs, @@ -308,6 +315,242 @@ def format_snapshots_response(snapshots_data: dict[str, Any]) -> list[TextConten return [TextContent(type="text", text="\n".join(lines))] +# ==================== Bulk Data Formatting ==================== + + +def format_bulkdata_categories(categories: list[str], entity_id: str) -> list[TextContent]: + """Format bulk-data categories for LLM readability. + + Args: + categories: List of category names. + entity_id: Entity identifier for context. + + Returns: + Formatted TextContent list. + """ + if not categories: + return [TextContent(type="text", text=f"No bulk-data categories available for {entity_id}")] + + lines = [f"Bulk-data categories for {entity_id}:"] + for cat in categories: + lines.append(f" - {cat}") + + return [TextContent(type="text", text="\n".join(lines))] + + +def format_bulkdata_list( + items: list[dict[str, Any]], entity_id: str, category: str +) -> list[TextContent]: + """Format bulk-data items list for LLM readability. + + Args: + items: List of bulk-data item dictionaries. + entity_id: Entity identifier for context. + category: Category name. + + Returns: + Formatted TextContent list. + """ + if not items: + return [TextContent(type="text", text=f"No {category} available for {entity_id}")] + + lines = [f"Bulk-data items in {entity_id}/{category} ({len(items)} total):"] + + for item_dict in items: + try: + item = BulkDataItem.model_validate(item_dict) + name = item.name or item.id + + size_str = "" + if item.size: + size_mb = item.size / (1024 * 1024) + size_str = f", {size_mb:.2f} MB" + + date_str = "" + if item.creation_date: + # Just show the date portion + date_str = f", created {item.creation_date[:10]}" + + lines.append(f" [{item.id}] {name} ({item.mimetype}{size_str}{date_str})") + except Exception: + # Fallback formatting + item_id = item_dict.get("id", "unknown") + name = item_dict.get("name", item_id) + lines.append(f" [{item_id}] {name}") + + return [TextContent(type="text", text="\n".join(lines))] + + +def format_bulkdata_info(info: dict[str, Any]) -> list[TextContent]: + """Format bulk-data info for LLM readability. + + Args: + info: Dictionary with content_type, content_length, filename, uri. + + Returns: + Formatted TextContent list. + """ + lines = [f"Bulk-data info for: {info.get('uri', 'unknown')}"] + + if info.get("filename"): + lines.append(f" Filename: {info['filename']}") + + lines.append(f" Content-Type: {info.get('content_type', 'unknown')}") + + if info.get("content_length"): + size_bytes = int(info["content_length"]) + size_mb = size_bytes / (1024 * 1024) + lines.append(f" Size: {size_mb:.2f} MB ({size_bytes} bytes)") + + return [TextContent(type="text", text="\n".join(lines))] + + +def save_bulk_data_file( + content: bytes, filename: str | None, bulk_data_uri: str, output_dir: str +) -> list[TextContent]: + """Save bulk-data content to a file. + + Args: + content: File content bytes. + filename: Filename from Content-Disposition header. + bulk_data_uri: Original URI for fallback filename. + output_dir: Output directory path. + + Returns: + Formatted TextContent list with download result. + """ + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + # Generate filename if not provided + if not filename: + # Extract from URI (last path component) + uri_parts = bulk_data_uri.strip("/").split("/") + filename = uri_parts[-1] if uri_parts else "download" + # Add extension if missing + if "." not in filename: + filename += ".mcap" + + file_path = output_path / filename + file_path.write_bytes(content) + + size_mb = len(content) / (1024 * 1024) + lines = [ + "Downloaded successfully!", + f" File: {file_path}", + f" Size: {size_mb:.2f} MB ({len(content)} bytes)", + ] + + return [TextContent(type="text", text="\n".join(lines))] + + +async def download_rosbags_for_fault( + client: SovdClient, + entity_id: str, + fault_code: str, + entity_type: str, + output_dir: str, +) -> list[TextContent]: + """Download all rosbag snapshots for a fault. + + Args: + client: SOVD client instance. + entity_id: Entity identifier. + fault_code: Fault code. + entity_type: Entity type. + output_dir: Output directory path. + + Returns: + Formatted TextContent list with download results. + """ + # Get fault with environment data + fault_data = await client.get_fault(entity_id, fault_code, entity_type) + + # Extract environment data + env_data = fault_data.get("environmentData") or fault_data.get("environment_data") + if not env_data: + return [ + TextContent( + type="text", + text=f"No environment data found for fault {fault_code}", + ) + ] + + # Get extended data records + records = env_data.get("extendedDataRecords") or env_data.get("extended_data_records") + if not records: + return [ + TextContent( + type="text", + text=f"No snapshot data found for fault {fault_code}", + ) + ] + + # Get rosbag snapshots + rosbag_snapshots = records.get("rosbagSnapshots") or records.get("rosbag_snapshots", []) + if not rosbag_snapshots: + freeze_frames = records.get("freezeFrameSnapshots") or records.get( + "freeze_frame_snapshots", [] + ) + if freeze_frames: + return [ + TextContent( + type="text", + text=f"Fault {fault_code} has only freeze frame snapshots " + f"({len(freeze_frames)} total), no rosbag recordings to download.", + ) + ] + return [ + TextContent( + type="text", + text=f"No rosbag snapshots found for fault {fault_code}", + ) + ] + + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + downloaded: list[str] = [] + errors: list[str] = [] + + for snap in rosbag_snapshots: + snap_id = snap.get("snapshotId") or snap.get("snapshot_id", "unknown") + bulk_uri = snap.get("bulkDataUri") or snap.get("bulk_data_uri") + + if not bulk_uri: + errors.append(f" - {snap_id}: No bulk_data_uri") + continue + + try: + content, filename = await client.download_bulk_data(bulk_uri) + + if not filename: + filename = f"{snap_id}.mcap" + + file_path = output_path / filename + file_path.write_bytes(content) + + size_mb = len(content) / (1024 * 1024) + downloaded.append(f" - {filename} ({size_mb:.2f} MB)") + + except Exception as e: + errors.append(f" - {snap_id}: {e!s}") + + lines = [f"Downloaded rosbags for fault {fault_code}:"] + + if downloaded: + lines.append(f"\nSuccessfully downloaded ({len(downloaded)}):") + lines.extend(downloaded) + + if errors: + lines.append(f"\nErrors ({len(errors)}):") + lines.extend(errors) + + lines.append(f"\nOutput directory: {output_path}") + + return [TextContent(type="text", text="\n".join(lines))] + + # Map dotted names (from docs) to valid underscore names TOOL_ALIASES: dict[str, str] = { "sovd.version": "sovd_version", @@ -363,6 +606,12 @@ def format_snapshots_response(snapshots_data: dict[str, Any]) -> list[TextConten "sovd_clear_all_faults": "sovd_clear_all_faults", "sovd_fault_snapshots": "sovd_fault_snapshots", "sovd_system_fault_snapshots": "sovd_system_fault_snapshots", + # Bulk data + "sovd_bulkdata_categories": "sovd_bulkdata_categories", + "sovd_bulkdata_list": "sovd_bulkdata_list", + "sovd_bulkdata_info": "sovd_bulkdata_info", + "sovd_bulkdata_download": "sovd_bulkdata_download", + "sovd_bulkdata_download_for_fault": "sovd_bulkdata_download_for_fault", } @@ -1122,6 +1371,110 @@ async def list_tools() -> list[Tool]: "required": ["entity_id"], }, ), + # ==================== Bulk Data ==================== + Tool( + name="sovd_bulkdata_categories", + description="List available bulk-data categories for an entity. Bulk-data categories contain downloadable files like rosbag recordings.", + inputSchema={ + "type": "object", + "properties": { + "entity_id": { + "type": "string", + "description": "The entity identifier", + }, + "entity_type": { + "type": "string", + "description": "Entity type: 'components', 'apps', 'areas', or 'functions'", + "default": "apps", + }, + }, + "required": ["entity_id"], + }, + ), + Tool( + name="sovd_bulkdata_list", + description="List bulk-data items in a category. Use this to discover available rosbag recordings for download.", + inputSchema={ + "type": "object", + "properties": { + "entity_id": { + "type": "string", + "description": "The entity identifier", + }, + "category": { + "type": "string", + "description": "Category name (e.g., 'rosbags')", + }, + "entity_type": { + "type": "string", + "description": "Entity type: 'components', 'apps', 'areas', or 'functions'", + "default": "apps", + }, + }, + "required": ["entity_id", "category"], + }, + ), + Tool( + name="sovd_bulkdata_info", + description="Get information about a specific bulk-data item. Use the bulk_data_uri from fault environment_data snapshots.", + inputSchema={ + "type": "object", + "properties": { + "bulk_data_uri": { + "type": "string", + "description": "Full bulk-data URI path from fault response (e.g., '/apps/motor/bulk-data/rosbags/uuid')", + }, + }, + "required": ["bulk_data_uri"], + }, + ), + Tool( + name="sovd_bulkdata_download", + description="Download a bulk-data file (e.g., rosbag recording) to the specified directory. Use the bulk_data_uri from fault environment_data snapshots.", + inputSchema={ + "type": "object", + "properties": { + "bulk_data_uri": { + "type": "string", + "description": "Full bulk-data URI path from fault response", + }, + "output_dir": { + "type": "string", + "description": "Directory to save the file (default: /tmp)", + "default": "/tmp", + }, + }, + "required": ["bulk_data_uri"], + }, + ), + Tool( + name="sovd_bulkdata_download_for_fault", + description="Download all rosbag recordings associated with a specific fault. Retrieves the fault's environment_data and downloads all rosbag snapshots.", + inputSchema={ + "type": "object", + "properties": { + "entity_id": { + "type": "string", + "description": "The entity identifier", + }, + "fault_code": { + "type": "string", + "description": "The fault code", + }, + "entity_type": { + "type": "string", + "description": "Entity type: 'components', 'apps', 'areas', or 'functions'", + "default": "apps", + }, + "output_dir": { + "type": "string", + "description": "Directory to save the files (default: /tmp)", + "default": "/tmp", + }, + }, + "required": ["entity_id", "fault_code"], + }, + ), ] @server.call_tool() @@ -1395,6 +1748,36 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: result = await client.delete_all_configurations(args.entity_id, args.entity_type) return format_json_response(result) + # ==================== Bulk Data ==================== + + elif normalized_name == "sovd_bulkdata_categories": + args = BulkDataCategoriesArgs(**arguments) + categories = await client.list_bulk_data_categories( + args.entity_id, args.entity_type + ) + return format_bulkdata_categories(categories, args.entity_id) + + elif normalized_name == "sovd_bulkdata_list": + args = BulkDataListArgs(**arguments) + items = await client.list_bulk_data(args.entity_id, args.category, args.entity_type) + return format_bulkdata_list(items, args.entity_id, args.category) + + elif normalized_name == "sovd_bulkdata_info": + args = BulkDataInfoArgs(**arguments) + info = await client.get_bulk_data_info(args.bulk_data_uri) + return format_bulkdata_info(info) + + elif normalized_name == "sovd_bulkdata_download": + args = BulkDataDownloadArgs(**arguments) + content, filename = await client.download_bulk_data(args.bulk_data_uri) + return save_bulk_data_file(content, filename, args.bulk_data_uri, args.output_dir) + + elif normalized_name == "sovd_bulkdata_download_for_fault": + args = BulkDataDownloadForFaultArgs(**arguments) + return await download_rosbags_for_fault( + client, args.entity_id, args.fault_code, args.entity_type, args.output_dir + ) + else: return format_error(f"Unknown tool: {name}") diff --git a/src/ros2_medkit_mcp/models.py b/src/ros2_medkit_mcp/models.py index 17c0a31..deef180 100644 --- a/src/ros2_medkit_mcp/models.py +++ b/src/ros2_medkit_mcp/models.py @@ -641,6 +641,132 @@ class FaultListResponse(BaseModel): model_config = {"populate_by_name": True, "extra": "allow"} +# ==================== Bulk Data Response Models ==================== + + +class BulkDataItem(BaseModel): + """Item in a bulk-data category listing.""" + + id: str = Field(..., description="Unique identifier for the bulk data item") + name: str | None = Field( + default=None, + description="Human-readable name for the item", + ) + mimetype: str = Field( + default="application/octet-stream", + description="MIME type of the data (e.g., 'application/x-mcap')", + ) + size: int | None = Field( + default=None, + description="File size in bytes", + ) + creation_date: str | None = Field( + default=None, + alias="creationDate", + description="ISO 8601 timestamp when the data was created", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class BulkDataCategoryResponse(BaseModel): + """Response listing available bulk-data categories.""" + + items: list[str] = Field( + default_factory=list, + description="List of available category names (e.g., 'rosbags', 'logs')", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +class BulkDataListResponse(BaseModel): + """Response listing items in a bulk-data category.""" + + items: list[BulkDataItem] = Field( + default_factory=list, + description="List of bulk data items in the category", + ) + + model_config = {"populate_by_name": True, "extra": "allow"} + + +# ==================== Bulk Data Argument Models ==================== + + +class BulkDataCategoriesArgs(BaseModel): + """Arguments for listing bulk-data categories.""" + + entity_id: str = Field( + ..., + description="The entity identifier", + ) + entity_type: str = Field( + default="apps", + description="Entity type: 'components', 'apps', 'areas', or 'functions'", + ) + + +class BulkDataListArgs(BaseModel): + """Arguments for listing bulk-data items in a category.""" + + entity_id: str = Field( + ..., + description="The entity identifier", + ) + category: str = Field( + ..., + description="Category name (e.g., 'rosbags')", + ) + entity_type: str = Field( + default="apps", + description="Entity type: 'components', 'apps', 'areas', or 'functions'", + ) + + +class BulkDataInfoArgs(BaseModel): + """Arguments for getting bulk-data item info.""" + + bulk_data_uri: str = Field( + ..., + description="Full bulk-data URI path (e.g., '/apps/motor/bulk-data/rosbags/uuid')", + ) + + +class BulkDataDownloadArgs(BaseModel): + """Arguments for downloading a bulk-data item.""" + + bulk_data_uri: str = Field( + ..., + description="Full bulk-data URI path (e.g., '/apps/motor/bulk-data/rosbags/uuid')", + ) + output_dir: str = Field( + default="/tmp", + description="Directory to save the downloaded file", + ) + + +class BulkDataDownloadForFaultArgs(BaseModel): + """Arguments for downloading all rosbags for a fault.""" + + entity_id: str = Field( + ..., + description="The entity identifier", + ) + fault_code: str = Field( + ..., + description="The fault code", + ) + entity_type: str = Field( + default="apps", + description="Entity type: 'components', 'apps', 'areas', or 'functions'", + ) + output_dir: str = Field( + default="/tmp", + description="Directory to save the downloaded files", + ) + + class ToolResult(BaseModel): """Standard result wrapper for tool responses.""" diff --git a/tests/test_bulkdata_tools.py b/tests/test_bulkdata_tools.py new file mode 100644 index 0000000..2b6d12d --- /dev/null +++ b/tests/test_bulkdata_tools.py @@ -0,0 +1,544 @@ +"""Tests for bulk-data MCP tools and models.""" + +import tempfile +from pathlib import Path + +import httpx +import pytest +import respx + +from ros2_medkit_mcp.client import SovdClient, SovdClientError +from ros2_medkit_mcp.config import Settings +from ros2_medkit_mcp.mcp_app import ( + download_rosbags_for_fault, + format_bulkdata_categories, + format_bulkdata_info, + format_bulkdata_list, + save_bulk_data_file, +) +from ros2_medkit_mcp.models import ( + BulkDataCategoriesArgs, + BulkDataCategoryResponse, + BulkDataDownloadArgs, + BulkDataDownloadForFaultArgs, + BulkDataInfoArgs, + BulkDataItem, + BulkDataListArgs, + BulkDataListResponse, +) + + +@pytest.fixture +def settings() -> Settings: + """Create test settings.""" + return Settings( + base_url="http://test-sovd:8080/api/v1", + bearer_token=None, + timeout_seconds=5.0, + ) + + +@pytest.fixture +def client(settings: Settings) -> SovdClient: + """Create test client.""" + return SovdClient(settings) + + +class TestBulkDataModels: + """Tests for bulk-data Pydantic models.""" + + def test_bulk_data_item(self) -> None: + """Test BulkDataItem model.""" + data = { + "id": "550e8400-uuid", + "name": "MOTOR_OVERHEAT recording", + "mimetype": "application/x-mcap", + "size": 2097152, + "creationDate": "2026-02-04T10:00:00Z", + } + + item = BulkDataItem.model_validate(data) + + assert item.id == "550e8400-uuid" + assert item.name == "MOTOR_OVERHEAT recording" + assert item.mimetype == "application/x-mcap" + assert item.size == 2097152 + assert item.creation_date == "2026-02-04T10:00:00Z" + + def test_bulk_data_item_minimal(self) -> None: + """Test BulkDataItem with minimal fields.""" + data = {"id": "uuid-123"} + + item = BulkDataItem.model_validate(data) + + assert item.id == "uuid-123" + assert item.name is None + assert item.mimetype == "application/octet-stream" + assert item.size is None + + def test_bulk_data_category_response(self) -> None: + """Test BulkDataCategoryResponse model.""" + data = {"items": ["rosbags", "logs"]} + + response = BulkDataCategoryResponse.model_validate(data) + + assert "rosbags" in response.items + assert "logs" in response.items + assert len(response.items) == 2 + + def test_bulk_data_category_response_empty(self) -> None: + """Test BulkDataCategoryResponse with empty items.""" + data = {"items": []} + + response = BulkDataCategoryResponse.model_validate(data) + + assert response.items == [] + + def test_bulk_data_list_response(self) -> None: + """Test BulkDataListResponse model.""" + data = { + "items": [ + {"id": "uuid-1", "name": "File 1", "mimetype": "application/x-mcap"}, + {"id": "uuid-2", "name": "File 2", "mimetype": "application/x-mcap"}, + ] + } + + response = BulkDataListResponse.model_validate(data) + + assert len(response.items) == 2 + assert response.items[0].id == "uuid-1" + assert response.items[1].name == "File 2" + + +class TestBulkDataArgModels: + """Tests for bulk-data argument models.""" + + def test_bulk_data_categories_args(self) -> None: + """Test BulkDataCategoriesArgs model.""" + args = BulkDataCategoriesArgs(entity_id="motor_controller") + + assert args.entity_id == "motor_controller" + assert args.entity_type == "apps" # Default + + def test_bulk_data_list_args(self) -> None: + """Test BulkDataListArgs model.""" + args = BulkDataListArgs( + entity_id="motor_controller", category="rosbags", entity_type="components" + ) + + assert args.entity_id == "motor_controller" + assert args.category == "rosbags" + assert args.entity_type == "components" + + def test_bulk_data_info_args(self) -> None: + """Test BulkDataInfoArgs model.""" + args = BulkDataInfoArgs(bulk_data_uri="/apps/motor/bulk-data/rosbags/uuid") + + assert args.bulk_data_uri == "/apps/motor/bulk-data/rosbags/uuid" + + def test_bulk_data_download_args(self) -> None: + """Test BulkDataDownloadArgs model.""" + args = BulkDataDownloadArgs( + bulk_data_uri="/apps/motor/bulk-data/rosbags/uuid", output_dir="/home/user/downloads" + ) + + assert args.bulk_data_uri == "/apps/motor/bulk-data/rosbags/uuid" + assert args.output_dir == "/home/user/downloads" + + def test_bulk_data_download_args_default_dir(self) -> None: + """Test BulkDataDownloadArgs default output_dir.""" + args = BulkDataDownloadArgs(bulk_data_uri="/apps/motor/bulk-data/rosbags/uuid") + + assert args.output_dir == "/tmp" + + def test_bulk_data_download_for_fault_args(self) -> None: + """Test BulkDataDownloadForFaultArgs model.""" + args = BulkDataDownloadForFaultArgs( + entity_id="motor_controller", + fault_code="MOTOR_OVERHEAT", + entity_type="apps", + output_dir="/tmp/faults", + ) + + assert args.entity_id == "motor_controller" + assert args.fault_code == "MOTOR_OVERHEAT" + assert args.entity_type == "apps" + assert args.output_dir == "/tmp/faults" + + +class TestFormatFunctions: + """Tests for bulk-data formatting functions.""" + + def test_format_bulkdata_categories(self) -> None: + """Test format_bulkdata_categories function.""" + categories = ["rosbags", "logs"] + result = format_bulkdata_categories(categories, "motor_controller") + + assert len(result) == 1 + assert "motor_controller" in result[0].text + assert "rosbags" in result[0].text + assert "logs" in result[0].text + + def test_format_bulkdata_categories_empty(self) -> None: + """Test format_bulkdata_categories with empty list.""" + result = format_bulkdata_categories([], "motor_controller") + + assert len(result) == 1 + assert "No bulk-data categories" in result[0].text + + def test_format_bulkdata_list(self) -> None: + """Test format_bulkdata_list function.""" + items = [ + { + "id": "uuid-1", + "name": "MOTOR_OVERHEAT recording", + "mimetype": "application/x-mcap", + "size": 1048576, + "creationDate": "2026-02-04T10:00:00Z", + }, + { + "id": "uuid-2", + "name": "LOW_BATTERY recording", + "mimetype": "application/x-mcap", + "size": 2097152, + }, + ] + result = format_bulkdata_list(items, "motor_controller", "rosbags") + + text = result[0].text + assert "2 total" in text + assert "uuid-1" in text + assert "uuid-2" in text + assert "MOTOR_OVERHEAT" in text + assert "1.00 MB" in text + assert "2.00 MB" in text + assert "2026-02-04" in text + + def test_format_bulkdata_list_empty(self) -> None: + """Test format_bulkdata_list with empty list.""" + result = format_bulkdata_list([], "motor_controller", "rosbags") + + assert "No rosbags available" in result[0].text + + def test_format_bulkdata_info(self) -> None: + """Test format_bulkdata_info function.""" + info = { + "uri": "/apps/motor/bulk-data/rosbags/uuid", + "filename": "MOTOR_OVERHEAT.mcap", + "content_type": "application/x-mcap", + "content_length": "1048576", + } + result = format_bulkdata_info(info) + + text = result[0].text + assert "/apps/motor/bulk-data/rosbags/uuid" in text + assert "MOTOR_OVERHEAT.mcap" in text + assert "application/x-mcap" in text + assert "1.00 MB" in text + + def test_format_bulkdata_info_minimal(self) -> None: + """Test format_bulkdata_info with minimal info.""" + info = {"uri": "/test/uri", "content_type": "application/octet-stream"} + result = format_bulkdata_info(info) + + text = result[0].text + assert "/test/uri" in text + assert "application/octet-stream" in text + + +class TestSaveBulkDataFile: + """Tests for save_bulk_data_file function.""" + + def test_save_with_filename(self) -> None: + """Test saving file with provided filename.""" + content = b"fake rosbag content" + + with tempfile.TemporaryDirectory() as tmpdir: + result = save_bulk_data_file( + content, "test_file.mcap", "/apps/motor/bulk-data/rosbags/uuid", tmpdir + ) + + assert "Downloaded successfully" in result[0].text + assert "test_file.mcap" in result[0].text + + file_path = Path(tmpdir) / "test_file.mcap" + assert file_path.exists() + assert file_path.read_bytes() == content + + def test_save_without_filename(self) -> None: + """Test saving file without provided filename (extracted from URI).""" + content = b"fake rosbag content" + + with tempfile.TemporaryDirectory() as tmpdir: + result = save_bulk_data_file( + content, None, "/apps/motor/bulk-data/rosbags/my-uuid-123", tmpdir + ) + + assert "Downloaded successfully" in result[0].text + # Should use last URI component with .mcap extension + assert "my-uuid-123.mcap" in result[0].text + + def test_save_creates_directory(self) -> None: + """Test that output directory is created if not exists.""" + content = b"test content" + + with tempfile.TemporaryDirectory() as tmpdir: + nested_dir = Path(tmpdir) / "nested" / "directory" + assert not nested_dir.exists() + + result = save_bulk_data_file(content, "test.mcap", "/test/uri", str(nested_dir)) + + assert "Downloaded successfully" in result[0].text + assert nested_dir.exists() + assert (nested_dir / "test.mcap").exists() + + +class TestClientBulkDataMethods: + """Tests for SovdClient bulk-data methods.""" + + @respx.mock + async def test_list_bulk_data_categories(self, client: SovdClient) -> None: + """Test list_bulk_data_categories method.""" + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data").mock( + return_value=httpx.Response(200, json={"items": ["rosbags", "logs"]}) + ) + + result = await client.list_bulk_data_categories("motor", "apps") + + assert result == ["rosbags", "logs"] + await client.close() + + @respx.mock + async def test_list_bulk_data(self, client: SovdClient) -> None: + """Test list_bulk_data method.""" + items = [ + {"id": "uuid-1", "name": "File 1"}, + {"id": "uuid-2", "name": "File 2"}, + ] + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags").mock( + return_value=httpx.Response(200, json={"items": items}) + ) + + result = await client.list_bulk_data("motor", "rosbags", "apps") + + assert len(result) == 2 + assert result[0]["id"] == "uuid-1" + await client.close() + + @respx.mock + async def test_get_bulk_data_info(self, client: SovdClient) -> None: + """Test get_bulk_data_info method.""" + respx.head("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/uuid").mock( + return_value=httpx.Response( + 200, + headers={ + "Content-Type": "application/x-mcap", + "Content-Length": "1048576", + "Content-Disposition": 'attachment; filename="test.mcap"', + }, + ) + ) + + result = await client.get_bulk_data_info("/apps/motor/bulk-data/rosbags/uuid") + + assert result["content_type"] == "application/x-mcap" + assert result["content_length"] == "1048576" + assert result["filename"] == "test.mcap" + await client.close() + + @respx.mock + async def test_get_bulk_data_info_not_found(self, client: SovdClient) -> None: + """Test get_bulk_data_info with 404.""" + respx.head("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/uuid").mock( + return_value=httpx.Response(404) + ) + + with pytest.raises(SovdClientError) as exc_info: + await client.get_bulk_data_info("/apps/motor/bulk-data/rosbags/uuid") + + assert exc_info.value.status_code == 404 + await client.close() + + @respx.mock + async def test_download_bulk_data(self, client: SovdClient) -> None: + """Test download_bulk_data method.""" + content = b"fake rosbag content" * 100 + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/uuid").mock( + return_value=httpx.Response( + 200, + content=content, + headers={"Content-Disposition": 'attachment; filename="test.mcap"'}, + ) + ) + + result_content, filename = await client.download_bulk_data( + "/apps/motor/bulk-data/rosbags/uuid" + ) + + assert result_content == content + assert filename == "test.mcap" + await client.close() + + @respx.mock + async def test_download_bulk_data_no_filename(self, client: SovdClient) -> None: + """Test download_bulk_data without Content-Disposition.""" + content = b"fake rosbag content" + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/uuid").mock( + return_value=httpx.Response(200, content=content) + ) + + result_content, filename = await client.download_bulk_data( + "/apps/motor/bulk-data/rosbags/uuid" + ) + + assert result_content == content + assert filename is None + await client.close() + + +class TestDownloadRosbagsForFault: + """Tests for download_rosbags_for_fault function.""" + + @respx.mock + async def test_download_rosbags_success(self, client: SovdClient) -> None: + """Test downloading rosbags for a fault.""" + fault_response = { + "item": {"code": "MOTOR_OVERHEAT", "faultName": "Motor Overheating"}, + "environmentData": { + "extendedDataRecords": { + "freezeFrameSnapshots": [], + "rosbagSnapshots": [ + { + "snapshotId": "rb-1", + "timestamp": "2026-02-04T10:00:00Z", + "bulkDataUri": "/apps/motor/bulk-data/rosbags/rb-1", + }, + { + "snapshotId": "rb-2", + "timestamp": "2026-02-04T10:01:00Z", + "bulkDataUri": "/apps/motor/bulk-data/rosbags/rb-2", + }, + ], + } + }, + } + + respx.get("http://test-sovd:8080/api/v1/apps/motor/faults/MOTOR_OVERHEAT").mock( + return_value=httpx.Response(200, json=fault_response) + ) + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/rb-1").mock( + return_value=httpx.Response( + 200, + content=b"rosbag1", + headers={"Content-Disposition": 'filename="fault1.mcap"'}, + ) + ) + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/rb-2").mock( + return_value=httpx.Response( + 200, + content=b"rosbag2", + headers={"Content-Disposition": 'filename="fault2.mcap"'}, + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + result = await download_rosbags_for_fault( + client, "motor", "MOTOR_OVERHEAT", "apps", tmpdir + ) + + text = result[0].text + assert "MOTOR_OVERHEAT" in text + assert "Successfully downloaded (2)" in text + assert "fault1.mcap" in text + assert "fault2.mcap" in text + + assert (Path(tmpdir) / "fault1.mcap").exists() + assert (Path(tmpdir) / "fault2.mcap").exists() + + await client.close() + + @respx.mock + async def test_download_only_freeze_frames(self, client: SovdClient) -> None: + """Test fault with only freeze frames (no rosbags).""" + fault_response = { + "item": {"code": "MINOR_FAULT"}, + "environmentData": { + "extendedDataRecords": { + "freezeFrameSnapshots": [ + {"snapshotId": "ff-1", "timestamp": "2026-02-04T10:00:00Z", "data": {}} + ], + "rosbagSnapshots": [], + } + }, + } + + respx.get("http://test-sovd:8080/api/v1/apps/motor/faults/MINOR_FAULT").mock( + return_value=httpx.Response(200, json=fault_response) + ) + + result = await download_rosbags_for_fault(client, "motor", "MINOR_FAULT", "apps", "/tmp") + + text = result[0].text + assert "only freeze frame snapshots" in text + assert "1 total" in text + + await client.close() + + @respx.mock + async def test_download_no_environment_data(self, client: SovdClient) -> None: + """Test fault without environment data.""" + fault_response = {"item": {"code": "NO_ENV_FAULT"}} + + respx.get("http://test-sovd:8080/api/v1/apps/motor/faults/NO_ENV_FAULT").mock( + return_value=httpx.Response(200, json=fault_response) + ) + + result = await download_rosbags_for_fault(client, "motor", "NO_ENV_FAULT", "apps", "/tmp") + + assert "No environment data found" in result[0].text + + await client.close() + + @respx.mock + async def test_download_with_errors(self, client: SovdClient) -> None: + """Test downloading with some failures.""" + fault_response = { + "item": {"code": "TEST_FAULT"}, + "environmentData": { + "extendedDataRecords": { + "freezeFrameSnapshots": [], + "rosbagSnapshots": [ + { + "snapshotId": "rb-ok", + "timestamp": "2026-02-04T10:00:00Z", + "bulkDataUri": "/apps/motor/bulk-data/rosbags/rb-ok", + }, + { + "snapshotId": "rb-fail", + "timestamp": "2026-02-04T10:01:00Z", + "bulkDataUri": "/apps/motor/bulk-data/rosbags/rb-fail", + }, + ], + } + }, + } + + respx.get("http://test-sovd:8080/api/v1/apps/motor/faults/TEST_FAULT").mock( + return_value=httpx.Response(200, json=fault_response) + ) + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/rb-ok").mock( + return_value=httpx.Response(200, content=b"ok") + ) + respx.get("http://test-sovd:8080/api/v1/apps/motor/bulk-data/rosbags/rb-fail").mock( + return_value=httpx.Response(500, text="Server Error") + ) + + with tempfile.TemporaryDirectory() as tmpdir: + result = await download_rosbags_for_fault(client, "motor", "TEST_FAULT", "apps", tmpdir) + + text = result[0].text + assert "Successfully downloaded (1)" in text + assert "Errors (1)" in text + assert "rb-fail" in text + + await client.close() From a62c7d9e65c41fe1e623f64cae774a538f9360e3 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Fri, 6 Feb 2026 17:09:33 +0000 Subject: [PATCH 3/3] fix: address PR review comments - Add path traversal sanitization in save_bulk_data_file and download_rosbags_for_fault - Add SSRF protection: reject absolute URLs in download_bulk_data - Handle all non-2xx responses in get_bulk_data_info (not just 404) - Remove unused BulkDataDescriptor model --- src/ros2_medkit_mcp/client.py | 15 +++++++++++---- src/ros2_medkit_mcp/mcp_app.py | 23 +++++++++++++++++++---- src/ros2_medkit_mcp/models.py | 31 ------------------------------- 3 files changed, 30 insertions(+), 39 deletions(-) diff --git a/src/ros2_medkit_mcp/client.py b/src/ros2_medkit_mcp/client.py index 2bd4017..8a4bbad 100644 --- a/src/ros2_medkit_mcp/client.py +++ b/src/ros2_medkit_mcp/client.py @@ -1000,10 +1000,10 @@ async def get_bulk_data_info(self, bulk_data_uri: str) -> dict[str, Any]: client = await self._ensure_client() response = await client.head(bulk_data_uri) - if response.status_code == 404: + if not response.is_success: raise SovdClientError( - message=f"Bulk data not found: {bulk_data_uri}", - status_code=404, + message=f"Bulk data not found: {bulk_data_uri} (HTTP {response.status_code})", + status_code=response.status_code, ) headers = response.headers @@ -1027,11 +1027,18 @@ async def download_bulk_data(self, bulk_data_uri: str) -> tuple[bytes, str | Non """Download a bulk-data file. Args: - bulk_data_uri: Full bulk-data URI path. + bulk_data_uri: Relative bulk-data URI path (must start with /). Returns: Tuple of (file_content, filename). + + Raises: + ValueError: If the URI is an absolute URL (SSRF protection). """ + # SSRF protection: reject absolute URLs - only allow relative paths + if bulk_data_uri.startswith(("http://", "https://", "//")): + raise ValueError(f"Absolute URLs not allowed for bulk data download: {bulk_data_uri}") + client = await self._ensure_client() response = await client.get(bulk_data_uri, timeout=httpx.Timeout(300.0)) diff --git a/src/ros2_medkit_mcp/mcp_app.py b/src/ros2_medkit_mcp/mcp_app.py index 1c6086c..9c89c05 100644 --- a/src/ros2_medkit_mcp/mcp_app.py +++ b/src/ros2_medkit_mcp/mcp_app.py @@ -419,7 +419,7 @@ def save_bulk_data_file( Returns: Formatted TextContent list with download result. """ - output_path = Path(output_dir) + output_path = Path(output_dir).resolve() output_path.mkdir(parents=True, exist_ok=True) # Generate filename if not provided @@ -431,7 +431,16 @@ def save_bulk_data_file( if "." not in filename: filename += ".mcap" - file_path = output_path / filename + # Sanitize filename to prevent path traversal + safe_filename = Path(filename).name + if not safe_filename: + safe_filename = "download.mcap" + + file_path = (output_path / safe_filename).resolve() + # Ensure the resolved path is still within output_dir + if not str(file_path).startswith(str(output_path)): + raise ValueError(f"Path traversal detected in filename: {filename}") + file_path.write_bytes(content) size_mb = len(content) / (1024 * 1024) @@ -507,7 +516,7 @@ async def download_rosbags_for_fault( ) ] - output_path = Path(output_dir) + output_path = Path(output_dir).resolve() output_path.mkdir(parents=True, exist_ok=True) downloaded: list[str] = [] @@ -527,7 +536,13 @@ async def download_rosbags_for_fault( if not filename: filename = f"{snap_id}.mcap" - file_path = output_path / filename + # Sanitize filename to prevent path traversal + safe_filename = Path(filename).name or f"{snap_id}.mcap" + file_path = (output_path / safe_filename).resolve() + if not str(file_path).startswith(str(output_path)): + errors.append(f" - {snap_id}: Path traversal detected in filename") + continue + file_path.write_bytes(content) size_mb = len(content) / (1024 * 1024) diff --git a/src/ros2_medkit_mcp/models.py b/src/ros2_medkit_mcp/models.py index deef180..444aac4 100644 --- a/src/ros2_medkit_mcp/models.py +++ b/src/ros2_medkit_mcp/models.py @@ -521,37 +521,6 @@ class FreezeFrameSnapshot(BaseModel): model_config = {"populate_by_name": True, "extra": "allow"} -class BulkDataDescriptor(BaseModel): - """Descriptor for bulk data (rosbag) with download URI.""" - - id: str = Field(..., description="Bulk data identifier") - category: str | None = Field( - default=None, - description="Data category (e.g., 'rosbag', 'snapshot')", - ) - bulk_data_uri: str = Field( - ..., - alias="bulkDataUri", - description="URI to download the bulk data file", - ) - file_size: int | None = Field( - default=None, - alias="fileSize", - description="File size in bytes", - ) - is_available: bool = Field( - default=True, - alias="isAvailable", - description="Whether the file is available for download", - ) - timestamp: str | None = Field( - default=None, - description="ISO 8601 timestamp when the data was captured", - ) - - model_config = {"populate_by_name": True, "extra": "allow"} - - class RosbagSnapshot(BaseModel): """Rosbag snapshot with bulk data download URI."""