diff --git a/examples/teleop/quest_iris_dual_arm.py b/examples/teleop/quest_iris_dual_arm.py
index 2fdee15c..6deeb5ae 100644
--- a/examples/teleop/quest_iris_dual_arm.py
+++ b/examples/teleop/quest_iris_dual_arm.py
@@ -15,7 +15,7 @@
 )
 from rcs.envs.creators import SimMultiEnvCreator
 from rcs.envs.storage_wrapper import StorageWrapper
-from rcs.envs.utils import default_sim_gripper_cfg, default_sim_robot_cfg
+from rcs.envs.utils import default_digit, default_sim_gripper_cfg, default_sim_robot_cfg
 from rcs.utils import SimpleFrameRate
 from rcs_fr3.creators import RCSFR3MultiEnvCreator
 from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg
@@ -25,8 +25,6 @@
 from simpub.sim.mj_publisher import MujocoPublisher
 from simpub.xr_device.meta_quest3 import MetaQuest3
 
-# from rcs_xarm7.creators import RCSXArm7EnvCreator
-
 logger = logging.getLogger(__name__)
 
 # download the iris apk from the following repo release: https://github.com/intuitive-robots/IRIS-Meta-Quest3
@@ -38,7 +36,7 @@
 INCLUDE_ROTATION = True
 
 ROBOT2IP = {
-    "left": "192.168.102.1",
+    # "left": "192.168.102.1",
     "right": "192.168.101.1",
 }
 
@@ -48,14 +46,20 @@
 RECORD_FPS = 30
 # set camera dict to none disable cameras
 # CAMERA_DICT = {
-#     "side_right": "244222071045",
+#     "left_wrist": "230422272017",
+#     "right_wrist": "230422271040",
+#     "side": "243522070385",
 #     "bird_eye": "243522070364",
-#     "arro": "243522070385",
 # }
 CAMERA_DICT = None
 MQ3_ADDR = "10.42.0.1"
 
+# DIGIT_DICT = {
+#     "digit_right_left": "D21182",
+#     "digit_right_right": "D21193"
+# }
+DIGIT_DICT = None
-DATASET_PATH = "test_data_iris_dual_arm"
+DATASET_PATH = "test_data_iris_dual_arm14"
 INSTRUCTION = "build a tower with the blocks in front of you"
 
 
@@ -80,6 +84,7 @@
     def __init__(self, env: RelativeActionSpace):
         self._resource_lock = threading.Lock()
         self._env_lock = threading.Lock()
+        self._reset_lock = threading.Lock()
         self._env = env
 
         self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ["right"]
@@ -155,11 +160,18 @@ def run(self):
 
             if input_data[self._stop_btn] and (self._prev_data is None or not self._prev_data[self._stop_btn]):
                 print("reset successful pressed: resetting env")
-                with self._env_lock:
+                with self._reset_lock:
                     # set successful
                     self._env.get_wrapper_attr("success")()
+                    # sleep to let the robot reach the goal
+                    sleep(1)
                     # this might also move the robot to the home position
                     self._env.reset()
