Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,69 @@ for _ in range(100):
print(obs)
if truncated or terminated:
logger.info("Truncated or terminated!")
return
exit()
```

### Remote Procedure Call (RPC) Client and Server
#### Server
```python
from rcs.envs.creators import SimEnvCreator
from rcs.envs.utils import (
default_mujoco_cameraset_cfg,
default_sim_gripper_cfg,
default_sim_robot_cfg,
)
from rcs.envs.base import ControlMode, RelativeTo
from rcs.rpc.server import RcsServer

def run_server():
env = SimEnvCreator()(
control_mode=ControlMode.JOINTS,
collision_guard=False,
robot_cfg=default_sim_robot_cfg(),
gripper_cfg=default_sim_gripper_cfg(),
cameras=default_mujoco_cameraset_cfg(),
max_relative_movement=0.1,
relative_to=RelativeTo.LAST_STEP,
)
server = RcsServer(env, port=50051)
server.start()

if __name__ == "__main__":
run_server()
```

#### Client
```python
import time
from python.rcs.rpc.client import RcsClient

if __name__ == "__main__":
# Create the client (adjust host/port if needed)
client = RcsClient(host="localhost", port=50051)

try:
print("Resetting environment...")
obs = client.reset()
print(f"Initial observation: {obs}")

for i in range(5):
print(f"\nStep {i+1}")
# Replace with a valid action for your environment
action = 0
obs, reward, terminated, truncated, info = client.step(action)
print(f"Obs: {obs}, Reward: {reward}, Terminated: {terminated}, Truncated: {truncated}, Info: {info}")
if terminated or truncated:
print("Episode finished, resetting...")
obs = client.reset()
print(f"Reset observation: {obs}")
time.sleep(0.5)
finally:
print("Closing client.")
client.close()
```


### Examples
Checkout the python examples in the [examples](examples) folder:
- [fr3_direct_control.py](examples/fr3.py) shows direct robot control with RCS's python bindings
Expand Down
27 changes: 27 additions & 0 deletions examples/rpc_run_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import time

from python.rcs.rpc.client import RcsClient

if __name__ == "__main__":
# Create the client (adjust host/port if needed)
client = RcsClient(host="localhost", port=50051)

try:
print("Resetting environment...")
obs = client.reset()
print(f"Initial observation: {obs}")

for i in range(5):
print(f"\nStep {i+1}")
# Replace with a valid action for your environment
action = 0
obs, reward, terminated, truncated, info = client.step(action)
print(f"Obs: {obs}, Reward: {reward}, Terminated: {terminated}, Truncated: {truncated}, Info: {info}")
if terminated or truncated:
print("Episode finished, resetting...")
obs = client.reset()
print(f"Reset observation: {obs}")
time.sleep(0.5)
finally:
print("Closing client.")
client.close()
26 changes: 26 additions & 0 deletions examples/rpc_run_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from rcs.envs.base import ControlMode, RelativeTo
from rcs.envs.creators import SimEnvCreator
from rcs.envs.utils import (
default_mujoco_cameraset_cfg,
default_sim_gripper_cfg,
default_sim_robot_cfg,
)
from rcs.rpc.server import RcsServer


def run_server():
env = SimEnvCreator()(
control_mode=ControlMode.JOINTS,
collision_guard=False,
robot_cfg=default_sim_robot_cfg(),
gripper_cfg=default_sim_gripper_cfg(),
cameras=default_mujoco_cameraset_cfg(),
max_relative_movement=0.1,
relative_to=RelativeTo.LAST_STEP,
)
server = RcsServer(env, port=50051)
server.start()


if __name__ == "__main__":
run_server()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = ["websockets>=11.0",
"tilburg-hand",
"digit-interface",
"ompl>=1.7.0",
"rpyc==6.0.2",
]
readme = "README.md"
maintainers = [
Expand Down
33 changes: 33 additions & 0 deletions python/rcs/rpc/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import gymnasium as gym
import rpyc
from rpyc.utils.classic import obtain


class RcsClient(gym.Env):
def __init__(self, host="localhost", port=50051):
super().__init__()
self.conn = rpyc.connect(host, port)
self.server = self.conn.root
# Optionally, fetch spaces from server if needed
# self.observation_space = ...
# self.action_space = ...

def step(self, action):
return self.server.step(action)

def reset(self, **kwargs):
return self.server.reset(**kwargs)

def get_obs(self):
return self.server.get_obs()

@property
def unwrapped(self):
return self.server.unwrapped()

@property
def action_space(self):
return obtain(self.server.action_space())

def close(self):
self.conn.close()
49 changes: 49 additions & 0 deletions python/rcs/rpc/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# import wrapper
import rpyc
from gymnasium import Wrapper
from rpyc.utils.server import ThreadedServer

rpyc.core.protocol.DEFAULT_CONFIG["allow_pickle"] = True


@rpyc.service
class RcsServer(Wrapper, rpyc.Service):
def __init__(self, env, host="localhost", port=50051):
super().__init__(env)
self.host = host
self.port = port

@rpyc.exposed
def step(self, action):
"""Perform a step in the environment using the Wrapper base class."""
return super().step(action)

@rpyc.exposed
def reset(self, **kwargs):
"""Reset the environment using the Wrapper base class."""
return super().reset(**kwargs)

@rpyc.exposed
def get_obs(self):
"""Get the current observation using the Wrapper base class if available."""
if hasattr(super(), "get_obs"):
return super().get_obs()
if hasattr(self.env, "get_obs"):
return self.env.get_obs()
error = "The environment does not have a get_obs method."
raise NotImplementedError(error)

@rpyc.exposed
def unwrapped(self):
"""Return the unwrapped environment using the Wrapper base class."""
return super().unwrapped

@rpyc.exposed
def action_space(self):
"""Return the action space using the Wrapper base class."""
return super().action_space

def start(self):
print(f"Starting RcsServer RPC (looped OneShotServer) on {self.host}:{self.port}")
t = ThreadedServer(self, port=self.port)
t.start()
Loading