Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ classifiers = [
# "mypy==1.4.1",
# ]

[project.optional-dependencies]
so101 = [
"lerobot @ git+https://github.com/huggingface/lerobot.git",
]


[tool.scikit-build]
build.verbose = true
build.targets = ["_core", "scenes"]
logging.level = "INFO"
build-dir = "build"
wheel.packages = ["python/rcs"]
wheel.packages = ["python/rcs", "python/rcs_so101"]
install.components = ["python_package"]

[tool.ruff]
Expand Down
9 changes: 8 additions & 1 deletion python/rcs/_core/common.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ __all__ = [
"RobotState",
"RobotType",
"SIMULATION",
"SO101",
"UR5e",
"robots_meta_config",
]
Expand Down Expand Up @@ -62,6 +63,7 @@ class GripperState:

class IK:
def __init__(self, urdf_path: str, max_duration_ms: int = 300) -> None: ...
def forward(self, q0: numpy.ndarray[M, numpy.dtype[numpy.float64]], tcp_offset: Pose = ...) -> Pose: ...
def ik(
self, pose: Pose, q0: numpy.ndarray[M, numpy.dtype[numpy.float64]], tcp_offset: Pose = ...
) -> numpy.ndarray[M, numpy.dtype[numpy.float64]] | None: ...
Expand Down Expand Up @@ -158,6 +160,7 @@ class Robot:
class RobotConfig:
robot_platform: RobotPlatform
robot_type: RobotType
def __init__(self) -> None: ...

class RobotMetaConfig:
@property
Expand Down Expand Up @@ -206,13 +209,16 @@ class RobotType:
FR3

UR5e

SO101
"""

FR3: typing.ClassVar[RobotType] # value = <RobotType.FR3: 0>
SO101: typing.ClassVar[RobotType] # value = <RobotType.SO101: 2>
UR5e: typing.ClassVar[RobotType] # value = <RobotType.UR5e: 1>
__members__: typing.ClassVar[
dict[str, RobotType]
] # value = {'FR3': <RobotType.FR3: 0>, 'UR5e': <RobotType.UR5e: 1>}
] # value = {'FR3': <RobotType.FR3: 0>, 'UR5e': <RobotType.UR5e: 1>, 'SO101': <RobotType.SO101: 2>}
def __eq__(self, other: typing.Any) -> bool: ...
def __getstate__(self) -> int: ...
def __hash__(self) -> int: ...
Expand All @@ -238,4 +244,5 @@ def robots_meta_config(robot_type: RobotType) -> RobotMetaConfig: ...
FR3: RobotType # value = <RobotType.FR3: 0>
HARDWARE: RobotPlatform # value = <RobotPlatform.HARDWARE: 1>
SIMULATION: RobotPlatform # value = <RobotPlatform.SIMULATION: 0>
SO101: RobotType # value = <RobotType.SO101: 2>
UR5e: RobotType # value = <RobotType.UR5e: 1>
9 changes: 5 additions & 4 deletions python/rcs/_core/sim.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ class SimGripperConfig(rcs._core.common.GripperConfig):
epsilon_inner: float
epsilon_outer: float
ignored_collision_geoms: list[str]
joint1: str
joint2: str
joint: str
max_actuator_width: float
max_joint_width: float
min_actuator_width: float
min_joint_width: float
seconds_between_callbacks: float
def __init__(self) -> None: ...
def add_id(self, id: str) -> None: ...
Expand All @@ -132,8 +135,6 @@ class SimGripperState(rcs._core.common.GripperState):
def last_commanded_width(self) -> float: ...
@property
def last_width(self) -> float: ...
@property
def max_unnormalized_width(self) -> float: ...

class SimRobot(rcs._core.common.Robot):
def __init__(
Expand Down
2 changes: 2 additions & 0 deletions python/rcs/sim/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from rcs._core.sim import (
SimCameraConfig,
SimGripper,
SimGripperConfig,
SimGripperState,
Expand All @@ -17,4 +18,5 @@
"SimGripperConfig",
"SimGripperState",
"gui_loop",
"SimCameraConfig",
]
Empty file added python/rcs_so101/__init__.py
Empty file.
150 changes: 150 additions & 0 deletions python/rcs_so101/creators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import logging
from os import PathLike
from pathlib import Path
from typing import Type

import gymnasium as gym
import rcs
from gymnasium.envs.registration import EnvCreator
from lerobot.common.robots import make_robot_from_config
from lerobot.common.robots.so101_follower.config_so101_follower import (
SO101FollowerConfig,
)
from lerobot.common.teleoperators.so101_leader.config_so101_leader import (
SO101LeaderConfig,
)
from lerobot.common.teleoperators.so101_leader.so101_leader import SO101Leader
from lerobot.common.teleoperators.utils import make_teleoperator_from_config
from rcs import common, sim
from rcs.camera.hw import HardwareCameraSet
from rcs.camera.sim import SimCameraSet
from rcs.envs.base import (
CameraSetWrapper,
ControlMode,
GripperWrapper,
RelativeActionSpace,
RelativeTo,
RobotEnv,
)
from rcs.envs.creators import RCSHardwareEnvCreator
from rcs.envs.sim import CollisionGuard, GripperWrapperSim, RobotSimWrapper, SimWrapper
from rcs.sim import SimCameraConfig, SimGripperConfig, SimRobotConfig
from rcs_so101.hw import SO101, S0101Gripper

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class RCSSO101EnvCreator(RCSHardwareEnvCreator):
def __call__( # type: ignore
self,
id: str,
port: str,
urdf_path: str,
calibration_dir: PathLike | str | None = None,
camera_set: HardwareCameraSet | None = None,
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
) -> gym.Env:
if isinstance(calibration_dir, str):
calibration_dir = Path(calibration_dir)
cfg = SO101FollowerConfig(id=id, calibration_dir=calibration_dir, port=port)
hf_robot = make_robot_from_config(cfg)
robot = SO101(hf_robot, urdf_path=urdf_path)
env: gym.Env = RobotEnv(robot, ControlMode.JOINTS)
# env = FR3HW(env)

gripper = S0101Gripper(hf_robot)
env = GripperWrapper(env, gripper, binary=False)

if camera_set is not None:
camera_set.start()
camera_set.wait_for_frames()
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set)

if max_relative_movement is not None:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)

return env

@staticmethod
def teleoperator(
id: str,
port: str,
calibration_dir: PathLike | str | None = None,
) -> SO101Leader:
if isinstance(calibration_dir, str):
calibration_dir = Path(calibration_dir)
cfg = SO101LeaderConfig(id=id, calibration_dir=calibration_dir, port=port)
teleop = make_teleoperator_from_config(cfg)
teleop.connect()
return teleop


class SO101SimEnvCreator(EnvCreator):
def __call__( # type: ignore
self,
control_mode: ControlMode,
robot_cfg: SimRobotConfig,
urdf_path: str,
mjcf: str,
collision_guard: bool = False,
gripper_cfg: SimGripperConfig | None = None,
cameras: dict[str, SimCameraConfig] | None = None,
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
sim_wrapper: Type[SimWrapper] | None = None,
) -> gym.Env:
"""
Creates a simulation environment for the FR3 robot.
Args:
control_mode (ControlMode): Control mode for the robot.
robot_cfg (rcs.sim.SimRobotConfig): Configuration for the FR3 robot.
collision_guard (bool): Whether to use collision guarding. If True, the same mjcf scene is used for collision guarding.
gripper_cfg (rcs.sim.SimGripperConfig | None): Configuration for the gripper. If None, no gripper is used.
camera_set_cfg (SimCameraSetConfig | None): Configuration for the camera set. If None, no cameras are used.
max_relative_movement (float | tuple[float, float] | None): Maximum allowed movement. If float, it restricts
translational movement in meters. If tuple, it restricts both translational (in meters) and rotational
(in radians) movements. If None, no restriction is applied.
relative_to (RelativeTo): Specifies whether the movement is relative to a configured origin or the last step.
urdf_path (str | PathLike | None): Path to the URDF file. If None, the URDF file is automatically deduced
which requires a UTN compatible lab scene to be present.
mjcf (str | PathLike): Path to the Mujoco scene XML file. Defaults to "fr3_empty_world".
sim_wrapper (Type[SimWrapper] | None): Wrapper to be applied before the simulation wrapper. This is useful
for reset management e.g. resetting objects in the scene with correct observations. Defaults to None.
Returns:
gym.Env: The configured simulation environment for the FR3 robot.
"""
simulation = sim.Sim(mjcf)

ik = rcs.common.IK(str(urdf_path))
robot = rcs.sim.SimRobot(simulation, ik, robot_cfg)
env: gym.Env = RobotEnv(robot, control_mode)
env = RobotSimWrapper(env, simulation, sim_wrapper)

if cameras is not None:
camera_set = SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
env = CameraSetWrapper(env, camera_set, include_depth=True)

if gripper_cfg is not None and isinstance(gripper_cfg, SimGripperConfig):
gripper = sim.SimGripper(simulation, gripper_cfg)
env = GripperWrapper(env, gripper, binary=False)
env = GripperWrapperSim(env, gripper)

if collision_guard:
env = CollisionGuard.env_from_xml_paths(
env,
str(rcs.scenes.get(str(mjcf), mjcf)),
str(urdf_path),
gripper=gripper_cfg is not None,
check_home_collision=False,
control_mode=control_mode,
tcp_offset=rcs.common.Pose(common.FrankaHandTCPOffset()),
sim_gui=True,
truncate_on_collision=True,
)
if max_relative_movement is not None:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)

return env
73 changes: 73 additions & 0 deletions python/rcs_so101/env_joint_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import logging
from time import sleep

import rcs
from rcs import sim
from rcs._core.common import RobotPlatform
from rcs.envs.base import ControlMode, RelativeTo
from rcs_so101.creators import RCSSO101EnvCreator, SO101SimEnvCreator

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

ROBOT_INSTANCE = RobotPlatform.SIMULATION


def main():

if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
env_rel = RCSSO101EnvCreator()(
id="so101_follower",
urdf_path="/home/tobi/coding/lerobot/so101_new_calib.urdf",
port="/dev/ttyACM0",
calibration_dir="/home/tobi/coding/lerobot-container/calibration/robots/so101_follower/ninja_follower.json",
# max_relative_movement=(0.5, np.deg2rad(90)),
relative_to=RelativeTo.LAST_STEP,
)
else:
cfg = sim.SimRobotConfig()
cfg.robot_type = rcs.common.RobotType.SO101
cfg.actuators = ["1", "2", "3", "4", "5"]
cfg.joints = ["1", "2", "3", "4", "5"]
cfg.arm_collision_geoms = []
cfg.attachment_site = "gripper"

grp_cfg = sim.SimGripperConfig()
grp_cfg.actuator = "6"
grp_cfg.joint = "6"
grp_cfg.collision_geoms = []
grp_cfg.collision_geoms_fingers = []

env_rel = SO101SimEnvCreator()(
control_mode=ControlMode.JOINTS,
urdf_path="/home/tobi/coding/lerobot/so101_new_calib.urdf",
robot_cfg=cfg,
collision_guard=False,
mjcf="/home/tobi/coding/lerobot/SO-ARM100/Simulation/SO101/scene.xml",
gripper_cfg=grp_cfg,
# camera_set_cfg=default_mujoco_cameraset_cfg(),
max_relative_movement=None,
# max_relative_movement=10.0,
# max_relative_movement=0.5,
relative_to=RelativeTo.LAST_STEP,
)
env_rel.get_wrapper_attr("sim").open_gui()

for _ in range(10):
obs, info = env_rel.reset()
for _ in range(100):
# sample random relative action and execute it
act = env_rel.action_space.sample()
print(act)
# act["gripper"] = 1.0
obs, reward, terminated, truncated, info = env_rel.step(act)
print(obs)
if truncated or terminated:
logger.info("Truncated or terminated!")
return
logger.info(act["gripper"])
sleep(1.0)


if __name__ == "__main__":
main()
Loading