Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rai_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "rai_core"
version = "2.8.1"
version = "2.8.2"
description = "Core functionality for RAI framework"
authors = ["Maciej Majek <maciej.majek@robotec.ai>", "Bartłomiej Boczek <bartlomiej.boczek@robotec.ai>", "Kajetan Rachwał <kajetan.rachwal@robotec.ai>"]
readme = "README.md"
Expand Down
71 changes: 67 additions & 4 deletions src/rai_core/rai/tools/ros2/navigation/nav2_blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import Type

from action_msgs.msg import GoalStatus
from geometry_msgs.msg import PoseStamped, Quaternion
from nav2_msgs.action import NavigateToPose
from pydantic import BaseModel, Field
Expand All @@ -23,6 +24,39 @@
from rai.tools.ros2.base import BaseROS2Tool


def _get_status_string(status: int) -> str:
"""Convert GoalStatus to human-readable string.

Reference: https://github.com/ros2/rcl_interfaces/blob/rolling/action_msgs/msg/GoalStatus.msg
"""
status_map = {
GoalStatus.STATUS_UNKNOWN: "UNKNOWN",
GoalStatus.STATUS_ACCEPTED: "ACCEPTED",
GoalStatus.STATUS_EXECUTING: "EXECUTING",
GoalStatus.STATUS_CANCELING: "CANCELING",
GoalStatus.STATUS_SUCCEEDED: "SUCCEEDED",
GoalStatus.STATUS_CANCELED: "CANCELED",
GoalStatus.STATUS_ABORTED: "ABORTED",
}
return status_map.get(status, f"UNKNOWN_STATUS_{status}")


def _get_error_message(result_response) -> str:
"""Extract error message from NavigateToPose result response.

Args:
result_response: GetResultService.Response containing status and result fields
- status (int8): GoalStatus value (how action ended)
- result.error_msg (string): Nav2 error details (why it failed)
"""
status_str = _get_status_string(result_response.status)

if result_response.result.error_msg:
return f"Status: {status_str}. {result_response.result.error_msg}"

return f"Status: {status_str}"


class GetCurrentPoseToolInput(BaseModel):
pass

Expand Down Expand Up @@ -81,12 +115,41 @@ def _run(self, x: float, y: float, z: float, yaw: float) -> str:
goal = NavigateToPose.Goal()
goal.pose = pose

result = action_client.send_goal(goal)
try:
result_response = action_client.send_goal(goal)
except Exception as e:
return f"Navigate to pose action failed with exception: {type(e).__name__}: {e}"

if result is None:
if result_response is None:
return "Navigate to pose action failed. Please try again."

if result.result.error_code != 0:
return f"Navigate to pose action failed. Error code: {result.result.error_code}"
# Validate response structure for debugging
try:
# Check status field (used for success/failure)
if not hasattr(result_response, "status"):
return (
f"Navigate to pose action failed. Response missing 'status' field. "
f"Response type: {type(result_response).__name__}"
)

# Check error_code field (original error source)
if hasattr(result_response, "result") and not hasattr(
result_response.result, "error_code"
):
return (
f"Navigate to pose action failed. Result missing 'error_code' field. "
f"Result type: {type(result_response.result).__name__}"
)

# Check goal status (SUCCEEDED = 4)
if result_response.status != GoalStatus.STATUS_SUCCEEDED:
error_msg = _get_error_message(result_response)
return f"Navigate to pose action failed. {error_msg}"

except AttributeError as e:
return (
f"Navigate to pose action failed. AttributeError: {e}. "
f"Response type: {type(result_response).__name__}"
)

return "Navigate to pose successful."
133 changes: 110 additions & 23 deletions tests/communication/ros2/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import uuid
from typing import Any, Generator, List, Optional, Tuple
from unittest.mock import MagicMock

