diff --git a/README.md b/README.md
index 20122dcf..40d7d2f9 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,7 @@ import rcs
 from rcs import sim
 from rcs._core.sim import CameraType
 from rcs.camera.sim import SimCameraConfig, SimCameraSet
+from time import sleep
 simulation = sim.Sim(rcs.scenes["fr3_empty_world"]["mjb"])
 urdf_path = rcs.scenes["fr3_empty_world"]["urdf"]
 ik = rcs.common.RL(str(urdf_path))
@@ -42,23 +43,18 @@ gripper_cfg_sim = sim.SimGripperConfig()
 gripper_cfg_sim.add_id("0")
 gripper = sim.SimGripper(simulation, gripper_cfg_sim)
 
-# add camera to have a rendering gui
-cameras = {
-    "wrist": SimCameraConfig(
-        identifier="wrist_0",
-        type=CameraType.fixed,
-        resolution_width=640,
-        resolution_height=480,
-        frame_rate=30,
-    ),
-}
-camera_set = SimCameraSet(simulation, cameras)
+camera_set = SimCameraSet(simulation, {})
 simulation.open_gui()
+# wait for the GUI to open
+sleep(5)
+# step the robot 10 cm in the x direction
 robot.set_cartesian_position(
-    robot.get_cartesian_position() * rcs.common.Pose(translation=np.array([0.05, 0, 0]))
+    robot.get_cartesian_position() * rcs.common.Pose(translation=np.array([0.1, 0, 0]))
 )
+# close the gripper
 gripper.grasp()
 simulation.step_until_convergence()
+input("press enter to close")
 ```
 ### Gym Env Interface
 ```python
@@ -80,7 +76,7 @@ env_rel = SimEnvCreator()(
 )
 env_rel.get_wrapper_attr("sim").open_gui()
 
-for _ in range(10):
+for _ in range(100):
     obs, info = env_rel.reset()
     for _ in range(10):
         # sample random relative action and execute it
@@ -110,8 +106,8 @@ For more details, read the README file of the respective extension.
 After the required hardware extensions are installed, the examples above also work on real hardware:
 Switch to hardware by setting the following flag:
 ```python
-ROBOT_INSTANCE = RobotPlatform.SIMULATION
-# ROBOT_INSTANCE = RobotPlatform.HARDWARE
+# ROBOT_INSTANCE = RobotPlatform.SIMULATION
+ROBOT_INSTANCE = RobotPlatform.HARDWARE
 ```
 
 #### Command Line Interface
diff --git a/extensions/rcs_realsense/pyproject.toml b/extensions/rcs_realsense/pyproject.toml
index ac3e8067..e44eba50 100644
--- a/extensions/rcs_realsense/pyproject.toml
+++ b/extensions/rcs_realsense/pyproject.toml
@@ -9,6 +9,7 @@ description="RCS realsense module"
 dependencies = [
     "rcs==0.4.0",
     "pyrealsense2~=2.55.1",
+    "apriltag==0.0.16",
 ]
 readme = "README.md"
 maintainers = [
diff --git a/extensions/rcs_realsense/src/rcs_realsense/calibration.py b/extensions/rcs_realsense/src/rcs_realsense/calibration.py
new file mode 100644
index 00000000..937450c5
--- /dev/null
+++ b/extensions/rcs_realsense/src/rcs_realsense/calibration.py
@@ -0,0 +1,151 @@
+import logging
+import threading
+import typing
+from time import sleep
+
+import apriltag
+import cv2
+import numpy as np
+from rcs._core import common
+from rcs.camera.hw import CalibrationStrategy
+from rcs.camera.interface import Frame
+from tqdm import tqdm
+
+logger = logging.getLogger(__name__)
+
+
+class FR3BaseArucoCalibration(CalibrationStrategy):
+    """Calibration with a 3D-printed AprilTag marker that fits around the Vention FR3 base mounting plate."""
+
+    def __init__(self, camera_name: str):
+        # world-to-camera extrinsics, populated by calibrate()
+        self._extrinsics: np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None = None
+        self.camera_name = camera_name
+        self.tag_to_world = common.Pose(
+            rpy_vector=np.array([np.pi, 0, -np.pi / 2]), translation=np.array([0.145, 0, 0])
+        ).pose_matrix()
+
+    def calibrate(
+        self,
+        samples: list[Frame],
+        intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]],
+        lock: threading.Lock,
+    ) -> bool:
+        logger.info("Calibrating camera %s. Position it as you wish and press enter.", self.camera_name)
+        input()
+        tries = 3
+        while len(samples) < 10 and tries > 0:
+            logger.info("not enough frames recorded yet, waiting 2 seconds...")
+            tries -= 1
+            sleep(2)
+        if len(samples) < 10:
+            logger.warning("Calibration failed, not enough frames arrived.")
+            return False
+
+        frames = []
+        with lock:
+            for sample in samples:
+                frames.append(sample.camera.color.data.copy())
+        logger.debug("collected %d frames for calibration", len(frames))
+
+        _, tag_to_cam = get_average_marker_pose(frames, intrinsics=intrinsics, calib_tag_id=9, show_live_window=False)
+
+        cam_to_world = self.tag_to_world @ np.linalg.inv(tag_to_cam)
+        world_to_cam = np.linalg.inv(cam_to_world)
+        self._extrinsics = world_to_cam  # type: ignore
+        return True
+
+    def get_extrinsics(self) -> np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None:
+        return self._extrinsics
+
+
+def get_average_marker_pose(
+    samples,
+    intrinsics,
+    calib_tag_id,
+    show_live_window,
+):
+    # create detector
+    options = apriltag.DetectorOptions(families="tag25h9")
+    detector = apriltag.Detector(options=options)
+
+    # iterate over the samples with a progress bar
+    poses = []
+
+    last_frame = None
+    for frame in tqdm(samples):
+
+        # detect tags
+        marker_det, pose = get_marker_pose(calib_tag_id, detector, intrinsics, frame)
+
+        if marker_det is None:
+            continue
+
+        for corner in marker_det.corners:
+            cv2.circle(frame, tuple(corner.astype(int)), 5, (0, 0, 255), -1)
+
+        poses.append(pose)
+
+        last_frame = frame.copy()
+
+        camera_matrix = intrinsics[:3, :3]
+
+        if show_live_window:
+            cv2.drawFrameAxes(frame, camera_matrix, None, pose[:3, :3], pose[:3, 3], 0.1)  # type: ignore
+            # show frame
+            cv2.imshow("frame", frame)
+
+            # wait for key press
+            if cv2.waitKey(1) & 0xFF == ord("q"):
+                break
+
+    if last_frame is None:
+        msg = "No frames were processed, cannot calculate average pose. Check if the tag is visible."
+        raise ValueError(msg)
+
+    if show_live_window:
+        cv2.destroyAllWindows()
+
+    # element-wise mean of the collected 4x4 marker poses
+    avg_pose = np.mean(poses, axis=0)
+    logger.info(f"Average pose: {avg_pose}")
+
+    return last_frame, avg_pose
+
+
+def get_marker_pose(calib_tag_id, detector, intrinsics, frame):
+    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+    detections = detector.detect(gray)
+
+    # count detections
+    n_det = 0
+    marker_det = None
+    for det in detections:
+        if det.tag_id != calib_tag_id:
+            continue
+        n_det += 1
+        marker_det = det
+
+    if n_det > 1:
+        msg = f"Expected 1 detection of tag id {calib_tag_id}, got {n_det}."
+        raise ValueError(msg)
+
+    if marker_det is None:
+        return None, None
+
+    fx = intrinsics[0, 0]
+    fy = intrinsics[1, 1]
+    cx = intrinsics[0, 2]
+    cy = intrinsics[1, 2]
+
+    pose, _, _ = detector.detection_pose(
+        marker_det,
+        camera_params=(
+            fx,
+            fy,
+            cx,
+            cy,
+        ),
+        tag_size=0.1,
+    )
+
+    return marker_det, pose
diff --git a/extensions/rcs_realsense/src/rcs_realsense/camera.py b/extensions/rcs_realsense/src/rcs_realsense/camera.py
index 4cd55c85..01d36e4c 100644
--- a/extensions/rcs_realsense/src/rcs_realsense/camera.py
+++ b/extensions/rcs_realsense/src/rcs_realsense/camera.py
@@ -1,10 +1,13 @@
+import copy
 import logging
+import threading
+import typing
 from dataclasses import dataclass
 
 import numpy as np
 import pyrealsense2 as rs
-from rcs.camera.hw import HardwareCamera
-from rcs.camera.interface import CameraFrame, DataFrame, Frame, IMUFrame
+from rcs.camera.hw import CalibrationStrategy, DummyCalibrationStrategy, HardwareCamera
+from rcs.camera.interface import BaseCameraSet, CameraFrame, DataFrame, Frame, IMUFrame
 
 from rcs import common
 
@@ -20,14 +23,20 @@ class RealSenseDevicePipeline:
     pipeline: rs.pipeline
     pipeline_profile: rs.pipeline_profile
     camera: RealSenseDeviceInfo
+    depth_scale: float | None = None
+    color_intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]] | None = None
+    depth_intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]] | None = None
+    depth_to_color: common.Pose | None = None
 
 
 class RealSenseCameraSet(HardwareCamera):
     TIMESTAMP_FACTOR = 1e-3
+    CALIBRATION_FRAME_SIZE = 30
 
     def __init__(
         self,
         cameras: dict[str, common.BaseCameraConfig],
+        calibration_strategy: dict[str, CalibrationStrategy] | None = None,
         enable_ir_emitter: bool = False,
         enable_ir: bool = False,
         laser_power: int = 330,
@@ -38,6 +47,9 @@ def __init__(
         self.laser_power = laser_power
         self.enable_imu = enable_imu
         self.cameras = cameras
+        if calibration_strategy is None:
+            calibration_strategy = {camera_name: DummyCalibrationStrategy() for camera_name in cameras}
+        self.calibration_strategy = calibration_strategy
         self._logger = logging.getLogger(__name__)
         assert (
             len({camera.resolution_width for camera in self.cameras.values()}) == 1
@@ -48,6 +60,8 @@ def __init__(
         self.resolution_width = sample_camera_config.resolution_width
         self.resolution_height = sample_camera_config.resolution_height
         self.frame_rate = sample_camera_config.frame_rate
+        self._frame_buffer_lock: dict[str, threading.Lock] = {}
+        self._frame_buffer: dict[str, list] = {}
 
         self.D400_config = rs.config()
         self.D400_config.enable_stream(
@@ -109,16 +123,6 @@ def open(self):
     def update_available_devices(self):
         self._available_devices = self.enumerate_connected_devices(rs.context())
 
-    def enable_all_devices(self, enable_ir_emitter: bool = False):
-        """
-        Enable all the Intel RealSense Devices which are connected to the PC
-
-        """
-        self._logger.debug("%i devices have been found", len(self._available_devices))
-
-        for device_info in self._available_devices.values():
-            self.enable_device(device_info, enable_ir_emitter)
-
     def enable_devices(self, devices_to_enable: dict[str, str], enable_ir_emitter: bool = False):
         """
         Enable the Intel RealSense Devices which are connected to the PC
@@ -135,9 +139,9 @@ def enable_devices(self, devices_to_enable: dict[str, str], enable_ir_emitter: b
             assert (
                 device_serial in self._available_devices
             ), f"Device {device_name} not found. Check if it is connected."
-            self.enable_device(self._available_devices[device_serial], enable_ir_emitter)
+            self.enable_device(device_name, self._available_devices[device_serial], enable_ir_emitter)
 
-    def enable_device(self, device_info: RealSenseDeviceInfo, enable_ir_emitter: bool = False):
+    def enable_device(self, camera_name: str, device_info: RealSenseDeviceInfo, enable_ir_emitter: bool = False):
         """
         Enable an Intel RealSense Device
 
@@ -164,7 +168,43 @@ def enable_device(self, device_info: RealSenseDeviceInfo, enable_ir_emitter: boo
         if sensor.supports(rs.option.emitter_enabled):
             sensor.set_option(rs.option.emitter_enabled, 1 if enable_ir_emitter else 0)
             sensor.set_option(rs.option.laser_power, self.laser_power)
-        self._enabled_devices[device_info.serial] = RealSenseDevicePipeline(pipeline, pipeline_profile, device_info)
+
+        depth_vp = pipeline_profile.get_stream(rs.stream.depth).as_video_stream_profile()
+        color_vp = pipeline_profile.get_stream(rs.stream.color).as_video_stream_profile()
+
+        rs_color_intrinsics = color_vp.get_intrinsics()
+        color_intrinsics = np.array(
+            [
+                [rs_color_intrinsics.fx, 0, (rs_color_intrinsics.width - 1) / 2, 0],
+                [0, rs_color_intrinsics.fy, (rs_color_intrinsics.height - 1) / 2, 0],
+                [0, 0, 1, 0],
+            ]
+        )
+        rs_depth_intrinsics = depth_vp.get_intrinsics()
+        depth_intrinsics = np.array(
+            [
+                [rs_depth_intrinsics.fx, 0, (rs_depth_intrinsics.width - 1) / 2, 0],
+                [0, rs_depth_intrinsics.fy, (rs_depth_intrinsics.height - 1) / 2, 0],
+                [0, 0, 1, 0],
+            ]
+        )
+
+        depth_to_color = depth_vp.get_extrinsics_to(color_vp)
+
+        self._enabled_devices[camera_name] = RealSenseDevicePipeline(
+            pipeline,
+            pipeline_profile,
+            device_info,
+            depth_scale=sensor.get_depth_scale(),
+            color_intrinsics=color_intrinsics,
+            depth_intrinsics=depth_intrinsics,
+            depth_to_color=common.Pose(
+                translation=depth_to_color.translation, rotation=np.array(depth_to_color.rotation).reshape(3, 3)  # type: ignore
+            ),
+        )
+
+        self._frame_buffer[camera_name] = []
+        self._frame_buffer_lock[camera_name] = threading.Lock()
         self._logger.debug("Enabled device %s (%s)", device_info.serial, device_info.product_line)
 
     @staticmethod
@@ -195,20 +235,11 @@ def enumerate_connected_devices(context: rs.context) -> dict[str, RealSenseDevic
         return connect_device
 
     def poll_frame(self, camera_name: str) -> Frame:
-        # TODO(juelg): polling should be performed in a recorder thread
-        # (anyway needed to record trajectory data to disk)
-        # and this method should just access the latest frame from the recorder
-        # Perhaps there even has to be a separate recorder thread for each camera
-
-        # TODO(juelg): what is a "pose" (in frame) and how can we use it
-        # TODO(juelg): decide whether to use the poll method and to wait if devices get ready
         assert camera_name in self.camera_names, f"Camera {camera_name} not found in the enabled devices"
-        serial = self.cameras[camera_name].identifier
-        device = self._enabled_devices[serial]
+        device = self._enabled_devices[camera_name]
 
         streams = device.pipeline_profile.get_streams()
         frameset = device.pipeline.wait_for_frames()
-        # frameset = device.pipeline.poll_for_frames()
 
         color: DataFrame | None = None
         ir: DataFrame | None = None
@@ -223,6 +254,13 @@ def to_ts(frame: rs.frame) -> float:
             # convert to seconds
             return frame.get_timestamp() * RealSenseCameraSet.TIMESTAMP_FACTOR
 
+        color_extrinsics = self.calibration_strategy[camera_name].get_extrinsics()
+        depth_to_color = device.depth_to_color
+        assert depth_to_color is not None, "Depth to color extrinsics not found"
+        depth_extrinsics = (
+            color_extrinsics @ depth_to_color.inverse().pose_matrix() if color_extrinsics is not None else None
+        )
+
         timestamps = []
         for stream in streams:
             if rs.stream.infrared == stream.stream_type():
@@ -230,10 +268,23 @@ def to_ts(frame: rs.frame) -> float:
                 ir = DataFrame(data=to_numpy(frame), timestamp=to_ts(frame))
             elif rs.stream.color == stream.stream_type():
                 frame = frameset.get_color_frame()
-                color = DataFrame(data=to_numpy(frame)[:, :, ::-1], timestamp=to_ts(frame))
+                color = DataFrame(
+                    data=to_numpy(frame)[:, :, ::-1],
+                    timestamp=to_ts(frame),
+                    intrinsics=device.color_intrinsics,
+                    extrinsics=color_extrinsics,
+                )
             elif rs.stream.depth == stream.stream_type():
                 frame = frameset.get_depth_frame()
-                depth = DataFrame(data=to_numpy(frame), timestamp=to_ts(frame))
+                assert device.depth_scale is not None, "Depth scale not found"
+                depth = DataFrame(
+                    data=(to_numpy(frame).astype(np.float64) * device.depth_scale * BaseCameraSet.DEPTH_SCALE).astype(
+                        np.uint16
+                    ),
+                    timestamp=to_ts(frame),
+                    intrinsics=device.depth_intrinsics,
+                    extrinsics=depth_extrinsics,
+                )
             elif rs.stream.accel == stream.stream_type():
                 frame = frameset.first(stream.stream_index())
                 md = frame.as_motion_frame().get_motion_data()
@@ -249,81 +300,18 @@ def to_ts(frame: rs.frame) -> float:
             timestamps.append(to_ts(frame))
 
         assert color is not None, "Color frame not found"
-        cf = CameraFrame(color=color, ir=ir, depth=depth)
+        cf = CameraFrame(
+            color=color,
+            ir=ir,
+            depth=depth,
+        )
         imu = IMUFrame(accel=accel, gyro=gyro)
-        return Frame(camera=cf, imu=imu, avg_timestamp=float(np.mean(timestamps)) if len(timestamps) > 0 else None)
-
-    def get_depth_shape(self):
-        """
-        Retruns width and height of the depth stream for one arbitrary device
-
-        Returns:
-        -----------
-        width : int
-        height: int
-        """
-        width = -1
-        height = -1
-        for device in self._enabled_devices.values():
-            for stream in device.pipeline_profile.get_streams():
-                if rs.stream.depth == stream.stream_type():
-                    width = stream.as_video_stream_profile().width()
-                    height = stream.as_video_stream_profile().height()
-        return width, height
-
-    def get_device_intrinsics(
-        self, frames: dict[str, dict[rs.stream, rs.frame]]
-    ) -> dict[str, dict[rs.stream, rs.intrinsics]]:
-        """
-        Get the intrinsics of the imager using its frame delivered by the realsense device
-
-        Parameters:
-        -----------
-        frames : rs::frame
-            The frame grabbed from the imager inside the Intel RealSense for which the intrinsic is needed
-
-        Return:
-        -----------
-        device_intrinsics : dict
-        keys : serial
-            Serial number of the device
-        values: [key]
-            Intrinsics of the corresponding device
-        """
-        device_intrinsics: dict[str, dict[rs.stream, rs.intrinsics]] = {}
-        for device_name, frameset in frames.items():
-            device_intrinsics[device_name] = {}
-            for key, value in frameset.items():
-                device_intrinsics[device_name][key] = value.get_profile().as_video_stream_profile().get_intrinsics()
-        return device_intrinsics
-
-    def get_depth_to_color_extrinsics(self, frames):
-        """
-        Get the extrinsics between the depth imager 1 and the color imager using its frame delivered by the realsense device
-
-        Parameters:
-        -----------
-        frames : rs::frame
-            The frame grabbed from the imager inside the Intel RealSense for which the intrinsic is needed
-
-        Return:
-        -----------
-        device_intrinsics : dict
-        keys : serial
-            Serial number of the device
-        values: [key]
-            Extrinsics of the corresponding device
-        """
-        device_extrinsics = {}
-        for dev_info, frameset in frames.items():
-            serial = dev_info[0]
-            device_extrinsics[serial] = (
-                frameset[rs.stream.depth]
-                .get_profile()
-                .as_video_stream_profile()
-                .get_extrinsics_to(frameset[rs.stream.color].get_profile())
-            )
-        return device_extrinsics
+        f = Frame(camera=cf, imu=imu, avg_timestamp=float(np.mean(timestamps)) if len(timestamps) > 0 else None)
+        with self._frame_buffer_lock[camera_name]:
+            if len(self._frame_buffer[camera_name]) >= self.CALIBRATION_FRAME_SIZE:
+                self._frame_buffer[camera_name].pop(0)
+            self._frame_buffer[camera_name].append(copy.deepcopy(f))
+        return f
 
     def disable_streams(self):
         self.D400_config.disable_all_streams()
@@ -346,3 +334,17 @@ def enable_emitter(self, enable_ir_emitter=True):
             sensor.set_option(rs.option.emitter_enabled, 1 if enable_ir_emitter else 0)
             if enable_ir_emitter:
                 sensor.set_option(rs.option.laser_power, self.laser_power)
+
+    def calibrate(self) -> bool:
+        for camera_name in self.cameras:
+            color_intrinsics = self._enabled_devices[camera_name].color_intrinsics
+            assert color_intrinsics is not None, f"Color intrinsics for camera {camera_name} not found"
+            if not self.calibration_strategy[camera_name].calibrate(
+                intrinsics=color_intrinsics,
+                samples=self._frame_buffer[camera_name],
+                lock=self._frame_buffer_lock[camera_name],
+            ):
+                self._logger.warning(f"Calibration of camera {camera_name} failed.")
+                return False
+        self._logger.info("Calibration successful.")
+        return True
diff --git a/extensions/rcs_realsense/src/rcs_realsense/utils.py b/extensions/rcs_realsense/src/rcs_realsense/utils.py
index 372dc04d..b63842bb 100644
--- a/extensions/rcs_realsense/src/rcs_realsense/utils.py
+++ b/extensions/rcs_realsense/src/rcs_realsense/utils.py
@@ -1,3 +1,7 @@
+import typing
+
+from rcs.camera.hw import CalibrationStrategy
+from rcs_realsense.calibration import FR3BaseArucoCalibration
 from rcs_realsense.camera import RealSenseCameraSet
 
 from rcs import common
@@ -10,4 +14,5 @@ def default_realsense(name2id: dict[str, str] | None) -> RealSenseCameraSet | No
         name: common.BaseCameraConfig(identifier=id, resolution_width=1280, resolution_height=720, frame_rate=30)
         for name, id in name2id.items()
     }
-    return RealSenseCameraSet(cameras=cameras)
+    calibration_strategy = {name: typing.cast(CalibrationStrategy, FR3BaseArucoCalibration(name)) for name in name2id}
+    return RealSenseCameraSet(cameras=cameras, calibration_strategy=calibration_strategy)
diff --git a/extensions/rcs_so101/src/rcs_so101/creators.py b/extensions/rcs_so101/src/rcs_so101/creators.py
index acdf1584..98ae5404 100644
--- a/extensions/rcs_so101/src/rcs_so101/creators.py
+++ b/extensions/rcs_so101/src/rcs_so101/creators.py
@@ -1,4 +1,5 @@
 import logging
+import typing
 from os import PathLike
 from pathlib import Path
 from typing import Type
@@ -15,6 +16,7 @@ from lerobot.common.teleoperators.so101_leader.so101_leader import SO101Leader
 from lerobot.common.teleoperators.utils import make_teleoperator_from_config
 from rcs.camera.hw import HardwareCameraSet
+from rcs.camera.interface import BaseCameraSet
 from rcs.camera.sim import SimCameraSet
 from rcs.envs.base import (
     CameraSetWrapper,
@@ -125,7 +127,9 @@ def __call__(  # type: ignore
             env = RobotSimWrapper(env, simulation, sim_wrapper)
 
         if cameras is not None:
-            camera_set = SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
+            camera_set = typing.cast(
+                BaseCameraSet, SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
+            )
             env = CameraSetWrapper(env, camera_set, include_depth=True)
 
         if gripper_cfg is not None and isinstance(gripper_cfg, SimGripperConfig):
diff --git a/pyproject.toml b/pyproject.toml
index e9906cd3..7bd11928 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -101,6 +101,7 @@ ignore = [
     "PT018", # assertion should be broken down into multiple parts
     "NPY002",
     "G004", # Logging format string does not contain any placeholders
+    "ARG002", # unused method argument
 ]
 
 [tool.pylint.format]
diff --git a/python/rcs/camera/digit_cam.py b/python/rcs/camera/digit_cam.py
index 3a1be493..bb7113fb 100644
--- a/python/rcs/camera/digit_cam.py
+++ b/python/rcs/camera/digit_cam.py
@@ -51,3 +51,7 @@ def close(self):
 
     def config(self, camera_name) -> BaseCameraConfig:
         return self.cameras[camera_name]
+
+    def calibrate(self) -> bool:
+        """No calibration needed for DIGIT cameras."""
+        return True
diff --git a/python/rcs/camera/hw.py b/python/rcs/camera/hw.py
index c0588b4e..a04e5289 100644
--- a/python/rcs/camera/hw.py
+++ b/python/rcs/camera/hw.py
@@ -35,6 +35,56 @@ def poll_frame(self, camera_name: str) -> Frame:
     def camera_names(self) -> list[str]:
         """Returns the names of the cameras in this set."""
 
+    def calibrate(self) -> bool:
+        """Calibrates the cameras. Returns True on success."""
+
+
+N = typing.TypeVar("N", bound=int)
+H = typing.TypeVar("H", bound=int)
+W = typing.TypeVar("W", bound=int)
+
+
+class CalibrationStrategy(typing.Protocol):
+    """Implements the hardware-dependent calibration strategy."""
+
+    def calibrate(
+        self,
+        samples: list[Frame],
+        intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]],
+        lock: threading.Lock,
+    ) -> bool:
+        """Implements the algorithm to calibrate the camera.
+
+        Args:
+            samples: List of frames to use for calibration.
+            intrinsics: Intrinsic camera parameters, e.g. from a previous calibration.
+            lock: A lock to ensure thread safety during calibration as the samples might refresh in parallel.
+
+        Returns:
+            bool: True if calibration was successful, False otherwise.
+        """
+
+    def get_extrinsics(self) -> np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None:
+        """
+        Returns the calibrated extrinsics, or None if not calibrated yet.
+        Implementations should cache this value efficiently, as it is called on every step.
+        """
+
+
+class DummyCalibrationStrategy(CalibrationStrategy):
+    """Always returns identity extrinsics."""
+
+    def calibrate(
+        self,
+        samples: list[Frame],
+        intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]],
+        lock: threading.Lock,
+    ) -> bool:
+        return True
+
+    def get_extrinsics(self) -> np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None:
+        return np.eye(4)
+
 
 class HardwareCameraSet(BaseCameraSet):
     """This base class polls in a separate thread for all cameras and stores them in a buffer.
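The `CalibrationStrategy` protocol above is the extension point of this patch: a camera set delegates extrinsics estimation to one strategy per camera. A minimal sketch of a custom strategy, assuming only the protocol as defined in this hunk; `FixedExtrinsicsStrategy` and its constructor argument are illustrative and not part of the patch:

```python
import threading

import numpy as np
from rcs.camera.hw import CalibrationStrategy
from rcs.camera.interface import Frame


class FixedExtrinsicsStrategy(CalibrationStrategy):
    """Pins a camera at a pose measured offline instead of detecting a marker."""

    def __init__(self, world_to_cam: np.ndarray) -> None:
        # 4x4 homogeneous world-to-camera transform, known ahead of time
        self._extrinsics = world_to_cam

    def calibrate(
        self,
        samples: list[Frame],
        intrinsics: np.ndarray,
        lock: threading.Lock,
    ) -> bool:
        # Nothing to estimate; the buffered frames are not needed.
        return True

    def get_extrinsics(self) -> np.ndarray | None:
        # Constant, so the caching requirement holds trivially.
        return self._extrinsics
```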
@@ -178,6 +228,13 @@ def warm_up(self):
             self.poll_frame(camera_name)
             self.rate_limiter()
 
+    def calibrate(self) -> bool:
+        for camera in self.cameras:
+            c = camera.calibrate()
+            if not c:
+                return False
+        return True
+
     def polling_thread(self, warm_up: bool = True):
         for camera in self.cameras:
             camera.open()
@@ -221,3 +278,9 @@ def config(self, camera_name: str) -> BaseCameraConfig:
 
     def poll_frame(self, camera_name: str) -> Frame:
         return self.camera_dict[camera_name].poll_frame(camera_name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
diff --git a/python/rcs/camera/interface.py b/python/rcs/camera/interface.py
index 53f15ab3..711d16a5 100644
--- a/python/rcs/camera/interface.py
+++ b/python/rcs/camera/interface.py
@@ -1,8 +1,9 @@
 import logging
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any, Protocol
+from typing import Any, Literal, Protocol
 
+import numpy as np
 from rcs._core.common import BaseCameraConfig
 
 logger = logging.getLogger(__name__)
@@ -14,6 +15,8 @@ class DataFrame:
     data: Any
     # timestamp in posix time
     timestamp: float | None = None
+    intrinsics: np.ndarray[tuple[Literal[3], Literal[4]], np.dtype[np.float64]] | None = None
+    extrinsics: np.ndarray[tuple[Literal[4], Literal[4]], np.dtype[np.float64]] | None = None
 
 
 @dataclass(kw_only=True)
@@ -47,6 +50,8 @@ class FrameSet:
 class BaseCameraSet(Protocol):
     """Interface for a set of cameras for sim and hardware"""
 
+    DEPTH_SCALE: int = 1000
+
     def buffer_size(self) -> int:
         """Returns size of the internal buffer."""
 
@@ -65,6 +70,9 @@ def close(self):
     def config(self, camera_name: str) -> BaseCameraConfig:
         """Returns the configuration object of the cameras."""
 
+    def calibrate(self) -> bool:
+        """Calibrates the cameras. Returns calibration success."""
+
     @property
     def camera_names(self) -> list[str]:
         """Returns a list of the activated human readable names of the cameras."""
diff --git a/python/rcs/camera/sim.py b/python/rcs/camera/sim.py
index 2400bf5c..1ec1ee94 100644
--- a/python/rcs/camera/sim.py
+++ b/python/rcs/camera/sim.py
@@ -1,13 +1,16 @@
 import logging
 from datetime import datetime
+from typing import Literal
 
+import mujoco
 import numpy as np
 
 # from rcs._core.common import BaseCameraConfig
+from rcs._core import common
 from rcs._core.sim import FrameSet as _FrameSet
 from rcs._core.sim import SimCameraConfig
 from rcs._core.sim import SimCameraSet as _SimCameraSet
-from rcs.camera.interface import CameraFrame, DataFrame, Frame, FrameSet
+from rcs.camera.interface import BaseCameraSet, CameraFrame, DataFrame, Frame, FrameSet
 
 from rcs import sim
 
@@ -68,14 +71,52 @@ def _cpp_to_python_frames(self, cpp_frameset: _FrameSet | None) -> FrameSet | No
             near = self._sim.model.vis.map.znear * extent
             far = self._sim.model.vis.map.zfar * extent
             depth_np_frame = near / (1 - depth_np_frame * (1 - near / far))
+
             cameraframe = CameraFrame(
-                color=DataFrame(data=color_np_frame, timestamp=cpp_frameset.timestamp),
-                depth=DataFrame(data=depth_np_frame, timestamp=cpp_frameset.timestamp),
+                color=DataFrame(
+                    data=color_np_frame,
+                    timestamp=cpp_frameset.timestamp,
+                    intrinsics=self._intrinsics(color_name),
+                    extrinsics=self._extrinsics(color_name),
+                ),
+                depth=DataFrame(
+                    data=(depth_np_frame * BaseCameraSet.DEPTH_SCALE).astype(np.uint16),
+                    timestamp=cpp_frameset.timestamp,
+                    intrinsics=self._intrinsics(depth_name),
+                    extrinsics=self._extrinsics(depth_name),
+                ),
             )
             frame = Frame(camera=cameraframe, avg_timestamp=cpp_frameset.timestamp)
             frames[color_name] = frame
         return FrameSet(frames=frames, avg_timestamp=cpp_frameset.timestamp)
 
+    def _intrinsics(self, camera_name) -> np.ndarray[tuple[Literal[3], Literal[4]], np.dtype[np.float64]]:
+        cam_id = mujoco.mj_name2id(self._sim.model, mujoco.mjtObj.mjOBJ_CAMERA, self.cameras[camera_name].identifier)
+        fovy = self._sim.model.cam_fovy[cam_id]
+        fx = fy = 0.5 * self.cameras[camera_name].resolution_height / np.tan(fovy * np.pi / 360)
+        return np.array(
+            [
+                [fx, 0, (self.cameras[camera_name].resolution_width - 1) / 2, 0],
+                [0, fy, (self.cameras[camera_name].resolution_height - 1) / 2, 0],
+                [0, 0, 1, 0],
+            ]
+        )
+
+    def _extrinsics(self, camera_name) -> np.ndarray[tuple[Literal[4], Literal[4]], np.dtype[np.float64]]:
+        cam_id = mujoco.mj_name2id(self._sim.model, mujoco.mjtObj.mjOBJ_CAMERA, self.cameras[camera_name].identifier)
+        xpos = self._sim.data.cam_xpos[cam_id]
+        xmat = self._sim.data.cam_xmat[cam_id].reshape(3, 3)
+
+        cam = common.Pose(rotation=xmat, translation=xpos)
+        # MuJoCo cameras look along -z; flip so the z axis points into the scene
+        rotation_p = common.Pose(rpy_vector=np.array([np.pi, 0, 0]), translation=np.array([0, 0, 0]))
+        cam = cam * rotation_p
+
+        return cam.inverse().pose_matrix()
+
+    def calibrate(self) -> bool:
+        return True
+
     def config(self, camera_name: str) -> SimCameraConfig:
         """Should return the configuration of the camera with the given name."""
         return self.cameras[camera_name]
diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py
index 11370c03..4eee1be9 100644
--- a/python/rcs/envs/base.py
+++ b/python/rcs/envs/base.py
@@ -3,7 +3,7 @@
 import copy
 import logging
 from enum import Enum, auto
-from typing import Annotated, Any, TypeAlias, cast
+from typing import Annotated, Any, Literal, TypeAlias, cast
 
 import gymnasium as gym
 import numpy as np
@@ -118,39 +118,44 @@ class HandVecDictType(RCSpaceType):
     ]
 
 
+class CameraDataDictType(RCSpaceType):
+    data: Annotated[
+        np.ndarray,
+        # needs to be filled with values downstream
+        lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
+            low=low,
+            high=high,
+            shape=(height, width, color_dim),
+            dtype=dtype,
+        ),
+        "frame",
+    ]
+    intrinsics: Annotated[
+        np.ndarray[tuple[Literal[3], Literal[4]], np.dtype[np.float64]] | None,
+        gym.spaces.Box(
+            low=-np.inf,
+            high=np.inf,
+            shape=(3, 4),
+            dtype=np.float64,
+        ),
+    ]
+    extrinsics: Annotated[
+        np.ndarray[tuple[Literal[4], Literal[4]], np.dtype[np.float64]] | None,
+        gym.spaces.Box(
+            low=-np.inf,
+            high=np.inf,
+            shape=(4, 4),
+            dtype=np.float64,
+        ),
+    ]
+
+
 class CameraDictType(RCSpaceType):
     frames: dict[
         Annotated[str, "camera_names"],
         dict[
             Annotated[str, "camera_type"],  # "rgb" or "depth"
-            Annotated[
-                np.ndarray,
-                # needs to be filled with values downstream
-                lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
-                    low=low,
-                    high=high,
-                    shape=(height, width, color_dim),
-                    dtype=dtype,
-                ),
-                "frame",
-            ],
-        ],
-    ]
-
-
-class DigitCameraDictType(RCSpaceType):
-    digit_frames: dict[
-        Annotated[str, "camera_names"],
-        Annotated[
-            np.ndarray,
-            # needs to be filled with values downstream
-            lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
-                low=low,
-                high=high,
-                shape=(height, width, color_dim),
-                dtype=dtype,
-            ),
-            "digit_frames",
+            CameraDataDictType,
         ],
     ]
 
@@ -599,12 +604,13 @@ def __init__(self, env, camera_set: BaseCameraSet, include_depth: bool = False):
         params.update(
             {
                 f"/{name}/{self.DEPTH_KEY}/frame": {
+                    # metric depth, scaled by rcs.BaseCameraSet.DEPTH_SCALE (1000, i.e. millimeters) to fit into uint16
                    "height": camera_set.config(name).resolution_height,
                     "width": camera_set.config(name).resolution_width,
                     "color_dim": 1,
-                    "dtype": np.float32,
-                    "low": 0.0,
-                    "high": 1.0,
+                    "dtype": np.uint16,
+                    "low": 0,
+                    "high": 65535,
                 }
                 for name in camera_set.camera_names
             }
@@ -640,15 +646,27 @@ def check_depth(depth):
                 raise ValueError(msg)
             return self.include_depth
 
-        frame_dict: dict[str, dict[str, np.ndarray]] = {
+        frame_dict: dict[str, dict[str, CameraDataDictType]] = {
             camera_name: (
                 {
-                    self.RGB_KEY: frame.camera.color.data,
-                    self.DEPTH_KEY: frame.camera.depth.data,  # type: ignore
+                    self.RGB_KEY: CameraDataDictType(
+                        data=frame.camera.color.data,
+                        intrinsics=frame.camera.color.intrinsics,
+                        extrinsics=frame.camera.color.extrinsics,
+                    ),
+                    self.DEPTH_KEY: CameraDataDictType(
+                        data=frame.camera.depth.data,  # type: ignore
+                        intrinsics=frame.camera.depth.intrinsics,
+                        extrinsics=frame.camera.depth.extrinsics,
+                    ),
                 }
                 if check_depth(frame.camera.depth)
                 else {
-                    self.RGB_KEY: frame.camera.color.data,
+                    self.RGB_KEY: CameraDataDictType(
+                        data=frame.camera.color.data,
+                        intrinsics=frame.camera.color.intrinsics,
+                        extrinsics=frame.camera.color.extrinsics,
+                    ),
                 }
             )
             for camera_name, frame in frameset.frames.items()
diff --git a/python/rcs/envs/creators.py b/python/rcs/envs/creators.py
index b09eae75..be519b43 100644
--- a/python/rcs/envs/creators.py
+++ b/python/rcs/envs/creators.py
@@ -1,4 +1,5 @@
 import logging
+import typing
 from os import PathLike
 from typing import Type
 
@@ -6,6 +7,7 @@
 import numpy as np
 from gymnasium.envs.registration import EnvCreator
 from rcs._core.sim import CameraType
+from rcs.camera.interface import BaseCameraSet
 from rcs.camera.sim import SimCameraConfig, SimCameraSet
 from rcs.envs.base import (
     CameraSetWrapper,
@@ -87,7 +89,9 @@ def __call__(  # type: ignore
             env = RobotSimWrapper(env, simulation, sim_wrapper)
 
         if cameras is not None:
-            camera_set = SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
+            camera_set = typing.cast(
+                BaseCameraSet, SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
+            )
             env = CameraSetWrapper(env, camera_set, include_depth=True)
 
         if gripper_cfg is not None and isinstance(gripper_cfg, rcs.sim.SimGripperConfig):
diff --git a/python/rcs/envs/space_utils.py b/python/rcs/envs/space_utils.py
index 537bdf88..40c316d1 100644
--- a/python/rcs/envs/space_utils.py
+++ b/python/rcs/envs/space_utils.py
@@ -200,6 +200,11 @@ def value(t, path=""):
                 {key: value(get_args(t)[1], f"{path}/{key}") for key in child_dict_keys_to_unfold[unfold_key]}
             )
 
+        if not hasattr(t, "__metadata__"):
+            return gym.spaces.Dict(
+                {name: value(sub_t, path) for name, sub_t in get_type_hints(t, include_extras=True).items()}
+            )
+
         if len(t.__metadata__) == 2 and callable(t.__metadata__[0]):
             # space can be parametrized and is a function
             assert params is not None, "No params given."
diff --git a/python/tests/test_envs.py b/python/tests/test_envs.py
index 096c2e47..0ade40bb 100644
--- a/python/tests/test_envs.py
+++ b/python/tests/test_envs.py
@@ -39,6 +39,55 @@ class SimpleNestedSpace(RCSpaceType):
     ]
 
 
+class SimpleTypeNestedSpace(RCSpaceType):
+    robots_joints: dict[
+        Annotated[str, "robots"],
+        SimpleSpace,
+    ]
+
+
+class CameraSpace(RCSpaceType):
+    data: Annotated[
+        np.ndarray,
+        # needs to be filled with values downstream
+        lambda height, width, color_dim=3, dtype=np.uint8, low=0, high=255: gym.spaces.Box(
+            low=low,
+            high=high,
+            shape=(height, width, color_dim),
+            dtype=dtype,
+        ),
+        "frame",
+    ]
+    intrinsics: Annotated[
+        np.ndarray,
+        gym.spaces.Box(
+            low=-np.inf,
+            high=np.inf,
+            shape=(3, 4),
+            dtype=np.float64,
+        ),
+    ]
+    extrinsics: Annotated[
+        np.ndarray,
+        gym.spaces.Box(
+            low=-np.inf,
+            high=np.inf,
+            shape=(4, 4),
+            dtype=np.float64,
+        ),
+    ]
+
+
+class AdvancedTypeNestedSpace(RCSpaceType):
+    frames: dict[
+        Annotated[str, "camera_names"],
+        dict[
+            Annotated[str, "camera_type"],  # "rgb" or "depth"
+            CameraSpace,
+        ],
+    ]
+
+
 class AdvancedNestedSpace(RCSpaceType):
     frames: dict[
         Annotated[str, "cams"],
@@ -105,6 +154,91 @@ def test_simple_nested_space(self):
             }
         )
 
+    def test_simple_type_nested_space(self):
+        assert get_space(SimpleTypeNestedSpace, child_dict_keys_to_unfold={"robots": ["robot1"]}) == gym.spaces.Dict(
+            {
+                "robots_joints": gym.spaces.Dict(
+                    {
+                        "robot1": gym.spaces.Dict(
+                            {
+                                "my_int": gym.spaces.Discrete(1),
+                                "my_float": gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32),
+                            }
+                        ),
+                    }
+                ),
+            }
+        )
+
+    def test_advanced_type_nested_space(self):
+        assert get_space(
+            AdvancedTypeNestedSpace,
+            child_dict_keys_to_unfold={"camera_names": ["cam1"], "camera_type": ["depth", "rgb"]},
+            params={
+                "/cam1/rgb/frame": {
+                    "height": 480,
+                    "width": 640,
+                    "dtype": np.uint8,
+                    "low": 0,
+                    "high": 255,
+                    "color_dim": 3,
+                },
+                "/cam1/depth/frame": {
+                    "height": 480,
+                    "width": 640,
+                    "dtype": np.uint16,
+                    "low": 0,
+                    "high": 65535,
+                    "color_dim": 1,
+                },
+            },
+        ) == gym.spaces.Dict(
+            {
+                "frames": gym.spaces.Dict(
+                    {
+                        "cam1": gym.spaces.Dict(
+                            {
+                                "depth": gym.spaces.Dict(
+                                    {
+                                        "data": gym.spaces.Box(low=0, high=65535, shape=(480, 640, 1), dtype=np.uint16),
+                                        "intrinsics": gym.spaces.Box(
+                                            low=-np.inf,
+                                            high=np.inf,
+                                            shape=(3, 4),
+                                            dtype=np.float64,
+                                        ),
+                                        "extrinsics": gym.spaces.Box(
+                                            low=-np.inf,
+                                            high=np.inf,
+                                            shape=(4, 4),
+                                            dtype=np.float64,
+                                        ),
+                                    }
+                                ),
+                                "rgb": gym.spaces.Dict(
+                                    {
+                                        "data": gym.spaces.Box(low=0, high=255, shape=(480, 640, 3), dtype=np.uint8),
+                                        "intrinsics": gym.spaces.Box(
+                                            low=-np.inf,
+                                            high=np.inf,
+                                            shape=(3, 4),
+                                            dtype=np.float64,
+                                        ),
+                                        "extrinsics": gym.spaces.Box(
+                                            low=-np.inf,
+                                            high=np.inf,
+                                            shape=(4, 4),
+                                            dtype=np.float64,
+                                        ),
+                                    }
+                                ),
+                            }
+                        ),
+                    }
+                ),
+            }
+        )
+
     def test_advanced_nested_space(self):
         assert get_space(
diff --git a/src/sim/camera.cpp b/src/sim/camera.cpp
index 7323d6c4..a70f8525 100644
--- a/src/sim/camera.cpp
+++ b/src/sim/camera.cpp
@@ -86,13 +86,20 @@ std::optional
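A minimal end-to-end sketch of how the pieces above compose on hardware; the camera name and serial number are placeholders, and the fixed poll count is a simplification (a real setup would rely on the polling thread):

```python
from rcs_realsense.utils import default_realsense

# default_realsense() attaches an FR3BaseArucoCalibration strategy per camera.
cams = default_realsense({"wrist": "123456789012"})  # placeholder serial
assert cams is not None
cams.open()

# poll_frame() keeps the newest CALIBRATION_FRAME_SIZE frames per camera in a
# buffer that calibrate() later reads under the matching lock.
for _ in range(cams.CALIBRATION_FRAME_SIZE):
    cams.poll_frame("wrist")

# Detects the base-plate tag, averages its pose over the buffered frames, and
# stores world-to-camera extrinsics (prompts on stdin before sampling).
if cams.calibrate():
    frame = cams.poll_frame("wrist")
    K = frame.camera.color.intrinsics             # 3x4 pinhole matrix
    world_to_cam = frame.camera.color.extrinsics  # 4x4 world-to-camera pose
cams.disable_streams()
```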