+                    for controller in self.controller_names:
+                        self._offset_pose[controller] = Pose()
+                        self._last_controller_pose[controller] = Pose()
+                        self._grp_pos[controller] = 1
+
                 continue
             # reset unsuccessful
             if input_data[self._unsuccessful_btn] and (
@@ -261,17 +273,17 @@ def environment_step_loop(self):
            if self._exit_requested:
                self._step_env = False
                break
-            transforms, grippers = self.next_action()
-            actions = {}
-            for robot, transform in transforms.items():
-                action = dict(
-                    LimitedTQuatRelDictType(tquat=np.concatenate([transform.translation(), transform.rotation_q()]))  # type: ignore
-                )
+            with self._reset_lock:
+                transforms, grippers = self.next_action()
+                actions = {}
+                for robot, transform in transforms.items():
+                    action = dict(
+                        LimitedTQuatRelDictType(tquat=np.concatenate([transform.translation(), transform.rotation_q()]))  # type: ignore
+                    )
 
-                action.update(GripperDictType(gripper=grippers[robot]))
-                actions[robot] = action
+                    action.update(GripperDictType(gripper=grippers[robot]))
+                    actions[robot] = action
 
-            with self._env_lock:
                self._env.step(actions)
 
            rate_limiter()
@@ -279,7 +291,7 @@ def environment_step_loop(self):
 
 def main():
     if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
-        camera_set = HardwareCameraSet([default_realsense(CAMERA_DICT)]) if CAMERA_DICT is not None else None  # type: ignore
+        camera_set = HardwareCameraSet([default_realsense(CAMERA_DICT), default_digit(DIGIT_DICT)]) if CAMERA_DICT is not None else None  # type: ignore
        env_rel = RCSFR3MultiEnvCreator()(
            name2ip=ROBOT2IP,
            camera_set=camera_set,
diff --git a/extensions/rcs_fr3/src/rcs_fr3/envs.py b/extensions/rcs_fr3/src/rcs_fr3/envs.py
index 7d8bd28f..45413a87 100644
--- a/extensions/rcs_fr3/src/rcs_fr3/envs.py
+++ b/extensions/rcs_fr3/src/rcs_fr3/envs.py
@@ -40,6 +40,7 @@ def _rs2dict(self, state: hw.RobotState):
             self._robot_state_keys = [
                 attr for attr in dir(state) if not attr.startswith("__") and not callable(getattr(state, attr))
             ]
+            self._robot_state_keys.remove("robot_mode")
         return {key: getattr(state, key) for key in self._robot_state_keys}
 
     def reset(
diff --git a/pyproject.toml b/pyproject.toml
index a92070a1..552c3b44 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -51,7 +51,7 @@ dev = [
     "types-requests~=2.31",
     # temporarily use fixed version until PR #275 is merged
     # pybind11-stubgen==2.5.5
-    "pybind11-stubgen @ git+https://github.com/juelg/pybind11-stubgen@fix/class-sorting",
+    "pybind11-stubgen @ git+https://github.com/juelg/pybind11-stubgen@7f6afa59cfb8a485d3e53311be62214fefd96d75",
     "pytest==8.1.1",
     "commitizen~=3.28.0",
     "clang",
diff --git a/python/rcs/__main__.py b/python/rcs/__main__.py
new file mode 100644
index 00000000..1cc552e4
--- /dev/null
+++ b/python/rcs/__main__.py
@@ -0,0 +1,36 @@
+from pathlib import Path
+from typing import Annotated
+
+import typer
+from rcs.envs.storage_wrapper import StorageWrapper
+
+app = typer.Typer()
+
+
+@app.command()
+def consolidate(
+    path: Annotated[
+        Path,
+        typer.Argument(
+            exists=True,
+            file_okay=False,
+            dir_okay=True,
+            help="The root directory of the parquet dataset to consolidate.",
+        ),
+    ]
+):
+    """
+    Consolidates a fragmented Parquet dataset into larger files.
+
+    This is useful if the recording process crashed or was interrupted,
+    leaving many small files behind.
+    """
+    typer.echo(f"Starting consolidation for: {path}")
+
+    StorageWrapper.consolidate(str(path), schema=None)
+
+    typer.echo("Done.")
+
+
+if __name__ == "__main__":
+    app()
diff --git a/python/rcs/envs/storage_wrapper.py b/python/rcs/envs/storage_wrapper.py
index b17530c8..8f43c192 100644
--- a/python/rcs/envs/storage_wrapper.py
+++ b/python/rcs/envs/storage_wrapper.py
@@ -1,7 +1,9 @@
+import datetime
 import io
 import operator
+import os
+import shutil
 from concurrent.futures import ThreadPoolExecutor, wait
-from itertools import chain
 from queue import Queue
 from typing import Any, Optional
 from uuid import uuid4
@@ -30,8 +32,16 @@ def __init__(
         max_rows_per_file: Optional[int] = None,
     ):
         """
-        Asynchronously log environment transitions to a Parquet
-        dataset on disk.
+        Asynchronously log environment transitions to a Parquet dataset on disk.
+
+        This wrapper implements a "Crash-Safe" recording strategy:
+        1. **Write-on-Receipt:** Data is written to disk in small, atomic batches immediately
+           after being generated. This ensures that if the process crashes (segfault, OOM),
+           previous batches are already safe on disk.
+        2. **Date Partitioning:** Files are organized by date (YYYY-MM-DD) to scale to
+           thousands of episodes without exhausting file system inodes.
+        3. **Consolidation:** On a clean exit (`close()`), the many small batch files are
+           merged into larger, optimized Parquet files.
 
         Observation handling:
         - Expects observations to be dictionaries.
@@ -40,7 +50,7 @@
           in-place, and their original shapes are stored alongside as
          ``"_shape"``. Nested dicts are traversed recursively.
        - Lists/tuples of arrays are not supported.
-        - ``close()`` must be called to flush the final batch.
+        - ``close()`` must be called to flush the final batch and run consolidation.
 
        Parameters
        ----------
@@ -48,23 +58,23 @@
            The environment to wrap.
        base_dir : str
            Output directory where the Parquet dataset will be written.
-        batch_size : int
-            Number of transitions to accumulate before flushing a RecordBatch
-            to the writer queue.
+        instruction : str
+            A text description of the task being performed (logged in every row).
+        batch_size : int, default=32
+            Number of transitions to accumulate before flushing to disk.
+            Smaller batches are safer against data loss but add more overhead.
        schema : Optional[pa.Schema], default=None
-            Optional Arrow schema to enforce for all batches. If None, the schema
-            is inferred from the first flushed batch and then reused.
+            Optional Arrow schema. If None, inferred from the first batch.
+        always_record : bool, default=False
+            If True, records immediately upon reset. If False, requires start_record().
        basename_template : Optional[str], default=None
-            Template controlling Parquet file basenames. Passed through to
-            ``pyarrow.dataset.write_dataset``.
+            Template for filenames. Note: a unique UUID is automatically injected
+            to prevent overwrites.
        max_rows_per_group : Optional[int], default=None
-            Maximum row count per Parquet row group. Passed through to
-            ``pyarrow.dataset.write_dataset``.
+            Passed to ``pyarrow.dataset.write_dataset``.
        max_rows_per_file : Optional[int], default=None
-            Maximum row count per Parquet file. Passed through to
-            ``pyarrow.dataset.write_dataset``.
+            Passed to ``pyarrow.dataset.write_dataset``.
        """
-
        super().__init__(env)
        self.base_dir = base_dir
        self.batch_size = batch_size
@@ -79,37 +89,100 @@ def __init__(
        self.instruction = instruction
        self._success = False
        self._prev_action = None
+        self.thread_pool = ThreadPoolExecutor()
        self.queue: Queue[pa.Table | pa.RecordBatch] = Queue(maxsize=2)
        self.uuid = uuid4()
+        self._writer_future = self.thread_pool.submit(self._writer_worker)
 
-    def _generator_from_queue(self):
-        while (batch := self.queue.get()) is not self.QueueSentinel:
-            yield batch
+    @staticmethod
+    def consolidate(base_dir: str, schema: Optional[pa.Schema] = None):
+        """
+        Static method to merge small Parquet files into larger ones.
+        Can be used by the class or an external CLI to clean up a dataset directory.
+ """ + if not os.path.exists(base_dir): + print(f"Directory {base_dir} does not exist.") + return - def _writer_worker(self): - gen = self._generator_from_queue() - first = next(gen) - ds.write_dataset( - data=chain([first], gen), - base_dir=self.base_dir, - format="parquet", - schema=self.schema, - existing_data_behavior="overwrite_or_ignore", - basename_template=self.basename_template, - max_rows_per_group=self.max_rows_per_group, - max_rows_per_file=self.max_rows_per_file, - partitioning=ds.partitioning( - schema=pa.schema(fields=[pa.field("uuid", pa.string())]), - flavor="filename", - ), + part_scheme = ds.partitioning( + schema=pa.schema(fields=[pa.field("date", pa.string())]), + flavor="filename", ) + print(f"Consolidating files in {base_dir}...") + temp_dir = str(base_dir).rstrip("/") + "_temp" + + try: + # Read existing dataset + dataset = ds.dataset(base_dir, format="parquet", partitioning=part_scheme, schema=schema) + + try: + if dataset.count_rows() == 0: + return + except (IndexError, ValueError): + return + + ds.write_dataset( + data=dataset, + base_dir=temp_dir, + format="parquet", + schema=schema, + partitioning=part_scheme, + existing_data_behavior="overwrite_or_ignore", + ) + + shutil.rmtree(base_dir) + os.rename(temp_dir, base_dir) + print(f"Consolidation complete for {base_dir}") + + except Exception as e: + print(f"Consolidation failed (data remains safe in original fragments): {e}") + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + def _writer_worker(self): + """ + Background worker that writes each batch as a separate, safe file. + Exceptions here will propagate to the future and raise RuntimeError in step(). + """ + while True: + batch = self.queue.get() + if batch is self.QueueSentinel: + break + + # Generate a unique 8-char hex for this specific batch file + unique_id = uuid4().hex[:8] + + # Handle basename template uniqueness + if self.basename_template: + template = self.basename_template.replace(".parquet", f"-{unique_id}.parquet") + else: + template = "part-{i}-" + unique_id + ".parquet" + + ds.write_dataset( + data=batch, + base_dir=self.base_dir, + format="parquet", + schema=self.schema, + existing_data_behavior="overwrite_or_ignore", + basename_template=template, + max_rows_per_group=self.max_rows_per_group, + max_rows_per_file=self.max_rows_per_file, + partitioning=ds.partitioning( + schema=pa.schema(fields=[pa.field("date", pa.string())]), + flavor="filename", + ), + ) + def _flush(self): - batch = pa.RecordBatch.from_pylist(self.buffer, schema=self.schema) if self.schema is None: - self.schema = batch.schema + temp_batch = pa.RecordBatch.from_pylist(self.buffer) + self.schema = temp_batch.schema + + self.buffer[-1]["success"] = self._success + batch = pa.RecordBatch.from_pylist(self.buffer, schema=self.schema) self.queue.put(batch) self.buffer.clear() @@ -163,13 +236,15 @@ def to_tiff(depth_data): ] def step(self, action): - # NOTE: expects the observation to be a dictionary + # Check if the writer thread has died if self._writer_future.done(): exc = self._writer_future.exception() - assert exc is not None - msg = "Writer thread failed" - raise RuntimeError(msg) from exc + if exc: + msg = "Writer thread failed" + raise RuntimeError(msg) from exc + obs, reward, terminated, truncated, info = self.env.step(action) + if not self._pause: assert isinstance(obs, dict) if "frames" in obs: @@ -177,12 +252,14 @@ def step(self, action): self._flatten_arrays(obs) if info.get("success"): self.success() + self.buffer.append( { "obs": obs, "reward": reward, 
"step": self.step_cnt, "uuid": self.uuid.hex, + "date": datetime.date.today().isoformat(), "success": self._success, "action": self._prev_action, "instruction": self.instruction, @@ -192,6 +269,7 @@ def step(self, action): self.step_cnt += 1 if len(self.buffer) == self.batch_size: self._flush() + return obs, reward, terminated, truncated, info def success(self): @@ -219,5 +297,8 @@ def reset(self, *, seed: int | None = None, options: dict[str, Any] | None = Non def close(self): if len(self.buffer) > 0: self._flush() + self.queue.put(self.QueueSentinel) wait([self._writer_future]) + + StorageWrapper.consolidate(self.base_dir, self.schema)