import numpy as np
import pytest
Expand Down Expand Up @@ -339,61 +340,147 @@ def shutdown_executors_and_threads(
all_nodes = []
for executor in executors:
try:
all_nodes.extend(executor.get_nodes())
except Exception:
if executor is not None:
nodes = executor.get_nodes()
if nodes is not None:
all_nodes.extend(nodes)
except (AttributeError, RuntimeError, Exception):
# Executor may be in invalid state, skip
pass

# Signal action servers to stop executing
action_servers = []
for node in all_nodes:
try:
if isinstance(node, TestActionServer):
if node is not None and isinstance(node, TestActionServer):
node._shutdown_requested = True
action_servers.append(node)
except Exception:
except (AttributeError, RuntimeError, Exception):
pass

# Wait for actions to complete gracefully (with timeout as fallback)
if action_servers:
for server in action_servers:
server._all_actions_complete.wait(timeout=0.15)
try:
if server is not None and hasattr(server, "_all_actions_complete"):
server._all_actions_complete.wait(timeout=0.15)
except (AttributeError, RuntimeError, Exception):
pass

# Cancel all timers BEFORE shutting down executors
# This prevents executors from trying to call timers after they're canceled
for node in all_nodes:
try:
# Try to access and cancel timers through node's internal structure
if hasattr(node, "_timers"):
timers = node._timers
if isinstance(timers, dict):
for timer in list(timers.values()):
try:
timer.cancel()
except Exception:
pass
except Exception:
if node is not None:
# Try to access and cancel timers through node's internal structure
if hasattr(node, "_timers"):
timers = node._timers
if timers is not None and isinstance(timers, dict):
for timer in list(timers.values()):
try:
if timer is not None:
timer.cancel()
except (AttributeError, RuntimeError, Exception):
pass
except (AttributeError, RuntimeError, Exception):
pass

# Give executor a moment to process timer cancellations and remove them from wait list
time.sleep(0.1)
# Use shorter sleep to reduce chance of accessing invalid state
try:
time.sleep(0.05)
except Exception:
pass

# Now shutdown executors to stop spinning threads
for executor in executors:
try:
executor.shutdown()
except Exception as e:
print(f"Error shutting down executor: {e}")
if executor is not None:
executor.shutdown()
except (AttributeError, RuntimeError, Exception):
# Executor may already be shut down or in invalid state
pass

# Wait for threads to actually finish (shutdown() is async)
for thread in threads:
thread.join(timeout=2.0)
try:
if thread is not None and thread.is_alive():
thread.join(timeout=2.0)
except (AttributeError, RuntimeError, Exception):
pass

# Clean up any remaining nodes (after executors are shut down and threads joined)
# Only destroy nodes that haven't been destroyed yet
for node in all_nodes:
try:
node.destroy_node()
except Exception as e:
print(f"Error destroying node: {e}")
if node is not None:
# Check if node is still valid before destroying
# In ROS2 Humble, accessing destroyed nodes can cause segfaults
try:
# Try a safe operation to check if node is still valid
_ = node.get_name()
node.destroy_node()
except (AttributeError, RuntimeError):
# Node already destroyed or invalid, skip
pass
except (AttributeError, RuntimeError, Exception):
pass


def _create_time_with_to_msg_class():
"""Create a TimeWithToMsg class for ROS2 Humble/Jazzy compatibility.

ROS2 Humble vs Jazzy difference:
- Humble: Strict type checking in __debug__ mode requires actual BuiltinTime
instances, not MagicMock objects. Using MagicMock causes AssertionError.
- Jazzy: More lenient with MagicMock, but BuiltinTime instances don't allow
dynamically adding methods (AttributeError when accessing to_msg).

Solution: Create a wrapper class that inherits from BuiltinTime and adds to_msg().
This is used by both create_mock_connector_with_clock() and setup_mock_clock()
in test_grounding_dino.py.
"""
from builtin_interfaces.msg import Time as BuiltinTime

class TimeWithToMsg(BuiltinTime):
"""BuiltinTime wrapper that adds to_msg() method for compatibility."""

def to_msg(self):
return self

return TimeWithToMsg


def create_mock_connector_with_clock() -> Any:
"""Create a mock ROS2Connector with properly configured clock for testing.

This helper creates a mock connector that works with ROS2 Humble's strict
type checking for Time messages. The clock's now().to_msg() returns a real
builtin_interfaces.msg.Time instance that can be assigned to message stamps.

This is similar to setup_mock_clock() in test_grounding_dino.py but creates
a new mock connector instead of modifying an existing agent's connector.

Returns:
MagicMock: A mock ROS2Connector with node and clock configured.
"""
from rai.communication.ros2.connectors import ROS2Connector

TimeWithToMsg = _create_time_with_to_msg_class()

connector = MagicMock(spec=ROS2Connector)
mock_node = MagicMock()
mock_clock = MagicMock()
mock_time = MagicMock()

# Create a TimeWithToMsg instance (passes isinstance checks and has to_msg())
mock_ts = TimeWithToMsg()
mock_time.to_msg.return_value = mock_ts
mock_clock.now.return_value = mock_time
mock_node.get_clock.return_value = mock_clock

connector.node = mock_node
return connector


@pytest.fixture(scope="function")
Expand Down
98 changes: 97 additions & 1 deletion tests/tools/ros2/test_nav2_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
pytest.skip("ROS2 is not installed", allow_module_level=True)


from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

from geometry_msgs.msg import TransformStamped
from rai.communication.ros2.connectors import ROS2Connector
from rai.tools.ros2.navigation.nav2_blocking import (
GetCurrentPoseTool,
NavigateToPoseBlockingTool,
_get_error_message,
_get_status_string,
)

from tests.communication.ros2.helpers import (
create_mock_connector_with_clock,
ros_setup,
)

Expand All @@ -55,3 +59,95 @@ def test_get_current_pose_tool(ros_setup: None, request: pytest.FixtureRequest)

connector.get_transform.assert_called_once_with("map", "base_link")
assert result == str(transform)


def test_get_status_string() -> None:
"""Test GoalStatus to string conversion."""
from action_msgs.msg import GoalStatus

assert _get_status_string(GoalStatus.STATUS_UNKNOWN) == "UNKNOWN"
assert _get_status_string(GoalStatus.STATUS_SUCCEEDED) == "SUCCEEDED"
assert _get_status_string(GoalStatus.STATUS_ABORTED) == "ABORTED"
assert _get_status_string(GoalStatus.STATUS_CANCELED) == "CANCELED"
assert _get_status_string(999) == "UNKNOWN_STATUS_999"


def test_get_error_message() -> None:
"""Test error message extraction with status and error_msg."""
from action_msgs.msg import GoalStatus

# Test with ABORTED status and no error message
result_response = MagicMock()
result_response.status = GoalStatus.STATUS_ABORTED
result_response.result = MagicMock()
result_response.result.error_msg = ""
error_msg = _get_error_message(result_response)
assert error_msg == "Status: ABORTED"

# Test with ABORTED status and error message
result_with_msg = MagicMock()
result_with_msg.status = GoalStatus.STATUS_ABORTED
result_with_msg.result = MagicMock()
result_with_msg.result.error_msg = "Planner failed to find path"
error_msg = _get_error_message(result_with_msg)
assert error_msg == "Status: ABORTED. Planner failed to find path"

# Test with CANCELED status
canceled_response = MagicMock()
canceled_response.status = GoalStatus.STATUS_CANCELED
canceled_response.result = MagicMock()
canceled_response.result.error_msg = ""
error_msg = _get_error_message(canceled_response)
assert error_msg == "Status: CANCELED"


def test_navigate_to_pose_blocking_tool(
ros_setup: None, request: pytest.FixtureRequest
) -> None:
"""Test navigation tool success, error, and None result scenarios."""
from action_msgs.msg import GoalStatus

connector = create_mock_connector_with_clock()
tool = NavigateToPoseBlockingTool(connector=connector)

with patch(
"rai.tools.ros2.navigation.nav2_blocking.ActionClient"
) as mock_action_client_class:
mock_action_client = MagicMock()
mock_action_client_class.return_value = mock_action_client

# Test success
success_result = MagicMock()
success_result.status = GoalStatus.STATUS_SUCCEEDED
success_result.result = MagicMock()
success_result.result.error_msg = ""
mock_action_client.send_goal.return_value = success_result
result = tool._run(x=1.0, y=2.0, z=0.0, yaw=0.5)
assert result == "Navigate to pose successful."

# Test error without message
error_result = MagicMock()
error_result.status = GoalStatus.STATUS_ABORTED
error_result.result = MagicMock()
error_result.result.error_msg = ""
mock_action_client.send_goal.return_value = error_result
result = tool._run(x=1.0, y=2.0, z=0.0, yaw=0.5)
assert result == "Navigate to pose action failed. Status: ABORTED"

# Test error with message
error_result_with_msg = MagicMock()
error_result_with_msg.status = GoalStatus.STATUS_ABORTED
error_result_with_msg.result = MagicMock()
error_result_with_msg.result.error_msg = "Planner failed"
mock_action_client.send_goal.return_value = error_result_with_msg
result = tool._run(x=1.0, y=2.0, z=0.0, yaw=0.5)
assert (
result == "Navigate to pose action failed. Status: ABORTED. Planner failed"
)

# Test None result
mock_action_client.send_goal.return_value = None
result = tool._run(x=1.0, y=2.0, z=0.0, yaw=0.5)
assert result == "Navigate to pose action failed. Please try again."

assert mock_action_client.send_goal.call_count == 4
Loading