From cd53a07ffaa45580248e4e431d7d548ca30434d0 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Fri, 18 Jul 2025 09:00:02 +0200 Subject: [PATCH 01/10] feat: Added basic joint-level OMPL --- pyproject.toml | 1 + python/rcs/ompl/mj_ompl.py | 410 +++++++++++++++++++++++++++++++++++++ 2 files changed, 411 insertions(+) create mode 100644 python/rcs/ompl/mj_ompl.py diff --git a/pyproject.toml b/pyproject.toml index e9906cd3..3f7a0deb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = ["websockets>=11.0", "pin==3.7.0", "tilburg-hand", "digit-interface", + "ompl>=1.7.0", ] readme = "README.md" maintainers = [ diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py new file mode 100644 index 00000000..c9db7401 --- /dev/null +++ b/python/rcs/ompl/mj_ompl.py @@ -0,0 +1,410 @@ +import mujoco as mj +from ompl import util as ou +from ompl import base as ob +from ompl import geometric as og +from copy import deepcopy +from rcs.envs.base import GripperWrapper, RobotEnv +import sys +import os +import warnings +import xml.etree.ElementTree as ET + +print("OMPL modules imported successfully.") +DEFAULT_PLANNING_TIME = 5.0 # Default time allowed for planning in seconds +INTERPOLATE_NUM = 500 # Number of points to interpolate between start and goal states + +def get_collision_bodies(xml_file, robot_name): + """ + Extract collision bodies from the XML file for a specific robot. + + Args: + xml_file (str): Path to the XML file. + robot_name (str): Name of the robot to search for. + + Returns: + list: List of collision bodies' names found in the robot's subtree. + """ + tree = ET.parse(xml_file) + root = tree.getroot() + + # Find the top-level body with the specified robot name + robot = root.find(f".//body[@name='{robot_name}']") + + if robot is None: + raise ValueError(f"Robot '{robot_name}' not found in the XML file.") + + # Collect every in its subtree that has class="collision" + collision_bodies = robot.findall(".//body") + names = [robot_name] + for g in collision_bodies: + name = g.get("name") + if name is None: + raise ValueError(f"Body {ET.tostring(g).rstrip()}'s name is None." + "Please give it a name in the XML file.") + names.append(name) + + return names + +class MjORobot(): + + def __init__(self, + robot_name: str, + env: RobotEnv, + njoints:int=7, + robot_xml_name:str="", + robot_root_name:str="base_0", + robot_joint_name:str="fr3_joint#_0", + robot_actuator_name:str="fr3_joint#_0", + obstacle_body_names: list = None): + ''' + Initialize the robot object with the given parameters. + It is essentially a thin wrapper around the RobotEnv (i.e. MuJoCo variables), + for easily extracting the relevant bodies, joints, etc. from the MuJoCo model. + Anything that directly calls MuJoCo functions should be done through this class. + + Parameters: + - robot_name: Name of the robot. + - env: The RobotEnv environment in which the robot operates. + - njoints: Number of joints in the robot. + - robot_xml_name: Path to the robot's XML file. + This file will be used to query the s of the robot. + - robot_root_name: Name of the root body of the robot, + i.e. the top level 's name. + - robot_joint_name: Pattern of the robot's joints to be controlled, + where # will be replaced by 1 to njoints. + - robot_actuator_name: Pattern of the robot's actuators to be controlled, + where # will be replaced by 1 to njoints. + Actuators are not controlled, but is used to validate + that the number of joints and actuators match. + - obstacle_body_names: List of names of other s to be checked for collision. + If None, it will be set to an empty list. + + ''' + # First check that the xml file exists + assert os.path.isfile(robot_xml_name), \ + f"Robot XML file {robot_xml_name} does not exist. Please provide a valid path." + + self.robot_name = robot_name + self.env = env + self.model = self.env.get_wrapper_attr("sim").model + self.data = self.env.get_wrapper_attr("sim").data + + # Get the robot's collision geometries from the XML file + self.robot_body_names = get_collision_bodies(robot_xml_name, robot_root_name) + + # Query the number of joints implicitly by scanning linearly from 1 to njoints + self.joint_names = [f"{robot_joint_name.replace('#',str(i))}" for i in range(1, njoints+1)] + self.actuator_names = [f"{robot_actuator_name.replace('#',str(i))}" for i in range(1, njoints+1)] + + # Grab the collision bodies, joint, and actuator IDs from the model + self.robot_body_ids = set([mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) \ + for name in self.robot_body_names \ + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) > -1]) + self.joint_ids = [mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) \ + for name in self.joint_names \ + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) > -1] + self.actuator_ids = [mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) \ + for name in self.actuator_names \ + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) > -1] + self._set_obstacle_body_ids(obstacle_body_names) + + # Check if the number of joints, links, and actuators match + assert len(self.joint_ids) == len(self.actuator_ids), \ + f"Mismatch in number of joints and actuators for robot {robot_name}." + + self.njoints = len(self.joint_ids) + + def _set_obstacle_body_ids(self, obstacle_body_names: list): + """ + Set the obstacle bodies for the robot. + + Args: + obstacle_body_names (list): List of names of other bodies to be checked for path planning. + """ + self.obstacle_body_ids = set([mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) \ + for name in obstacle_body_names \ + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) > -1]) + + def _remove_obstacle_body_ids(self, obstacle_body_names: list|str): + """ + Remove specified obstacle bodies from the robot's obstacle checks, given their names. + + Args: + obstacle_body_names (list): List of names of bodies to be removed from obstacle checks. + """ + obstacle_body_names = obstacle_body_names if isinstance(obstacle_body_names, list) else [obstacle_body_names] + for name in obstacle_body_names: + body_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) + if body_id in self.obstacle_body_ids: + self.obstacle_body_ids.remove(body_id) + else: + warnings.warn(f"obstacle body name {name} does not exist in the model. Skipping removal.") + + def _add_obstacle_body_ids(self, obstacle_body_names: list|str): + """ + Add specified obstacle bodies to the robot's obstacle checks, given their names. + + Args: + obstacle_body_names (list): List of names of bodies to be added to obstacle checks. + """ + obstacle_body_names = obstacle_body_names if isinstance(obstacle_body_names, list) else [obstacle_body_names] + for name in obstacle_body_names: + body_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) + if body_id > -1: + self.obstacle_body_ids.add(body_id) + else: + warnings.warn("obstacle body name {name} does not exist in the model. Skipping addition.") + + def check_collision(self, qpos: ob.State): + ''' + Checks for collisions when the robot is set to the given joint positions. + This is done by setting the joint positions, running mj_fwdPosition and mj_collision, + then resetting the simulation to the original values. + + Args: + qpos (list): List of joint positions to set, ordered by base to end effector. + Returns: + bool: True if there is a collision, False otherwise. + ''' + qpos + if len(qpos) != self.njoints: + raise ValueError(f"Expected {self.njoints} joint positions, got {len(qpos)}.") + + qpos_init = deepcopy(self.data.qpos) + qvel_init = deepcopy(self.data.qvel) + # Set the joint positions + for i, joint_id in enumerate(self.joint_ids): + self.data.qpos[joint_id] = qpos[i] + + mj.mj_fwdPosition(self.model, self.data) + mj.mj_collision(self.model, self.data) + + has_collision = False + for c in self.data.contact: + b1 = self.model.geom_bodyid[c.geom1] + b2 = self.model.geom_bodyid[c.geom2] + # Check for collisions between robot and other bodies (first condition) + # or self-collisions (second condition) + if ((b1 in self.robot_body_ids or b2 in self.robot_body_ids) and \ + (b1 in self.obstacle_body_ids or b2 in self.obstacle_body_ids)) or \ + (b1 in self.robot_body_ids and b2 in self.robot_body_ids): + has_collision = True + break + + # Reset the simulation + self.data.qpos = qpos_init + self.data.qvel = qvel_init + mj.mj_forward(self.model, self.data) + return has_collision + + def get_joint_positions(self): + """ + Get the current joint positions of the robot. + + Returns: + list: Current joint positions. + """ + # Deepcopy might not be necessary + return [deepcopy(self.data.qpos[joint_id]) for joint_id in self.joint_ids] + + +class MjOStateSpace(ob.RealVectorStateSpace): + def __init__(self, num_dim) -> None: + super().__init__(num_dim) + self.num_dim = num_dim + self.state_sampler = None + + def allocStateSampler(self): + ''' + This will be called by the internal OMPL planner + ''' + # WARN: This will cause problems if the underlying planner is multi-threaded!!! + if self.state_sampler: + return self.state_sampler + + # when ompl planner calls this, we will return our sampler + return self.allocDefaultStateSampler() + + def set_state_sampler(self, state_sampler): + ''' + Optional, Set custom state sampler. + ''' + self.state_sampler = state_sampler + +class MjOMPL(): + def __init__(self, + robot:MjORobot=None, + robot_name:str=None, + robot_xml_name:str=None, + robot_root_name:str=None, + robot_joint_name:str=None, + robot_actuator_name:str=None, + njoints:int=None, + robot_env: RobotEnv = None, + obstacle_body_names: list = None): + + if(robot is None): + if robot_name is None or robot_xml_name is None or robot_root_name is None or \ + robot_joint_name is None or robot_actuator_name is None or njoints is None: + raise ValueError("If robot is not provided, all parameters must be provided.") + # Create a new MjORobot instance + robot = MjORobot(robot_name, + robot_env, + njoints=njoints, + robot_xml_name=robot_xml_name, + robot_root_name=robot_root_name, + robot_joint_name=robot_joint_name, + robot_actuator_name=robot_actuator_name, + obstacle_body_names=obstacle_body_names) + + self.robot = robot + self.interpolate_num = INTERPOLATE_NUM + + # Create the planning space + self.space = MjOStateSpace(robot.njoints) + + # Create bounds for the space based on robot's joint limits + bounds = ob.RealVectorBounds(robot.njoints) + joint_bounds = [self.robot.model.jnt_range[joint_id] for joint_id in self.robot.joint_ids] + for i, bound in enumerate(joint_bounds): + bounds.setLow(i, bound[0]) + bounds.setHigh(i, bound[1]) + self.space.setBounds(bounds) + + self.ss = og.SimpleSetup(self.space) + self.ss.setStateValidityChecker(ob.StateValidityCheckerFn(self.is_state_valid)) + self.si = self.ss.getSpaceInformation() + self.set_planner("RRT") # Default planner + + def is_state_valid(self, state): + return not self.robot.check_collision(self.state_to_list(state)) + + def set_planner(self, planner_name): + ''' + Note: Add your planner here!! + ''' + if planner_name == "PRM": + self.planner = og.PRM(self.ss.getSpaceInformation()) + elif planner_name == "RRT": + self.planner = og.RRT(self.ss.getSpaceInformation()) + elif planner_name == "RRTConnect": + self.planner = og.RRTConnect(self.ss.getSpaceInformation()) + elif planner_name == "RRTstar": + self.planner = og.RRTstar(self.ss.getSpaceInformation()) + elif planner_name == "EST": + self.planner = og.EST(self.ss.getSpaceInformation()) + elif planner_name == "FMT": + self.planner = og.FMT(self.ss.getSpaceInformation()) + elif planner_name == "BITstar": + self.planner = og.BITstar(self.ss.getSpaceInformation()) + else: + print("{} not recognized, please add it first".format(planner_name)) + return + + self.ss.setPlanner(self.planner) + + def plan(self, goal:list, start:list=None, allowed_time:float = DEFAULT_PLANNING_TIME): + ''' + Plan a path to goal from current robot state. + + Args: + goal (list): List of joint positions to reach. + start (list=None): List of joint positions to start from. + If None, it will use the current robot state. + allowed_time (float=5.0): Time allowed for planning in seconds. + ''' + start = self.robot.get_joint_positions() if start is None else start + return self._plan_start_goal(start, goal, allowed_time=allowed_time) + + def _plan_start_goal(self, start, goal, allowed_time = DEFAULT_PLANNING_TIME): + ''' + Plan a path to goal from the given robot start state. + If a path is found, it will try to interpolate the path according to + the number of segments specified by self.interpolate_num. + + Args: + start (list): List of joint positions to start from. + goal (list): List of joint positions to reach. + allowed_time (float=5.0): Time allowed for planning in seconds. + + Returns: + res (bool): True if a solution was found, False otherwise. + sol_path_list (list): List of joint positions in the solution path, if found. + ''' + print("start_planning") + print(self.planner.params()) + + orig_robot_state = self.robot.get_joint_positions() + + # set the start and goal states; + s = ob.State(self.space) + g = ob.State(self.space) + for i in range(len(start)): + s[i] = start[i] + g[i] = goal[i] + + self.ss.setStartAndGoalStates(s, g) + + # attempt to solve the problem within allowed planning time + solved = self.ss.solve(allowed_time) + res = False + sol_path_list = [] + if solved: + print("Found solution: interpolating into {} segments".format(self.interpolate_num)) + # print the path to screen + sol_path_geometric = self.ss.getSolutionPath() + sol_path_geometric.interpolate(self.interpolate_num) + sol_path_states = sol_path_geometric.getStates() + sol_path_list = [self.state_to_list(state) for state in sol_path_states] + # print(len(sol_path_list)) + # print(sol_path_list) + for sol_path in sol_path_list: + self.is_state_valid(sol_path) + res = True + else: + print("No solution found") + + # reset robot state + # self.robot.set_state(orig_robot_state) + return res, sol_path_list + + def state_to_list(self, state): + """ + Convert an OMPL state to a list of joint positions. + + Args: + state (ob.State): The OMPL state to convert. + Returns: + list: List of joint positions corresponding to the state. + """ + return [state[i] for i in range(self.robot.njoints)] + + def set_state_sampler(self, state_sampler): + self.space.set_state_sampler(state_sampler) + + def add_collision_bodies(self, obstacle_body_names: list|str): + """ + Add specified obstacle bodies to the robot's obstacle checks. + + Args: + obstacle_body_names (list|str): List of names of bodies to be added to obstacle checks. + """ + self.robot._add_obstacle_body_ids(obstacle_body_names) + + def remove_collision_bodies(self, obstacle_body_names: list|str): + """ + Remove specified obstacle bodies from the robot's obstacle checks. + + Args: + obstacle_body_names (list|str): List of names of bodies to be removed from obstacle checks. + """ + self.robot._remove_obstacle_body_ids(obstacle_body_names) + + def set_collision_bodies(self, obstacle_body_names: list|str): + """ + Set the obstacle bodies for the robot. + + Args: + obstacle_body_names (list|str): List of names of bodies to be set as obstacles. + """ + self.robot._set_obstacle_body_ids(obstacle_body_names) \ No newline at end of file From 28297b587d542a6553b83c110ce231308538a057 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Fri, 18 Jul 2025 14:25:07 +0200 Subject: [PATCH 02/10] feat: Added cartesian pose support for planning This calls the RobotEnv's ik function internally. --- python/rcs/ompl/mj_ompl.py | 64 +++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index c9db7401..2c22b8eb 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -3,11 +3,13 @@ from ompl import base as ob from ompl import geometric as og from copy import deepcopy +import numpy as np from rcs.envs.base import GripperWrapper, RobotEnv import sys import os import warnings import xml.etree.ElementTree as ET +from rcs._core.common import Pose print("OMPL modules imported successfully.") DEFAULT_PLANNING_TIME = 5.0 # Default time allowed for planning in seconds @@ -47,6 +49,16 @@ def get_collision_bodies(xml_file, robot_name): class MjORobot(): + franka_hand_tcp = Pose(pose_matrix=np.array([[ 0.707, 0.707, 0, 0 ], + [-0.707, 0.707, 0, 0 ], + [ 0, 0, 1, 0.1034], + [ 0, 0, 0, 1 ]])) + + digit_hand_tcp = Pose(pose_matrix=np.array([[ 0.707, 0.707, 0, 0 ], + [-0.707, 0.707, 0, 0 ], + [ 0, 0, 1, 0.15 ], + [ 0, 0, 0, 1 ]])) + def __init__(self, robot_name: str, env: RobotEnv, @@ -85,10 +97,9 @@ def __init__(self, f"Robot XML file {robot_xml_name} does not exist. Please provide a valid path." self.robot_name = robot_name - self.env = env + self.env = env self.model = self.env.get_wrapper_attr("sim").model self.data = self.env.get_wrapper_attr("sim").data - # Get the robot's collision geometries from the XML file self.robot_body_names = get_collision_bodies(robot_xml_name, robot_root_name) @@ -207,6 +218,21 @@ def get_joint_positions(self): # Deepcopy might not be necessary return [deepcopy(self.data.qpos[joint_id]) for joint_id in self.joint_ids] + def ik(self, pose, q0=None, tcp_offset=None): + """ + Perform inverse kinematics to find joint positions that achieve the desired pose. + + Args: + pose (Pose): Desired end-effector pose. + q0 (numpy.ndarray, optional): Initial guess for joint positions. + tcp_offset (Pose, optional): Offset from the end-effector to the TCP. + + Returns: + numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. + """ + tcp_offset = tcp_offset if tcp_offset is not None else self.franka_hand_tcp + return self.env.robot.get_ik().ik(pose, q0, tcp_offset) + class MjOStateSpace(ob.RealVectorStateSpace): def __init__(self, num_dim) -> None: @@ -303,28 +329,29 @@ def set_planner(self, planner_name): self.ss.setPlanner(self.planner) - def plan(self, goal:list, start:list=None, allowed_time:float = DEFAULT_PLANNING_TIME): + def plan(self, goal:np.ndarray, start:np.ndarray=None, allowed_time:float = DEFAULT_PLANNING_TIME): ''' Plan a path to goal from current robot state. - + Can be combined with `ik_pose` to plan a path to a desired pose. + e.g. plan(goal=ik_pose(goal_pose), ...) Args: - goal (list): List of joint positions to reach. - start (list=None): List of joint positions to start from. + goal (np.ndarray): List of joint positions to reach. + start (np.ndarray=None): List of joint positions to start from. If None, it will use the current robot state. allowed_time (float=5.0): Time allowed for planning in seconds. ''' start = self.robot.get_joint_positions() if start is None else start return self._plan_start_goal(start, goal, allowed_time=allowed_time) - def _plan_start_goal(self, start, goal, allowed_time = DEFAULT_PLANNING_TIME): + def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = DEFAULT_PLANNING_TIME): ''' Plan a path to goal from the given robot start state. If a path is found, it will try to interpolate the path according to the number of segments specified by self.interpolate_num. Args: - start (list): List of joint positions to start from. - goal (list): List of joint positions to reach. + start (np.ndarray): List of joint positions to start from. + goal (np.ndarray): List of joint positions to reach. allowed_time (float=5.0): Time allowed for planning in seconds. Returns: @@ -334,8 +361,6 @@ def _plan_start_goal(self, start, goal, allowed_time = DEFAULT_PLANNING_TIME): print("start_planning") print(self.planner.params()) - orig_robot_state = self.robot.get_joint_positions() - # set the start and goal states; s = ob.State(self.space) g = ob.State(self.space) @@ -368,6 +393,23 @@ def _plan_start_goal(self, start, goal, allowed_time = DEFAULT_PLANNING_TIME): # self.robot.set_state(orig_robot_state) return res, sol_path_list + def ik(self, pose:Pose, q0:np.ndarray=None, tcp_offset:Pose=None): + """ + Perform inverse kinematics to find joint positions that achieve the desired pose. + + Args: + pose (Pose): Desired end-effector pose. + q0 (numpy.ndarray, optional): Initial guess for joint positions. + tcp_offset (Pose, optional): Offset from the end-effector to the TCP. + if not provided, it will use the default Franka Hand TCP pose. + + Returns: + numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. + """ + if q0 is None: + q0 = self.robot.get_joint_positions() + return self.robot.ik(pose, q0, tcp_offset) + def state_to_list(self, state): """ Convert an OMPL state to a list of joint positions. From c6a386d80de9e3697e9c0fb7696e5c1c98c3ae8b Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 21 Jul 2025 13:26:28 +0200 Subject: [PATCH 03/10] update: Add instruction for anaconda env --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 20122dcf..7cafdc55 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,14 @@ sudo apt install $(cat debian_deps.txt) ```shell python3 -m venv .venv source .venv/bin/activate +``` +alternatively, with Conda: +```shell +conda create -n rcs python=3.11 # Version ensures that python is installed to the environment +conda activate rcs +``` +Then, install the package: +```shell pip install -r requirements_dev.txt pip config --site set global.no-build-isolation false ``` From 1db96ede10ebcabfabee548929e94a6a52a0f6dd Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 28 Jul 2025 14:18:04 +0200 Subject: [PATCH 04/10] upd: cleaned up code and add support for geoms --- python/rcs/ompl/mj_ompl.py | 181 ++++++++++++++++++++++++++----------- 1 file changed, 129 insertions(+), 52 deletions(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index 2c22b8eb..da26243e 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -11,7 +11,6 @@ import xml.etree.ElementTree as ET from rcs._core.common import Pose -print("OMPL modules imported successfully.") DEFAULT_PLANNING_TIME = 5.0 # Default time allowed for planning in seconds INTERPOLATE_NUM = 500 # Number of points to interpolate between start and goal states @@ -67,7 +66,8 @@ def __init__(self, robot_root_name:str="base_0", robot_joint_name:str="fr3_joint#_0", robot_actuator_name:str="fr3_joint#_0", - obstacle_body_names: list = None): + obstacle_body_names: list = None, + obstacle_geom_names: list = None): ''' Initialize the robot object with the given parameters. It is essentially a thin wrapper around the RobotEnv (i.e. MuJoCo variables), @@ -79,14 +79,14 @@ def __init__(self, - env: The RobotEnv environment in which the robot operates. - njoints: Number of joints in the robot. - robot_xml_name: Path to the robot's XML file. - This file will be used to query the s of the robot. + This file will be used to query the s of the robot to get collision checking info. - robot_root_name: Name of the root body of the robot, i.e. the top level 's name. - robot_joint_name: Pattern of the robot's joints to be controlled, where # will be replaced by 1 to njoints. - robot_actuator_name: Pattern of the robot's actuators to be controlled, where # will be replaced by 1 to njoints. - Actuators are not controlled, but is used to validate + Actuators are not controlled, but is used to validate that the number of joints and actuators match. - obstacle_body_names: List of names of other s to be checked for collision. If None, it will be set to an empty list. @@ -117,7 +117,10 @@ def __init__(self, self.actuator_ids = [mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) \ for name in self.actuator_names \ if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) > -1] - self._set_obstacle_body_ids(obstacle_body_names) + self.obstacle_body_ids = set() + self.obstacle_geom_ids = set() + self._add_obstacle_body_ids(obstacle_body_names) + self._add_obstacle_geom_ids(obstacle_geom_names) # Check if the number of joints, links, and actuators match assert len(self.joint_ids) == len(self.actuator_ids), \ @@ -125,17 +128,6 @@ def __init__(self, self.njoints = len(self.joint_ids) - def _set_obstacle_body_ids(self, obstacle_body_names: list): - """ - Set the obstacle bodies for the robot. - - Args: - obstacle_body_names (list): List of names of other bodies to be checked for path planning. - """ - self.obstacle_body_ids = set([mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) \ - for name in obstacle_body_names \ - if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) > -1]) - def _remove_obstacle_body_ids(self, obstacle_body_names: list|str): """ Remove specified obstacle bodies from the robot's obstacle checks, given their names. @@ -143,13 +135,15 @@ def _remove_obstacle_body_ids(self, obstacle_body_names: list|str): Args: obstacle_body_names (list): List of names of bodies to be removed from obstacle checks. """ + if obstacle_body_names is None: + return obstacle_body_names = obstacle_body_names if isinstance(obstacle_body_names, list) else [obstacle_body_names] for name in obstacle_body_names: body_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) if body_id in self.obstacle_body_ids: self.obstacle_body_ids.remove(body_id) else: - warnings.warn(f"obstacle body name {name} does not exist in the model. Skipping removal.") + raise RuntimeError(f"_remove_obstacle_body_ids: obstacle body name {name} does not exist in the set.") def _add_obstacle_body_ids(self, obstacle_body_names: list|str): """ @@ -158,14 +152,50 @@ def _add_obstacle_body_ids(self, obstacle_body_names: list|str): Args: obstacle_body_names (list): List of names of bodies to be added to obstacle checks. """ + if obstacle_body_names is None: + return obstacle_body_names = obstacle_body_names if isinstance(obstacle_body_names, list) else [obstacle_body_names] for name in obstacle_body_names: body_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) if body_id > -1: self.obstacle_body_ids.add(body_id) else: - warnings.warn("obstacle body name {name} does not exist in the model. Skipping addition.") + raise RuntimeError(f"_add_obstacle_body_ids: obstacle body name {name} does not exist in the model.") + + def _add_obstacle_geom_ids(self, obstacle_geom_names: list|str): + """ + Add specified obstacle geoms to the robot's obstacle checks, given their names. + Args: + obstacle_geom_names (list): List of names of geoms to be added to obstacle checks. + """ + if obstacle_geom_names is None: + return + obstacle_geom_names = obstacle_geom_names if isinstance(obstacle_geom_names, list) else [obstacle_geom_names] + for name in obstacle_geom_names: + geom_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_GEOM, name) + if geom_id > -1: + self.obstacle_geom_ids.add(geom_id) + else: + raise RuntimeError(f"_add_obstacle_geom_ids: obstacle geom name {name} does not exist in the model.") + + def _remove_obstacle_geom_ids(self, obstacle_geom_names: list|str): + """ + Remove specified obstacle geoms from the robot's obstacle checks, given their names. + + Args: + obstacle_geom_names (list): List of names of geoms to be removed from obstacle checks. + """ + if obstacle_geom_names is None: + return + obstacle_geom_names = obstacle_geom_names if isinstance(obstacle_geom_names, list) else [obstacle_geom_names] + for name in obstacle_geom_names: + geom_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_GEOM, name) + if geom_id in self.obstacle_geom_ids: + self.obstacle_geom_ids.remove(geom_id) + else: + raise RuntimeError(f"_remove_obstacle_geom_ids: obstacle geom name {name} does not exist in the set.") + def check_collision(self, qpos: ob.State): ''' Checks for collisions when the robot is set to the given joint positions. @@ -201,6 +231,11 @@ def check_collision(self, qpos: ob.State): (b1 in self.robot_body_ids and b2 in self.robot_body_ids): has_collision = True break + # Check for collisions with geoms + if (c.geom1 in self.obstacle_geom_ids or c.geom2 in self.obstacle_geom_ids) and \ + (b1 in self.robot_body_ids or b2 in self.robot_body_ids): + has_collision = True + break # Reset the simulation self.data.qpos = qpos_init @@ -231,7 +266,7 @@ def ik(self, pose, q0=None, tcp_offset=None): numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. """ tcp_offset = tcp_offset if tcp_offset is not None else self.franka_hand_tcp - return self.env.robot.get_ik().ik(pose, q0, tcp_offset) + return self.env.unwrapped.robot.get_ik().ik(pose, q0, tcp_offset) class MjOStateSpace(ob.RealVectorStateSpace): @@ -259,32 +294,64 @@ def set_state_sampler(self, state_sampler): class MjOMPL(): def __init__(self, - robot:MjORobot=None, - robot_name:str=None, - robot_xml_name:str=None, - robot_root_name:str=None, - robot_joint_name:str=None, - robot_actuator_name:str=None, - njoints:int=None, - robot_env: RobotEnv = None, - obstacle_body_names: list = None): - - if(robot is None): - if robot_name is None or robot_xml_name is None or robot_root_name is None or \ - robot_joint_name is None or robot_actuator_name is None or njoints is None: - raise ValueError("If robot is not provided, all parameters must be provided.") - # Create a new MjORobot instance - robot = MjORobot(robot_name, - robot_env, - njoints=njoints, - robot_xml_name=robot_xml_name, - robot_root_name=robot_root_name, - robot_joint_name=robot_joint_name, - robot_actuator_name=robot_actuator_name, - obstacle_body_names=obstacle_body_names) + robot_env: RobotEnv, + robot_name:str, + robot_xml_name:str, + robot_root_name:str, + robot_joint_name:str, + robot_actuator_name:str, + njoints:int, + obstacle_body_names: list = None, + obstacle_geom_names: list = None, + interpolate_num:int=INTERPOLATE_NUM, + default_planning_time:float=DEFAULT_PLANNING_TIME): + ''' + Initialize the OMPL planner with the given parameters. + Besides setting up the OMPL planning context, it instantiates an MjORobot instance, + which is a thin wrapper around the RobotEnv (i.e. MuJoCo variables), + for easily extracting the relevant bodies, joints, etc. from the MuJoCo model. + + Parameters: + - robot_env: The RobotEnv environment in which the robot operates. + - robot_name: Name of the robot. + + - robot_xml_name: Path to the robot's XML file. + This file will be used to query the s of the robot to get collision checking info. + - robot_root_name: Name of the root body of the robot, + i.e. the top level 's name. + - robot_joint_name: Pattern of the robot's joints to be controlled, + where # will be replaced by 1 to njoints. + - robot_actuator_name: Pattern of the robot's actuators to be controlled, + where # will be replaced by 1 to njoints. + Actuators are not controlled, but is used to validate + that the number of joints and actuators match. + - njoints: Number of joints in the robot. + - obstacle_body_names: List of names of other mjOBJ_BODYs to be checked for collision. + If None, it will be set to an empty list. + - obstacle_geom_names: List of names of other mjOBJ_GEOMs to be checked for collision. + If None, it will be set to an empty list. + - interpolate_num=100 (optional): Number of points to interpolate between start and goal states. + - default_planning_time=5.0 (optional): Default time allowed for planning in seconds. + + ''' + if robot_name is None or robot_xml_name is None or robot_root_name is None or \ + robot_joint_name is None or robot_actuator_name is None or njoints is None: + raise ValueError("Initialization values are missing.") + # Create a new MjORobot instance + robot = MjORobot(robot_name, + robot_env, + njoints=njoints, + robot_xml_name=robot_xml_name, + robot_root_name=robot_root_name, + robot_joint_name=robot_joint_name, + robot_actuator_name=robot_actuator_name, + obstacle_body_names=obstacle_body_names, + obstacle_geom_names=obstacle_geom_names) + self.robot = robot - self.interpolate_num = INTERPOLATE_NUM + self.interpolate_num = interpolate_num + self.default_planning_time = default_planning_time # Create the planning space self.space = MjOStateSpace(robot.njoints) @@ -358,8 +425,7 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = D res (bool): True if a solution was found, False otherwise. sol_path_list (list): List of joint positions in the solution path, if found. ''' - print("start_planning") - print(self.planner.params()) + print("Planner params: \n", self.planner.params()) # set the start and goal states; s = ob.State(self.space) @@ -381,8 +447,6 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = D sol_path_geometric.interpolate(self.interpolate_num) sol_path_states = sol_path_geometric.getStates() sol_path_list = [self.state_to_list(state) for state in sol_path_states] - # print(len(sol_path_list)) - # print(sol_path_list) for sol_path in sol_path_list: self.is_state_valid(sol_path) res = True @@ -427,26 +491,39 @@ def set_state_sampler(self, state_sampler): def add_collision_bodies(self, obstacle_body_names: list|str): """ Add specified obstacle bodies to the robot's obstacle checks. + Prints a warning if the body name does not exist in the model. Args: obstacle_body_names (list|str): List of names of bodies to be added to obstacle checks. """ self.robot._add_obstacle_body_ids(obstacle_body_names) + + def add_collision_geoms(self, obstacle_geom_names: list|str): + """ + Add specified obstacle geometries to the robot's obstacle checks. + Prints a warning if the geometry name does not exist in the model. + + Args: + obstacle_geom_names (list|str): List of names of geometries to be added to obstacle checks. + """ + self.robot._add_obstacle_geom_ids(obstacle_geom_names) def remove_collision_bodies(self, obstacle_body_names: list|str): """ Remove specified obstacle bodies from the robot's obstacle checks. + Prints a warning if the body name does not exist in the current set of obstacles. Args: obstacle_body_names (list|str): List of names of bodies to be removed from obstacle checks. """ self.robot._remove_obstacle_body_ids(obstacle_body_names) - def set_collision_bodies(self, obstacle_body_names: list|str): + def remove_collision_geoms(self, obstacle_geom_names: list|str): """ - Set the obstacle bodies for the robot. - + Remove specified obstacle geometries from the robot's obstacle checks. + Prints a warning if the geometry name does not exist in the current set of obstacles. + Args: - obstacle_body_names (list|str): List of names of bodies to be set as obstacles. + obstacle_geom_names (list|str): List of names of geometries to be removed from obstacle checks. """ - self.robot._set_obstacle_body_ids(obstacle_body_names) \ No newline at end of file + self.robot._remove_obstacle_geom_ids(obstacle_geom_names) \ No newline at end of file From 42ea6ee55b531476df4bfc09d3c716bd45e1744e Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 28 Jul 2025 15:27:56 +0200 Subject: [PATCH 05/10] fix: make ompl path result into a list of np arrays for better compat --- python/rcs/ompl/mj_ompl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index da26243e..e17a0152 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -455,6 +455,7 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = D # reset robot state # self.robot.set_state(orig_robot_state) + sol_path_list = [np.array(sol_path, dtype=np.float32) for sol_path in sol_path_list] return res, sol_path_list def ik(self, pose:Pose, q0:np.ndarray=None, tcp_offset:Pose=None): From bc73e3d9bf5cf44d0ef13c76f226ebe280deed42 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 28 Jul 2025 15:29:45 +0200 Subject: [PATCH 06/10] chg: update function doc on return type --- python/rcs/ompl/mj_ompl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index e17a0152..bfb719e4 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -423,7 +423,7 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = D Returns: res (bool): True if a solution was found, False otherwise. - sol_path_list (list): List of joint positions in the solution path, if found. + sol_path_list (list[np.ndarray]): List of joint positions in the solution path, if found. ''' print("Planner params: \n", self.planner.params()) From 75b169f8be1ecb0abb1ac2574c38163bfc91d1f1 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Thu, 31 Jul 2025 18:26:20 +0200 Subject: [PATCH 07/10] fix: pyformat and pylint changes --- python/rcs/ompl/mj_ompl.py | 343 ++++++++++++++++++++----------------- 1 file changed, 185 insertions(+), 158 deletions(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index bfb719e4..a6486de4 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -1,76 +1,77 @@ +import os +import xml.etree.ElementTree as ET +from copy import deepcopy + import mujoco as mj -from ompl import util as ou +import numpy as np from ompl import base as ob from ompl import geometric as og -from copy import deepcopy -import numpy as np -from rcs.envs.base import GripperWrapper, RobotEnv -import sys -import os -import warnings -import xml.etree.ElementTree as ET from rcs._core.common import Pose +from rcs.envs.base import RobotEnv DEFAULT_PLANNING_TIME = 5.0 # Default time allowed for planning in seconds -INTERPOLATE_NUM = 500 # Number of points to interpolate between start and goal states +INTERPOLATE_NUM = 500 # Number of points to interpolate between start and goal states + def get_collision_bodies(xml_file, robot_name): """ Extract collision bodies from the XML file for a specific robot. - + Args: xml_file (str): Path to the XML file. robot_name (str): Name of the robot to search for. - + Returns: list: List of collision bodies' names found in the robot's subtree. """ tree = ET.parse(xml_file) root = tree.getroot() - + # Find the top-level body with the specified robot name robot = root.find(f".//body[@name='{robot_name}']") - + if robot is None: - raise ValueError(f"Robot '{robot_name}' not found in the XML file.") - + error_msg = f"Robot '{robot_name}' not found in the XML file." + raise ValueError(error_msg) + # Collect every in its subtree that has class="collision" collision_bodies = robot.findall(".//body") names = [robot_name] for g in collision_bodies: name = g.get("name") if name is None: - raise ValueError(f"Body {ET.tostring(g).rstrip()}'s name is None." - "Please give it a name in the XML file.") + error_msg = f"Body {ET.tostring(g).rstrip()}'s name is None. Please give it a name in the XML file." + raise ValueError(error_msg) names.append(name) - + return names -class MjORobot(): - - franka_hand_tcp = Pose(pose_matrix=np.array([[ 0.707, 0.707, 0, 0 ], - [-0.707, 0.707, 0, 0 ], - [ 0, 0, 1, 0.1034], - [ 0, 0, 0, 1 ]])) - - digit_hand_tcp = Pose(pose_matrix=np.array([[ 0.707, 0.707, 0, 0 ], - [-0.707, 0.707, 0, 0 ], - [ 0, 0, 1, 0.15 ], - [ 0, 0, 0, 1 ]])) - - def __init__(self, - robot_name: str, - env: RobotEnv, - njoints:int=7, - robot_xml_name:str="", - robot_root_name:str="base_0", - robot_joint_name:str="fr3_joint#_0", - robot_actuator_name:str="fr3_joint#_0", - obstacle_body_names: list = None, - obstacle_geom_names: list = None): - ''' + +class MjORobot: + + franka_hand_tcp = Pose( + pose_matrix=np.array([[0.707, 0.707, 0, 0], [-0.707, 0.707, 0, 0], [0, 0, 1, 0.1034], [0, 0, 0, 1]]) + ) + + digit_hand_tcp = Pose( + pose_matrix=np.array([[0.707, 0.707, 0, 0], [-0.707, 0.707, 0, 0], [0, 0, 1, 0.15], [0, 0, 0, 1]]) + ) + + def __init__( + self, + robot_name: str, + env: RobotEnv, + njoints: int = 7, + robot_xml_name: str = "", + robot_root_name: str = "base_0", + robot_joint_name: str = "fr3_joint#_0", + robot_actuator_name: str = "fr3_joint#_0", + obstacle_body_names: list | None = None, + obstacle_geom_names: list | None = None, + ): + """ Initialize the robot object with the given parameters. - It is essentially a thin wrapper around the RobotEnv (i.e. MuJoCo variables), + It is essentially a thin wrapper around the RobotEnv (i.e. MuJoCo variables), for easily extracting the relevant bodies, joints, etc. from the MuJoCo model. Anything that directly calls MuJoCo functions should be done through this class. @@ -78,24 +79,25 @@ def __init__(self, - robot_name: Name of the robot. - env: The RobotEnv environment in which the robot operates. - njoints: Number of joints in the robot. - - robot_xml_name: Path to the robot's XML file. + - robot_xml_name: Path to the robot's XML file. This file will be used to query the s of the robot to get collision checking info. - - robot_root_name: Name of the root body of the robot, + - robot_root_name: Name of the root body of the robot, i.e. the top level 's name. - - robot_joint_name: Pattern of the robot's joints to be controlled, + - robot_joint_name: Pattern of the robot's joints to be controlled, where # will be replaced by 1 to njoints. - - robot_actuator_name: Pattern of the robot's actuators to be controlled, + - robot_actuator_name: Pattern of the robot's actuators to be controlled, where # will be replaced by 1 to njoints. - Actuators are not controlled, but is used to validate + Actuators are not controlled, but is used to validate that the number of joints and actuators match. - obstacle_body_names: List of names of other s to be checked for collision. If None, it will be set to an empty list. - ''' + """ # First check that the xml file exists - assert os.path.isfile(robot_xml_name), \ - f"Robot XML file {robot_xml_name} does not exist. Please provide a valid path." - + assert os.path.isfile( + robot_xml_name + ), f"Robot XML file {robot_xml_name} does not exist. Please provide a valid path." + self.robot_name = robot_name self.env = env self.model = self.env.get_wrapper_attr("sim").model @@ -104,34 +106,41 @@ def __init__(self, self.robot_body_names = get_collision_bodies(robot_xml_name, robot_root_name) # Query the number of joints implicitly by scanning linearly from 1 to njoints - self.joint_names = [f"{robot_joint_name.replace('#',str(i))}" for i in range(1, njoints+1)] - self.actuator_names = [f"{robot_actuator_name.replace('#',str(i))}" for i in range(1, njoints+1)] + self.joint_names = [f"{robot_joint_name.replace('#',str(i))}" for i in range(1, njoints + 1)] + self.actuator_names = [f"{robot_actuator_name.replace('#',str(i))}" for i in range(1, njoints + 1)] # Grab the collision bodies, joint, and actuator IDs from the model - self.robot_body_ids = set([mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) \ - for name in self.robot_body_names \ - if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) > -1]) - self.joint_ids = [mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) \ - for name in self.joint_names \ - if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) > -1] - self.actuator_ids = [mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) \ - for name in self.actuator_names \ - if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) > -1] + self.robot_body_ids = { + mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) + for name in self.robot_body_names + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_BODY, name) > -1 + } + self.joint_ids = [ + mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) + for name in self.joint_names + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_JOINT, name) > -1 + ] + self.actuator_ids = [ + mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) + for name in self.actuator_names + if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) > -1 + ] self.obstacle_body_ids = set() self.obstacle_geom_ids = set() self._add_obstacle_body_ids(obstacle_body_names) self._add_obstacle_geom_ids(obstacle_geom_names) # Check if the number of joints, links, and actuators match - assert len(self.joint_ids) == len(self.actuator_ids), \ - f"Mismatch in number of joints and actuators for robot {robot_name}." - + assert len(self.joint_ids) == len( + self.actuator_ids + ), f"Mismatch in number of joints and actuators for robot {robot_name}." + self.njoints = len(self.joint_ids) - - def _remove_obstacle_body_ids(self, obstacle_body_names: list|str): + + def _remove_obstacle_body_ids(self, obstacle_body_names: list | str): """ Remove specified obstacle bodies from the robot's obstacle checks, given their names. - + Args: obstacle_body_names (list): List of names of bodies to be removed from obstacle checks. """ @@ -143,12 +152,13 @@ def _remove_obstacle_body_ids(self, obstacle_body_names: list|str): if body_id in self.obstacle_body_ids: self.obstacle_body_ids.remove(body_id) else: - raise RuntimeError(f"_remove_obstacle_body_ids: obstacle body name {name} does not exist in the set.") + error_msg = f"_remove_obstacle_body_ids: obstacle body name {name} does not exist in the set." + raise RuntimeError(error_msg) - def _add_obstacle_body_ids(self, obstacle_body_names: list|str): + def _add_obstacle_body_ids(self, obstacle_body_names: list | str): """ Add specified obstacle bodies to the robot's obstacle checks, given their names. - + Args: obstacle_body_names (list): List of names of bodies to be added to obstacle checks. """ @@ -160,9 +170,10 @@ def _add_obstacle_body_ids(self, obstacle_body_names: list|str): if body_id > -1: self.obstacle_body_ids.add(body_id) else: - raise RuntimeError(f"_add_obstacle_body_ids: obstacle body name {name} does not exist in the model.") + error_msg = f"_add_obstacle_body_ids: obstacle body name {name} does not exist in the model." + raise RuntimeError(error_msg) - def _add_obstacle_geom_ids(self, obstacle_geom_names: list|str): + def _add_obstacle_geom_ids(self, obstacle_geom_names: list | str): """ Add specified obstacle geoms to the robot's obstacle checks, given their names. @@ -177,9 +188,10 @@ def _add_obstacle_geom_ids(self, obstacle_geom_names: list|str): if geom_id > -1: self.obstacle_geom_ids.add(geom_id) else: - raise RuntimeError(f"_add_obstacle_geom_ids: obstacle geom name {name} does not exist in the model.") + error_msg = f"_add_obstacle_geom_ids: obstacle geom name {name} does not exist in the model." + raise RuntimeError(error_msg) - def _remove_obstacle_geom_ids(self, obstacle_geom_names: list|str): + def _remove_obstacle_geom_ids(self, obstacle_geom_names: list | str): """ Remove specified obstacle geoms from the robot's obstacle checks, given their names. @@ -194,23 +206,24 @@ def _remove_obstacle_geom_ids(self, obstacle_geom_names: list|str): if geom_id in self.obstacle_geom_ids: self.obstacle_geom_ids.remove(geom_id) else: - raise RuntimeError(f"_remove_obstacle_geom_ids: obstacle geom name {name} does not exist in the set.") - + error_msg = f"_remove_obstacle_geom_ids: obstacle geom name {name} does not exist in the set." + raise RuntimeError(error_msg) + def check_collision(self, qpos: ob.State): - ''' + """ Checks for collisions when the robot is set to the given joint positions. - This is done by setting the joint positions, running mj_fwdPosition and mj_collision, + This is done by setting the joint positions, running mj_fwdPosition and mj_collision, then resetting the simulation to the original values. Args: qpos (list): List of joint positions to set, ordered by base to end effector. Returns: bool: True if there is a collision, False otherwise. - ''' - qpos + """ if len(qpos) != self.njoints: - raise ValueError(f"Expected {self.njoints} joint positions, got {len(qpos)}.") - + error_msg = f"check_collision: Expected {self.njoints} joint positions, got {len(qpos)}." + raise ValueError(error_msg) + qpos_init = deepcopy(self.data.qpos) qvel_init = deepcopy(self.data.qvel) # Set the joint positions @@ -226,14 +239,16 @@ def check_collision(self, qpos: ob.State): b2 = self.model.geom_bodyid[c.geom2] # Check for collisions between robot and other bodies (first condition) # or self-collisions (second condition) - if ((b1 in self.robot_body_ids or b2 in self.robot_body_ids) and \ - (b1 in self.obstacle_body_ids or b2 in self.obstacle_body_ids)) or \ - (b1 in self.robot_body_ids and b2 in self.robot_body_ids): + if ( + (b1 in self.robot_body_ids or b2 in self.robot_body_ids) + and (b1 in self.obstacle_body_ids or b2 in self.obstacle_body_ids) + ) or (b1 in self.robot_body_ids and b2 in self.robot_body_ids): has_collision = True break # Check for collisions with geoms - if (c.geom1 in self.obstacle_geom_ids or c.geom2 in self.obstacle_geom_ids) and \ - (b1 in self.robot_body_ids or b2 in self.robot_body_ids): + if (c.geom1 in self.obstacle_geom_ids or c.geom2 in self.obstacle_geom_ids) and ( + b1 in self.robot_body_ids or b2 in self.robot_body_ids + ): has_collision = True break @@ -246,22 +261,22 @@ def check_collision(self, qpos: ob.State): def get_joint_positions(self): """ Get the current joint positions of the robot. - + Returns: list: Current joint positions. """ - # Deepcopy might not be necessary + # Deepcopy might not be necessary return [deepcopy(self.data.qpos[joint_id]) for joint_id in self.joint_ids] def ik(self, pose, q0=None, tcp_offset=None): """ Perform inverse kinematics to find joint positions that achieve the desired pose. - + Args: pose (Pose): Desired end-effector pose. q0 (numpy.ndarray, optional): Initial guess for joint positions. tcp_offset (Pose, optional): Offset from the end-effector to the TCP. - + Returns: numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. """ @@ -276,9 +291,9 @@ def __init__(self, num_dim) -> None: self.state_sampler = None def allocStateSampler(self): - ''' + """ This will be called by the internal OMPL planner - ''' + """ # WARN: This will cause problems if the underlying planner is multi-threaded!!! if self.state_sampler: return self.state_sampler @@ -287,42 +302,45 @@ def allocStateSampler(self): return self.allocDefaultStateSampler() def set_state_sampler(self, state_sampler): - ''' + """ Optional, Set custom state sampler. - ''' + """ self.state_sampler = state_sampler -class MjOMPL(): - def __init__(self, - robot_env: RobotEnv, - robot_name:str, - robot_xml_name:str, - robot_root_name:str, - robot_joint_name:str, - robot_actuator_name:str, - njoints:int, - obstacle_body_names: list = None, - obstacle_geom_names: list = None, - interpolate_num:int=INTERPOLATE_NUM, - default_planning_time:float=DEFAULT_PLANNING_TIME): - ''' + +class MjOMPL: + def __init__( + self, + robot_env: RobotEnv, + robot_name: str, + robot_xml_name: str, + robot_root_name: str, + robot_joint_name: str, + robot_actuator_name: str, + njoints: int, + obstacle_body_names: list | None = None, + obstacle_geom_names: list | None = None, + interpolate_num: int = INTERPOLATE_NUM, + default_planning_time: float = DEFAULT_PLANNING_TIME, + ): + """ Initialize the OMPL planner with the given parameters. Besides setting up the OMPL planning context, it instantiates an MjORobot instance, which is a thin wrapper around the RobotEnv (i.e. MuJoCo variables), for easily extracting the relevant bodies, joints, etc. from the MuJoCo model. - + Parameters: - robot_env: The RobotEnv environment in which the robot operates. - robot_name: Name of the robot. - - robot_xml_name: Path to the robot's XML file. + - robot_xml_name: Path to the robot's XML file. This file will be used to query the s of the robot to get collision checking info. - - robot_root_name: Name of the root body of the robot, + - robot_root_name: Name of the root body of the robot, i.e. the top level 's name. - - robot_joint_name: Pattern of the robot's joints to be controlled, + - robot_joint_name: Pattern of the robot's joints to be controlled, where # will be replaced by 1 to njoints. - - robot_actuator_name: Pattern of the robot's actuators to be controlled, + - robot_actuator_name: Pattern of the robot's actuators to be controlled, where # will be replaced by 1 to njoints. Actuators are not controlled, but is used to validate that the number of joints and actuators match. @@ -334,20 +352,29 @@ def __init__(self, - interpolate_num=100 (optional): Number of points to interpolate between start and goal states. - default_planning_time=5.0 (optional): Default time allowed for planning in seconds. - ''' - if robot_name is None or robot_xml_name is None or robot_root_name is None or \ - robot_joint_name is None or robot_actuator_name is None or njoints is None: - raise ValueError("Initialization values are missing.") + """ + if ( + robot_name is None + or robot_xml_name is None + or robot_root_name is None + or robot_joint_name is None + or robot_actuator_name is None + or njoints is None + ): + error_msg = "Initialization values are missing." + raise ValueError(error_msg) # Create a new MjORobot instance - robot = MjORobot(robot_name, - robot_env, - njoints=njoints, - robot_xml_name=robot_xml_name, - robot_root_name=robot_root_name, - robot_joint_name=robot_joint_name, - robot_actuator_name=robot_actuator_name, - obstacle_body_names=obstacle_body_names, - obstacle_geom_names=obstacle_geom_names) + robot = MjORobot( + robot_name, + robot_env, + njoints=njoints, + robot_xml_name=robot_xml_name, + robot_root_name=robot_root_name, + robot_joint_name=robot_joint_name, + robot_actuator_name=robot_actuator_name, + obstacle_body_names=obstacle_body_names, + obstacle_geom_names=obstacle_geom_names, + ) self.robot = robot self.interpolate_num = interpolate_num @@ -355,7 +382,7 @@ def __init__(self, # Create the planning space self.space = MjOStateSpace(robot.njoints) - + # Create bounds for the space based on robot's joint limits bounds = ob.RealVectorBounds(robot.njoints) joint_bounds = [self.robot.model.jnt_range[joint_id] for joint_id in self.robot.joint_ids] @@ -368,14 +395,14 @@ def __init__(self, self.ss.setStateValidityChecker(ob.StateValidityCheckerFn(self.is_state_valid)) self.si = self.ss.getSpaceInformation() self.set_planner("RRT") # Default planner - + def is_state_valid(self, state): return not self.robot.check_collision(self.state_to_list(state)) - + def set_planner(self, planner_name): - ''' + """ Note: Add your planner here!! - ''' + """ if planner_name == "PRM": self.planner = og.PRM(self.ss.getSpaceInformation()) elif planner_name == "RRT": @@ -395,9 +422,9 @@ def set_planner(self, planner_name): return self.ss.setPlanner(self.planner) - - def plan(self, goal:np.ndarray, start:np.ndarray=None, allowed_time:float = DEFAULT_PLANNING_TIME): - ''' + + def plan(self, goal: np.ndarray, start: np.ndarray = None, allowed_time: float = DEFAULT_PLANNING_TIME): + """ Plan a path to goal from current robot state. Can be combined with `ik_pose` to plan a path to a desired pose. e.g. plan(goal=ik_pose(goal_pose), ...) @@ -406,25 +433,25 @@ def plan(self, goal:np.ndarray, start:np.ndarray=None, allowed_time:float = DEFA start (np.ndarray=None): List of joint positions to start from. If None, it will use the current robot state. allowed_time (float=5.0): Time allowed for planning in seconds. - ''' + """ start = self.robot.get_joint_positions() if start is None else start return self._plan_start_goal(start, goal, allowed_time=allowed_time) - - def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = DEFAULT_PLANNING_TIME): - ''' + + def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time=DEFAULT_PLANNING_TIME): + """ Plan a path to goal from the given robot start state. - If a path is found, it will try to interpolate the path according to + If a path is found, it will try to interpolate the path according to the number of segments specified by self.interpolate_num. - + Args: start (np.ndarray): List of joint positions to start from. goal (np.ndarray): List of joint positions to reach. allowed_time (float=5.0): Time allowed for planning in seconds. - + Returns: res (bool): True if a solution was found, False otherwise. sol_path_list (list[np.ndarray]): List of joint positions in the solution path, if found. - ''' + """ print("Planner params: \n", self.planner.params()) # set the start and goal states; @@ -458,27 +485,27 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time = D sol_path_list = [np.array(sol_path, dtype=np.float32) for sol_path in sol_path_list] return res, sol_path_list - def ik(self, pose:Pose, q0:np.ndarray=None, tcp_offset:Pose=None): + def ik(self, pose: Pose, q0: np.ndarray = None, tcp_offset: Pose = None): """ Perform inverse kinematics to find joint positions that achieve the desired pose. - + Args: pose (Pose): Desired end-effector pose. q0 (numpy.ndarray, optional): Initial guess for joint positions. tcp_offset (Pose, optional): Offset from the end-effector to the TCP. if not provided, it will use the default Franka Hand TCP pose. - + Returns: numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. """ if q0 is None: q0 = self.robot.get_joint_positions() return self.robot.ik(pose, q0, tcp_offset) - + def state_to_list(self, state): """ Convert an OMPL state to a list of joint positions. - + Args: state (ob.State): The OMPL state to convert. Returns: @@ -489,17 +516,17 @@ def state_to_list(self, state): def set_state_sampler(self, state_sampler): self.space.set_state_sampler(state_sampler) - def add_collision_bodies(self, obstacle_body_names: list|str): + def add_collision_bodies(self, obstacle_body_names: list | str): """ Add specified obstacle bodies to the robot's obstacle checks. Prints a warning if the body name does not exist in the model. - + Args: obstacle_body_names (list|str): List of names of bodies to be added to obstacle checks. """ self.robot._add_obstacle_body_ids(obstacle_body_names) - def add_collision_geoms(self, obstacle_geom_names: list|str): + def add_collision_geoms(self, obstacle_geom_names: list | str): """ Add specified obstacle geometries to the robot's obstacle checks. Prints a warning if the geometry name does not exist in the model. @@ -508,18 +535,18 @@ def add_collision_geoms(self, obstacle_geom_names: list|str): obstacle_geom_names (list|str): List of names of geometries to be added to obstacle checks. """ self.robot._add_obstacle_geom_ids(obstacle_geom_names) - - def remove_collision_bodies(self, obstacle_body_names: list|str): + + def remove_collision_bodies(self, obstacle_body_names: list | str): """ Remove specified obstacle bodies from the robot's obstacle checks. Prints a warning if the body name does not exist in the current set of obstacles. - + Args: obstacle_body_names (list|str): List of names of bodies to be removed from obstacle checks. """ self.robot._remove_obstacle_body_ids(obstacle_body_names) - - def remove_collision_geoms(self, obstacle_geom_names: list|str): + + def remove_collision_geoms(self, obstacle_geom_names: list | str): """ Remove specified obstacle geometries from the robot's obstacle checks. Prints a warning if the geometry name does not exist in the current set of obstacles. @@ -527,4 +554,4 @@ def remove_collision_geoms(self, obstacle_geom_names: list|str): Args: obstacle_geom_names (list|str): List of names of geometries to be removed from obstacle checks. """ - self.robot._remove_obstacle_geom_ids(obstacle_geom_names) \ No newline at end of file + self.robot._remove_obstacle_geom_ids(obstacle_geom_names) From e9f8a6371c273650027a91b3de9289a3572aa3e7 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 4 Aug 2025 10:20:36 +0200 Subject: [PATCH 08/10] fix: mypy errors --- python/rcs/ompl/mj_ompl.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index a6486de4..29226f10 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -40,7 +40,7 @@ def get_collision_bodies(xml_file, robot_name): for g in collision_bodies: name = g.get("name") if name is None: - error_msg = f"Body {ET.tostring(g).rstrip()}'s name is None. Please give it a name in the XML file." + error_msg = f"Body {ET.tostring(g).rstrip()!r}'s name is None. Please give it a name in the XML file." raise ValueError(error_msg) names.append(name) @@ -125,8 +125,8 @@ def __init__( for name in self.actuator_names if mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_ACTUATOR, name) > -1 ] - self.obstacle_body_ids = set() - self.obstacle_geom_ids = set() + self.obstacle_body_ids: set[int] = set() + self.obstacle_geom_ids: set[int] = set() self._add_obstacle_body_ids(obstacle_body_names) self._add_obstacle_geom_ids(obstacle_geom_names) @@ -155,7 +155,7 @@ def _remove_obstacle_body_ids(self, obstacle_body_names: list | str): error_msg = f"_remove_obstacle_body_ids: obstacle body name {name} does not exist in the set." raise RuntimeError(error_msg) - def _add_obstacle_body_ids(self, obstacle_body_names: list | str): + def _add_obstacle_body_ids(self, obstacle_body_names: list | None): """ Add specified obstacle bodies to the robot's obstacle checks, given their names. @@ -173,7 +173,7 @@ def _add_obstacle_body_ids(self, obstacle_body_names: list | str): error_msg = f"_add_obstacle_body_ids: obstacle body name {name} does not exist in the model." raise RuntimeError(error_msg) - def _add_obstacle_geom_ids(self, obstacle_geom_names: list | str): + def _add_obstacle_geom_ids(self, obstacle_geom_names: list | None): """ Add specified obstacle geoms to the robot's obstacle checks, given their names. @@ -281,7 +281,7 @@ def ik(self, pose, q0=None, tcp_offset=None): numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. """ tcp_offset = tcp_offset if tcp_offset is not None else self.franka_hand_tcp - return self.env.unwrapped.robot.get_ik().ik(pose, q0, tcp_offset) + return self.env.unwrapped.robot.get_ik().ik(pose, q0, tcp_offset) # type: ignore[attr-defined] class MjOStateSpace(ob.RealVectorStateSpace): @@ -423,7 +423,7 @@ def set_planner(self, planner_name): self.ss.setPlanner(self.planner) - def plan(self, goal: np.ndarray, start: np.ndarray = None, allowed_time: float = DEFAULT_PLANNING_TIME): + def plan(self, goal: np.ndarray, start: np.ndarray | None = None, allowed_time: float = DEFAULT_PLANNING_TIME): """ Plan a path to goal from current robot state. Can be combined with `ik_pose` to plan a path to a desired pose. @@ -485,7 +485,7 @@ def _plan_start_goal(self, start: np.ndarray, goal: np.ndarray, allowed_time=DEF sol_path_list = [np.array(sol_path, dtype=np.float32) for sol_path in sol_path_list] return res, sol_path_list - def ik(self, pose: Pose, q0: np.ndarray = None, tcp_offset: Pose = None): + def ik(self, pose: Pose, q0: np.ndarray | None = None, tcp_offset: Pose | None = None): """ Perform inverse kinematics to find joint positions that achieve the desired pose. @@ -516,7 +516,7 @@ def state_to_list(self, state): def set_state_sampler(self, state_sampler): self.space.set_state_sampler(state_sampler) - def add_collision_bodies(self, obstacle_body_names: list | str): + def add_collision_bodies(self, obstacle_body_names: list): """ Add specified obstacle bodies to the robot's obstacle checks. Prints a warning if the body name does not exist in the model. @@ -526,7 +526,7 @@ def add_collision_bodies(self, obstacle_body_names: list | str): """ self.robot._add_obstacle_body_ids(obstacle_body_names) - def add_collision_geoms(self, obstacle_geom_names: list | str): + def add_collision_geoms(self, obstacle_geom_names: list): """ Add specified obstacle geometries to the robot's obstacle checks. Prints a warning if the geometry name does not exist in the model. From c5195b8884165a185eb6f15de51531f4a5679213 Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 4 Aug 2025 10:30:55 +0200 Subject: [PATCH 09/10] pycheckformat --- python/rcs/ompl/mj_ompl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/rcs/ompl/mj_ompl.py b/python/rcs/ompl/mj_ompl.py index 29226f10..a8d7a459 100644 --- a/python/rcs/ompl/mj_ompl.py +++ b/python/rcs/ompl/mj_ompl.py @@ -281,7 +281,7 @@ def ik(self, pose, q0=None, tcp_offset=None): numpy.ndarray: Joint positions that achieve the desired pose, or None if no solution is found. """ tcp_offset = tcp_offset if tcp_offset is not None else self.franka_hand_tcp - return self.env.unwrapped.robot.get_ik().ik(pose, q0, tcp_offset) # type: ignore[attr-defined] + return self.env.unwrapped.robot.get_ik().ik(pose, q0, tcp_offset) # type: ignore[attr-defined] class MjOStateSpace(ob.RealVectorStateSpace): From 8cf40b04832283e9dede3939c9406c6a2e9287bd Mon Sep 17 00:00:00 2001 From: Seongjin Bien Date: Mon, 4 Aug 2025 10:32:26 +0200 Subject: [PATCH 10/10] pycheckformat --- python/rcs/camera/sim.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/rcs/camera/sim.py b/python/rcs/camera/sim.py index 6c2e2049..fd79bd10 100644 --- a/python/rcs/camera/sim.py +++ b/python/rcs/camera/sim.py @@ -80,9 +80,7 @@ def _cpp_to_python_frames(self, cpp_frameset: _FrameSet | None) -> FrameSet | No extrinsics=self._extrinsics(color_name), ), depth=DataFrame( - data= (depth_np_frame * BaseCameraSet.DEPTH_SCALE).astype( - np.uint16 - ), + data=(depth_np_frame * BaseCameraSet.DEPTH_SCALE).astype(np.uint16), timestamp=cpp_frameset.timestamp, intrinsics=self._intrinsics(depth_name), extrinsics=self._extrinsics(depth_name),