diff --git a/.vscode/settings.json b/.vscode/settings.json index 1dc798a..a57d9ce 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -28,5 +28,6 @@ "python.analysis.typeCheckingMode": "basic", "python.analysis.autoImportCompletions": true, "python.defaultInterpreterPath": ".venv/bin/python", - "python.terminal.activateEnvInCurrentTerminal": true + "python.terminal.activateEnvInCurrentTerminal": true, + "cSpell.words": ["autoreconnect", "demuxes", "dtheta", "keepalive", "MQTT", "neopixel", "paho", "testbot", "TMPL"] } diff --git a/src/robot/communication/directed_comm.py b/src/robot/communication/directed_comm.py index c0317d8..40e97d9 100644 --- a/src/robot/communication/directed_comm.py +++ b/src/robot/communication/directed_comm.py @@ -1,7 +1,94 @@ """Directed communication module.""" +import json +import queue +import threading +from typing import Callable, Optional, Tuple + class DirectedCommunication: - """Placeholder directed communication class.""" + """ + Directed (one-to-one) inter-robot comms. + + Publish (robot -> server): + Topic: /comm/out/direct + Payload: {"id": , "to": , "msg": ""} + + Subscribe (server -> robot): + Topic: /comm/in/direct/{robot_id} + Payload: ("{\"from\": , \"msg\": \"...\"}" + " or plain string per server;"); + this helper normalizes to (src, msg). + """ + + OUT_TOPIC = "/comm/out/direct" + IN_TOPIC_TMPL = "/comm/in/direct/{robot_id}" + + def __init__(self, robot_id: int, robot_mqtt): + self.robot_id = robot_id + self._mqtt = robot_mqtt + + self._in_q: "queue.Queue[Tuple[int, str]]" = queue.Queue() + self._cb: Optional[Callable[[int, str], None]] = None + self._lock = threading.Lock() + + inbox = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id) + self._mqtt.subscribe(inbox) + if hasattr(self._mqtt, "add_handler"): + self._mqtt.add_handler(inbox, self._on_message) + + # If RobotMQTT doesn’t provide add_handler, it should route here. + def handle_incoming(self, topic: str, payload: str): + self._on_message(topic, payload) + + def _on_message(self, topic: str, payload: str): + expected = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id) + if topic != expected: + return + + # Try JSON with {"from": int, "msg": "..."}; fall back to raw string + src = -1 + msg = payload + try: + obj = json.loads(payload) + if isinstance(obj, dict): + src = int(obj.get("from", -1)) + msg = str(obj.get("msg", "")) + except Exception: + pass + + self._in_q.put((src, msg)) + with self._lock: + if self._cb: + try: + self._cb(src, msg) + except Exception: + pass + + # API + + def set_callback(self, fn: Callable[[int, str], None]): + with self._lock: + self._cb = fn + + def send(self, to_robot: int, message: str): + """ + Send directly to a specific robot via the server’s directed comm + channel. + """ + data = {"id": self.robot_id, "to": int(to_robot), "msg": message} + self._mqtt.publish(self.OUT_TOPIC, json.dumps(data)) + + def recv_nowait(self) -> Optional[tuple[int, str]]: + try: + return self._in_q.get_nowait() + except queue.Empty: + return None - pass + def recv(self, timeout: float | None = None) -> Optional[tuple[int, str]]: + try: + if timeout is not None: + return self._in_q.get(timeout=timeout) + return self._in_q.get() + except queue.Empty: + return None diff --git a/src/robot/communication/simple_comm.py b/src/robot/communication/simple_comm.py index 7ae7394..c2c574b 100644 --- a/src/robot/communication/simple_comm.py +++ b/src/robot/communication/simple_comm.py @@ -1,7 +1,85 @@ """Simple communication module.""" +import json +import queue +import threading +from typing import Callable, Optional + class SimpleCommunication: - """Placeholder simple communication class.""" + """ + Simple (broadcast-like) inter-robot comms. + + Publish (robot -> server): + Topic: /comm/out/simple + Payload: {"id": , "msg": "", "dist": } + + Subscribe (server -> robot): + Topic: /comm/in/simple/{robot_id} + Payload: "" (server delivers message body) + """ + + OUT_TOPIC = "/comm/out/simple" + IN_TOPIC_TMPL = "/comm/in/simple/{robot_id}" + + def __init__(self, robot_id: int, robot_mqtt): + self.robot_id = robot_id + self._mqtt = robot_mqtt + + # inbound queue and optional callback + self._in_q: "queue.Queue[str]" = queue.Queue() + self._cb: Optional[Callable[[str], None]] = None + self._lock = threading.Lock() + + # subscribe to inbox + in_topic = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id) + self._mqtt.subscribe(in_topic) + if hasattr(self._mqtt, "add_handler"): + self._mqtt.add_handler(in_topic, self._on_message) + + # If RobotMQTT doesn’t provide add_handler, it should forward to this + def handle_incoming(self, topic: str, payload: str): + self._on_message(topic, payload) + + def _on_message(self, topic: str, payload: str): + expected = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id) + if topic != expected: + return + self._in_q.put(payload) + with self._lock: + if self._cb: + try: + self._cb(payload) + except Exception: + pass + + # API + + def set_callback(self, fn: Callable[[str], None]): + with self._lock: + self._cb = fn + + def send(self, message: str, distance: float | None = None): + """ + Send a message via the simple protocol; server decides receivers. + """ + data = {"id": self.robot_id, "msg": message} + if distance is not None: + data["dist"] = distance + self._mqtt.publish(self.OUT_TOPIC, json.dumps(data)) + + def recv_nowait(self) -> Optional[str]: + try: + return self._in_q.get_nowait() + except queue.Empty: + return None - pass + def recv(self, timeout: float | None = None) -> Optional[str]: + try: + return ( + self._in_q.get(timeout=timeout) + if timeout is not None + else self._in_q.get() + ) + except queue.Empty: + return None diff --git a/src/robot/helpers/coordinate.py b/src/robot/helpers/coordinate.py index 09e5a61..5d122fc 100644 --- a/src/robot/helpers/coordinate.py +++ b/src/robot/helpers/coordinate.py @@ -1,7 +1,95 @@ """Coordinate tracking helper.""" +import math +from dataclasses import dataclass +from typing import Tuple, Optional + +def _wrap_deg(angle_deg: float) -> float: + # Normalize to [0, 360) + a = angle_deg % 360.0 + if a < 0: + a += 360.0 + return a + + +@dataclass class Coordinate: - """Placeholder coordinate tracker.""" + """ + Pose container and odometry utilities. + Units: centimeters for x,y; degrees for heading (0..360). + Heading 0 deg = +X axis; positive angles rotate CCW. + """ + x: float = 0.0 + y: float = 0.0 + heading: float = 0.0 # degrees + + # ------------- Odometry updates ------------- + + def update_from_wheel_speeds( + self, v_l: float, v_r: float, dt: float, wheel_base: float + ): + """ + Integrate pose from left/right linear speeds (cm/s) over dt (s). + wheel_base is wheel separation (cm). + """ + theta = math.radians(self.heading) + if abs(v_l - v_r) < 1e-9: + v = 0.5 * (v_l + v_r) + self.x += v * math.cos(theta) * dt + self.y += v * math.sin(theta) * dt + # heading unchanged + else: + R = (wheel_base / 2.0) * (v_l + v_r) / (v_r - v_l) + omega = (v_r - v_l) / wheel_base + dtheta = omega * dt + self.x += R * (math.sin(theta + dtheta) - math.sin(theta)) + self.y += -R * (math.cos(theta + dtheta) - math.cos(theta)) + self.heading = _wrap_deg(math.degrees(theta + dtheta)) + # If straight case, ensure heading normalized too + self.heading = _wrap_deg(self.heading) + # Matches canonical differential-drive odometry for ICC motion. [4][1] + + def set_pose(self, x: float, y: float, heading_deg: float): + self.x = float(x) + self.y = float(y) + self.heading = _wrap_deg(float(heading_deg)) + + def apply_delta(self, dx: float, dy: float, dtheta_deg: float): + """Apply a small world-frame delta to the pose.""" + self.x += float(dx) + self.y += float(dy) + self.heading = _wrap_deg(self.heading + float(dtheta_deg)) + + # ------------- Frame conversions ------------- + + def world_to_robot(self, wx: float, wy: float) -> Tuple[float, float]: + """ + Convert a world point to robot-centric coordinates. + """ + dx = wx - self.x + dy = wy - self.y + theta = math.radians(self.heading) + rx = dx * math.cos(-theta) - dy * math.sin(-theta) + ry = dx * math.sin(-theta) + dy * math.cos(-theta) + return rx, ry + + def robot_to_world(self, rx: float, ry: float) -> Tuple[float, float]: + """ + Convert a robot-centric point to world coordinates. + """ + theta = math.radians(self.heading) + wx = rx * math.cos(theta) - ry * math.sin(theta) + self.x + wy = rx * math.sin(theta) + ry * math.cos(theta) + self.y + return wx, wy + + # ------------- Publishing ------------- - pass + def publish(self, robot_mqtt, robot_id: Optional[int] = None): + """ + Publish pose via the RobotMQTT helper. If a different robot_id is + required by the application, pass it; otherwise RobotMQTT will use + its configured id. + """ + # RobotMQTT.publish_coordinate reads self.x/y/heading by default. + robot_mqtt.publish_coordinate(self) diff --git a/src/robot/helpers/robot_mqtt.py b/src/robot/helpers/robot_mqtt.py index 0af763c..8cfb5bf 100644 --- a/src/robot/helpers/robot_mqtt.py +++ b/src/robot/helpers/robot_mqtt.py @@ -1,7 +1,127 @@ """MQTT helper functions.""" +import json +import time +from typing import Callable, Dict + class RobotMQTT: - """Placeholder MQTT helper.""" + """ + High-level MQTT helper coordinating robot lifecycle, state, and I/O. + + Management topics (robot <-> simulator/tools): + Outbound commands (robot -> tools/server): + - /mgr/out/start : {"id": } + - /mgr/out/stop : {"id": } + - /mgr/out/reset : {"id": } + - /loc/out/pose : {"id": , "x": , "y": , + "heading": } + + Inbound management (tools/server -> robot): + - /mgr/in/{robot_id} : "START" | "STOP" | "RESET" | + JSON {"cmd": "...", ...} + + Sensor/comm helpers can register per-topic handlers via add_handler(). + """ + + # Outbound topics + START_OUT = "/mgr/out/start" + STOP_OUT = "/mgr/out/stop" + RESET_OUT = "/mgr/out/reset" + POSE_OUT = "/loc/out/pose" + + # Inbound topics + MGR_IN_TMPL = "/mgr/in/{robot_id}" + + def __init__(self, robot_id, mqtt_client, coordinate): + self.robot_id = str(robot_id) + self.client = mqtt_client + self.coordinate = coordinate + + # Pluggable message handlers keyed by exact topic + self._handlers: Dict[str, Callable[[str, str], None]] = {} + + # Subscribe to management inbox + inbox = self.MGR_IN_TMPL.format(robot_id=self.robot_id) + self.client.subscribe(inbox) + + # If underlying client supports per-topic handler registration, fine. + # Otherwise, Robot.mqtt dispatches to handle_incoming() which demuxes. + if hasattr(self.client, "add_handler"): + self.client.add_handler(inbox, self._handle_mgr) + + # Allow helpers (sensors/comm/indicators) to register callbacks + def add_handler(self, topic: str, callback: Callable[[str, str], None]): + self._handlers[topic] = callback + self.client.subscribe(topic) + + # Optional removal + def remove_handler(self, topic: str): + self._handlers.pop(topic, None) + if hasattr(self.client, "unsubscribe"): + self.client.unsubscribe(topic) + + # ------------- High-level commands ------------- + + def start(self): + self.client.publish(self.START_OUT, json.dumps({"id": self.robot_id})) + + def stop(self): + self.client.publish(self.STOP_OUT, json.dumps({"id": self.robot_id})) + + def reset(self): + self.client.publish(self.RESET_OUT, json.dumps({"id": self.robot_id})) + + def publish_coordinate(self, coordinate=None): + c = coordinate or self.coordinate + payload = { + "id": self.robot_id, + "x": float(c.x), + "y": float(c.y), + "heading": float(c.heading), + "ts": time.time(), + } + self.client.publish(self.POSE_OUT, json.dumps(payload)) + + # ------------- Incoming message routing ------------- + + def handle_incoming(self, topic, payload): + """ + Entry point used by Robot.run() to deliver all inbound MQTT messages. + Routes to registered handlers, then management handler + if the topic matches. + """ + # First route to any exact-match helper + cb = self._handlers.get(topic) + if cb: + cb(topic, payload) + + # Management inbox + inbox = self.MGR_IN_TMPL.format(robot_id=self.robot_id) + if topic == inbox: + self._handle_mgr(topic, payload) + + def _handle_mgr(self, _topic, payload): + """ + Handle management commands from tools/simulator: + Accepts simple strings START/STOP/RESET or JSON {"cmd":"START"}. + """ + cmd = payload.strip() + try: + obj = json.loads(payload) + if isinstance(obj, dict) and "cmd" in obj: + cmd = str(obj["cmd"]).upper() + except Exception: + cmd = cmd.upper() - pass + if cmd == "START": + # Upstream expects robot to move to RUN state; + # publish ack if needed + # Actual state change is handled by the Robot subclass logic. + return + if cmd == "STOP": + return + if cmd == "RESET": + # Often used to reinitialize pose; forward a fresh pose broadcast + self.publish_coordinate() + return diff --git a/src/robot/indicators/neopixel.py b/src/robot/indicators/neopixel.py index 0cd1144..4f6af36 100644 --- a/src/robot/indicators/neopixel.py +++ b/src/robot/indicators/neopixel.py @@ -1,7 +1,158 @@ """NeoPixel indicator controller.""" +import json +import threading +from typing import Iterable, Optional + class NeoPixel: - """Placeholder NeoPixel controller.""" + """ + NeoPixel indicator controller via MQTT. + + Outbound (robot -> server/simulator): + Topic: /indicator/out/neopixel + Payloads: + {"id": , "cmd": "set", "color": "#RRGGBB"} + {"id": , "cmd": "pixels", "colors": ["#RRGGBB", ...]} + { + "id": , + "cmd": "blink", + "color": "#RRGGBB", + "period_ms": 500, + "duty": 0.5, + } + {"id": , "cmd": "pulse", "color": "#RRGGBB", "period_ms": 1200} + {"id": , "cmd": "off"} + + Optional inbound acknowledgments (server -> robot): + Topic: /indicator/in/neopixel/{robot_id} + Payload: {"ok": true, "cmd": "..."} or a status string + (ignored if absent). + """ + + OUT_TOPIC = "/indicator/out/neopixel" + IN_TOPIC_TMPL = "/indicator/in/neopixel/{robot_id}" + + def __init__( + self, + robot_id: int, + robot_mqtt, + pixel_count: int | None = None, + subscribe_ack: bool = False, + ): + """ + :param robot_id: Robot numeric id + :param robot_mqtt: RobotMQTT helper for publish/subscribe + and optional handler registry + :param pixel_count: Optional number of LEDs for validation + (None to skip) + :param subscribe_ack: Subscribe to ack/status messages if simulator + emits them + """ + self.robot_id = robot_id + self._mqtt = robot_mqtt + self._pixel_count = pixel_count + self._subscribe_ack = subscribe_ack + self._lock = threading.Lock() + self._last_cmd: Optional[dict] = None + self._closed = False + + if subscribe_ack: + inbox = self.IN_TOPIC_TMPL.format(robot_id=robot_id) + self._mqtt.subscribe(inbox) + if hasattr(self._mqtt, "add_handler"): + self._mqtt.add_handler(inbox, self._on_ack) + + # Inbound ack handler (optional) + def _on_ack(self, topic: str, payload: str): + # Acks are optional; store last payload for debugging + with self._lock: + self._last_cmd = {"ack_topic": topic, "ack_payload": payload} + + # ------------- Public API ------------- + + def init(self, color: str = "#000000"): + """Initialize strip to a known color.""" + self.set_color(color) + + def off(self): + """Turn off the strip.""" + self._send({"cmd": "off"}) + + def set_color(self, color: str): + """Set all pixels to a single color in #RRGGBB.""" + self._validate_hex(color) + self._send({"cmd": "set", "color": color}) + + def set_pixels(self, colors: Iterable[str]): + """ + Set per-pixel colors. + :param colors: iterable of #RRGGBB strings + """ + cols = list(colors) + for c in cols: + self._validate_hex(c) + if self._pixel_count is not None and len(cols) != self._pixel_count: + raise ValueError( + f"Expected {self._pixel_count} colors, got {len(cols)}" + ) + self._send({"cmd": "pixels", "colors": cols}) + + def blink(self, color: str, period_ms: int = 500, duty: float = 0.5): + """ + Blink a solid color with period and duty cycle. + """ + self._validate_hex(color) + if period_ms <= 0 or not (0 < duty <= 1): + raise ValueError("period_ms must be >0 and 0 0") + self._send({ + "cmd": "pulse", + "color": color, + "period_ms": int(period_ms), + }) + + def last_status(self) -> Optional[dict]: + """Return last recorded ack/status (if subscribed).""" + with self._lock: + return dict(self._last_cmd) if self._last_cmd else None + + def close(self): + """Cleanup subscriptions/resources.""" + if self._subscribe_ack: + inbox = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id) + # If RobotMQTT has an unsubscribe, call it; otherwise it’s harmless + # to leave subscribed. + if hasattr(self._mqtt, "unsubscribe"): + self._mqtt.unsubscribe(inbox) + self._closed = True + + # ------------- Internals ------------- + + def _send(self, cmd: dict): + if self._closed: + raise RuntimeError("NeoPixel controller is closed") + payload = {"id": self.robot_id} + payload.update(cmd) + self._mqtt.publish(self.OUT_TOPIC, json.dumps(payload)) - pass + @staticmethod + def _validate_hex(color: str): + if ( + not isinstance(color, str) + or len(color) != 7 + or not color.startswith("#") + ): + raise ValueError("Color must be a #RRGGBB string") + int(color[1:], 16) # raises ValueError on invalid hex diff --git a/src/robot/motion.py b/src/robot/motion.py index 3945773..caa6ed4 100644 --- a/src/robot/motion.py +++ b/src/robot/motion.py @@ -1,7 +1,116 @@ -"""Motion controller algorithms.""" +""" +Motion controller algorithms. +""" + +import math +import time + +# Assume wheel parameters (can be tuned to match your Java lib) +WHEEL_RADIUS = 1.0 # wheel radius in cm +WHEEL_BASE = 5.0 # distance between wheels in cm +MAX_SPEED = 100 # max motor speed in cm/s class MotionController: - """Placeholder motion controller.""" + """ + Dead-reckoning motion controller. + Computes new positions and heading from motor speeds. + """ + + def __init__(self, coordinate, robot_mqtt, step_interval=0.1): + """ + :param coordinate: Coordinate object to track robot pose + :param robot_mqtt: RobotMqtt helper for publishing updates + :param step_interval: simulation step interval in seconds + """ + self.coordinate = coordinate + self.robot_mqtt = robot_mqtt + self.step_interval = step_interval + + # ------------------------------------------------------------------- + # Core motion update + # ------------------------------------------------------------------- + + def drive(self, left_speed, right_speed, duration): + """ + Drive robot using left & right motor speeds for a duration. + Updates position using dead-reckoning kinematics. + + :param left_speed: Left wheel speed (cm/s) + :param right_speed: Right wheel speed (cm/s) + :param duration: Duration in seconds + """ + # Validate + self._validate_speed(left_speed) + self._validate_speed(right_speed) + + steps = int(duration / self.step_interval) + for _ in range(steps): + self._step(left_speed, right_speed, self.step_interval) + self.robot_mqtt.publish_coordinate(self.coordinate) + time.sleep(self.step_interval) + + def _step(self, vl, vr, dt): + """Perform one integration step.""" + theta = math.radians(self.coordinate.heading) + + if abs(vl - vr) < 1e-6: + # Straight line motion + v = (vl + vr) / 2.0 + dx = v * math.cos(theta) * dt + dy = v * math.sin(theta) * dt + dtheta = 0 + else: + # Differential drive arc + R = (WHEEL_BASE / 2.0) * (vl + vr) / (vr - vl) + omega = (vr - vl) / WHEEL_BASE # angular velocity + dtheta = omega * dt + dx = R * (math.sin(theta + dtheta) - math.sin(theta)) + dy = -R * (math.cos(theta + dtheta) - math.cos(theta)) + + # Update state + self.coordinate.x += dx + self.coordinate.y += dy + self.coordinate.heading = (math.degrees(theta + dtheta)) % 360 + + # ------------------------------------------------------------------- + # High-level helpers + # ------------------------------------------------------------------- + + def move_forward(self, distance, speed=50): + """ + Move forward by a specific distance. + """ + duration = distance / speed + self.drive(speed, speed, duration) + + def move_backward(self, distance, speed=50): + """ + Move backward by a specific distance. + """ + duration = distance / speed + self.drive(-speed, -speed, duration) + + def rotate(self, angle_deg, speed=30): + """ + Rotate robot in place by angle (deg). + Positive = left turn, Negative = right turn. + """ + # Angular velocity: omega = (vr - vl) / WHEEL_BASE + vl = -speed + vr = speed + omega = (vr - vl) / WHEEL_BASE + duration = math.radians(abs(angle_deg)) / abs(omega) + + if angle_deg > 0: # left turn + self.drive(vl, vr, duration) + else: # right turn + self.drive(vr, vl, duration) + + # ------------------------------------------------------------------- + # Utils + # ------------------------------------------------------------------- - pass + def _validate_speed(self, speed): + if abs(speed) > MAX_SPEED: + raise ValueError(f"Speed {speed} exceeds maximum {MAX_SPEED}") diff --git a/src/robot/mqtt_client.py b/src/robot/mqtt_client.py index 4367bb4..1434346 100644 --- a/src/robot/mqtt_client.py +++ b/src/robot/mqtt_client.py @@ -1,7 +1,146 @@ -"""MQTT client wrapper for robot communication.""" +""" +MQTT client wrapper for robot communication. +""" + +import queue +import threading +import time +import paho.mqtt.client as mqtt class RobotMqttClient: - """Placeholder MQTT client wrapper.""" + """ + A wrapper around paho-mqtt that handles: + - Connecting & reconnecting to broker + - Publishing messages with QoS/retain + - Subscribing to topics + - Maintaining inbound/outbound message queues + - Callback hooks for message arrival + """ + + def __init__(self, client_id, server="127.0.0.1", port=1883, + username=None, password=None, keepalive=60, + autoreconnect=True): + self.client_id = client_id + self.server = server + self.port = port + self.username = username + self.password = password + self.keepalive = keepalive + self.autoreconnect = autoreconnect + + # Queues + self.inbound_queue = queue.Queue() # all msgs received + self.outbound_queue = queue.Queue() # msgs to be sent + + # Event for stopping background loop + self._stop_event = threading.Event() + + # Create Paho client + self.client = mqtt.Client(client_id=str(client_id), clean_session=True) + + if username and password: + self.client.username_pw_set(username, password) + + # Attach callbacks + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + self.client.on_publish = self._on_publish + + # User-defined callback for new messages + self.on_message_callback = None + + # --------------------------------------------------------- + # MQTT Lifecycle + # --------------------------------------------------------- + + def connect(self): + """Connect to MQTT broker and start background loop.""" + self.client.connect(self.server, self.port, self.keepalive) + self.client.loop_start() + + # Start outbound publisher thread + self._publisher_thread = threading.Thread( + target=self._publish_loop, daemon=True + ) + self._publisher_thread.start() + + def disconnect(self): + """Disconnect cleanly.""" + self._stop_event.set() + self.client.loop_stop() + self.client.disconnect() + + # --------------------------------------------------------- + # Publish / Subscribe + # --------------------------------------------------------- + + def publish(self, topic, payload, qos=0, retain=False): + """Queue a message for publishing.""" + self.outbound_queue.put((topic, payload, qos, retain)) + + def subscribe(self, topic, qos=0): + """Subscribe to a topic.""" + self.client.subscribe(topic, qos) + + def set_message_callback(self, callback): + """ + Set a callback for when a message arrives. + Callback should accept (topic, payload). + """ + self.on_message_callback = callback + + # --------------------------------------------------------- + # Internal Helpers + # --------------------------------------------------------- + + def _publish_loop(self): + """ + Background thread that drains outbound queue and publishes messages. + """ + while not self._stop_event.is_set(): + try: + topic, payload, qos, retain = self.outbound_queue.get( + timeout=0.5 + ) + self.client.publish(topic, payload, qos=qos, retain=retain) + except queue.Empty: + continue + + # --------------------------------------------------------- + # Paho Callbacks + # --------------------------------------------------------- + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + print(f"[MQTT] Connected to {self.server}:{self.port}") + else: + print(f"[MQTT] Connection failed with code {rc}") + + def _on_disconnect(self, client, userdata, rc): + print("[MQTT] Disconnected") + if self.autoreconnect and not self._stop_event.is_set(): + print("[MQTT] Attempting reconnect...") + while not self._stop_event.is_set(): + try: + time.sleep(2) + self.client.reconnect() + break + except Exception: + continue + + def _on_message(self, client, userdata, msg): + payload = msg.payload.decode("utf-8") + self.inbound_queue.put((msg.topic, payload)) + + if self.on_message_callback: + self.on_message_callback(msg.topic, payload) - pass + def _on_publish(self, client, userdata, mid): + # This method is intentionally left empty as it serves as a placeholder + # for the Paho MQTT on_publish callback. It can be overridden + # or extended + # if specific behavior is needed (ACK , etc...) when a message + # is published. + pass diff --git a/src/robot/robot_base.py b/src/robot/robot_base.py index 54d0c4b..388189b 100644 --- a/src/robot/robot_base.py +++ b/src/robot/robot_base.py @@ -1,7 +1,124 @@ -"""Base Robot class.""" +""" +Base Robot class. +""" + +import time +import threading +from robot.mqtt_client import RobotMqttClient +from robot.helpers.coordinate import Coordinate +from robot.helpers.robot_mqtt import RobotMQTT class Robot: - """Base robot class providing lifecycle management.""" + """ + Base robot class providing lifecycle management. + - Initializes MQTT client, coordinate tracker, and RobotMQTT helper + - Provides setup() for sensors and communication helpers + - Provides a run loop that calls user-defined loop() at fixed intervals + while dispatching inbound MQTT messages + """ + + def __init__(self, robot_id: str, + broker_host="127.0.0.1", broker_port=1883, + username=None, password=None, + loop_hz=10): + """ + Initialize base robot. + :param robot_id: Unique robot identifier + :param broker_host: MQTT broker host + :param broker_port: MQTT broker port + :param username: optional username + :param password: optional password + :param loop_hz: Frequency of the run loop (Hz) + """ + self.robot_id = robot_id + self.loop_hz = loop_hz + self._stop_event = threading.Event() + + # Core components + self.coordinate = Coordinate() + self.mqtt_client = RobotMqttClient( + client_id=robot_id, + server=broker_host, + port=broker_port, + username=username, + password=password + ) + self.robot_mqtt = RobotMQTT( + robot_id, self.mqtt_client, self.coordinate + ) + + # --------------------------------------------------------- + # Lifecycle + # --------------------------------------------------------- + + def setup(self): + """ + Setup hook for initializing sensors, communication helpers, + and outputs. + Override in subclass. + """ + pass + + def loop(self): + """ + Main robot behavior loop. + Override in subclass to implement robot logic. + """ + pass + + def run(self): + """ + Run the robot main loop: + - Connects MQTT client + - Calls setup() + - Executes loop() at fixed interval + - Dispatches inbound messages + """ + print(f"[Robot] Starting {self.robot_id}") + + # Connect MQTT + self.mqtt_client.set_message_callback(self._on_message) + self.mqtt_client.connect() + + # Call user setup + self.setup() + + interval = 1.0 / self.loop_hz + try: + while not self._stop_event.is_set(): + start = time.time() + + # Dispatch inbound MQTT messages + self._dispatch_inbound() + + # Run user loop + self.loop() + + # Sleep to maintain loop frequency + elapsed = time.time() - start + time.sleep(max(0, interval - elapsed)) + except KeyboardInterrupt: + print(f"[Robot] Stopping {self.robot_id}") + finally: + self.shutdown() + + def shutdown(self): + """Clean shutdown of robot and MQTT client.""" + print("[Robot] Shutting down...") + self._stop_event.set() + self.mqtt_client.disconnect() + + # --------------------------------------------------------- + # Internal Helpers + # --------------------------------------------------------- + + def _dispatch_inbound(self): + """Drain inbound MQTT messages and forward to RobotMQTT handler.""" + while not self.mqtt_client.inbound_queue.empty(): + topic, payload = self.mqtt_client.inbound_queue.get() + self.robot_mqtt.handle_incoming(topic, payload) - pass + def _on_message(self, topic, payload): + """Callback from MQTT client, just enqueues messages.""" + self.mqtt_client.inbound_queue.put((topic, payload)) diff --git a/src/robot/sensors/color.py b/src/robot/sensors/color.py index 94fe0be..25a7f26 100644 --- a/src/robot/sensors/color.py +++ b/src/robot/sensors/color.py @@ -1,7 +1,112 @@ """Color sensor implementation.""" +import json +import threading +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ColorReading: + r: int + g: int + b: int + ambient: int -class ColorSensor: - """Placeholder color sensor.""" +class ColorSensorTimeout(TimeoutError): pass + + +class ColorSensor: + """ + Color sensor using Pera‑Swarm MQTT topics. + + Request: + Topic: /sensor/color + Payload: {"id": , "reality": } + Response: + Topic: /sensor/color/{robotID} + Payload (string): "{R} {G} {B} {ambient}" + """ + + REQ_TOPIC = "/sensor/color" + RESP_TOPIC_TMPL = "/sensor/color/{robot_id}" + + def __init__(self, robot_id: int, robot_mqtt, timeout_s: float = 2.0): + self.robot_id = robot_id + self._robot_mqtt = robot_mqtt + self._timeout_s = timeout_s + + # Sync + state + self._event = threading.Event() + self._lock = threading.Lock() + self._latest: Optional[ColorReading] = None + + # Subscribe to response topic + resp_topic = self.RESP_TOPIC_TMPL.format(robot_id=self.robot_id) + self._robot_mqtt.subscribe(resp_topic) + + # If the RobotMQTT helper allows handler registration, attach directly + if hasattr(self._robot_mqtt, "add_handler"): + self._robot_mqtt.add_handler(resp_topic, self._on_mqtt_message) + + # If RobotMQTT doesn’t support handler registration, + # it should dispatch to this + def handle_incoming(self, topic: str, payload: str): + self._on_mqtt_message(topic, payload) + + # Internal: parse simulator reply + def _on_mqtt_message(self, topic: str, payload: str): + expected = self.RESP_TOPIC_TMPL.format(robot_id=self.robot_id) + if topic != expected: + return + + parts = payload.strip().split() + if len(parts) < 4: + return + try: + r, g, b, ambient = map(int, parts[:4]) + except ValueError: + return + + with self._lock: + self._latest = ColorReading(r=r, g=g, b=b, ambient=ambient) + self._event.set() + + def request_color(self, reality: Optional[str] = None): + """ + Send a request without blocking; response will update internal cache. + """ + payload = {"id": self.robot_id} + if reality is not None: + try: + payload["reality"] = int(reality) + except (ValueError, TypeError): + pass + self._robot_mqtt.publish(self.REQ_TOPIC, json.dumps(payload)) + + def get_color( + self, reality: Optional[str] = None, timeout_s: Optional[float] = None + ) -> ColorReading: + """ + Request a reading and block until response or timeout. + """ + tmo = self._timeout_s if timeout_s is None else timeout_s + self._event.clear() + self.request_color(reality=reality) + + if not self._event.wait(tmo): + raise ColorSensorTimeout(f"ColorSensor timed out after {tmo:.2f}s") + + with self._lock: + self._event.clear() + assert self._latest is not None + return self._latest + + def last_color(self) -> Optional[ColorReading]: + """ + Return the most recently cached reading without making a request. + """ + with self._lock: + return self._latest diff --git a/src/robot/sensors/distance.py b/src/robot/sensors/distance.py index 3c8c035..d0565eb 100644 --- a/src/robot/sensors/distance.py +++ b/src/robot/sensors/distance.py @@ -1,7 +1,131 @@ -"""Distance sensor implementation.""" +"""Distance sensor that performs a request/response over MQTT.""" +import json +import threading +import uuid -class DistanceSensor: - """Placeholder distance sensor.""" +class DistanceSensorTimeout(TimeoutError): pass + + +class DistanceSensor: + """ + Distance sensor that performs a request/response over MQTT. + + Expected topics (customize to match Java library): + - Request: robot/{robot_id}/{channel}/sensors/distance/req + - Response: robot/{robot_id}/{channel}/sensors/distance/resp + Payloads carry a correlation_id so multiple concurrent sensors/requests + can be disambiguated. + """ + + REQ_TOPIC_TMPL = "robot/{robot_id}/{channel}/sensors/distance/req" + RESP_TOPIC_TMPL = "robot/{robot_id}/{channel}/sensors/distance/resp" + + def __init__( + self, + robot_id: str, + robot_mqtt, + channel: str = "v1", + timeout_s: float = 2.0 + ): + """ + :param robot_id: Unique robot id + :param robot_mqtt: RobotMQTT helper that exposes publish() and + subscribe() + and delivers messages to handle_incoming(topic, payload) + :param channel: Channel/version segment used in topics + :param timeout_s: Max seconds to wait for a sensor reply + """ + self.robot_id = robot_id + self.channel = channel + self.timeout_s = timeout_s + + # Synchronization + self._lock = threading.Lock() + self._waiters: dict[str, tuple[threading.Event, float | None]] = { + # correlation_id -> (Event, value) + } + + # MQTT wiring + self._robot_mqtt = robot_mqtt + resp_topic = self.RESP_TOPIC_TMPL.format( + robot_id=robot_id, channel=channel + ) + # Ensure we receive responses + self._robot_mqtt.subscribe(resp_topic) + + # Register a handler with RobotMQTT to route only distance responses + # here. + # If RobotMQTT exposes a generic add_handler(topic_prefix, callback), + # use it. + # Otherwise, DistanceSensor exposes handle_incoming() + # for RobotMQTT to call. + if hasattr(self._robot_mqtt, "add_handler"): + self._robot_mqtt.add_handler(resp_topic, self._on_mqtt_message) + + # If RobotMQTT doesn't support handler registration, + # RobotMQTT should call this: + def handle_incoming(self, topic: str, payload: str): + """Pass-through for RobotMQTT to deliver messages to this sensor.""" + self._on_mqtt_message(topic, payload) + + # Internal MQTT message handler + def _on_mqtt_message(self, topic: str, payload: str): + # Only process our response topic + expected = self.RESP_TOPIC_TMPL.format( + robot_id=self.robot_id, channel=self.channel + ) + if topic != expected: + return + + try: + data = json.loads(payload) + except Exception: + return + + corr = data.get("correlation_id") + # distance in whatever units the simulator uses + value = data.get("distance") + + if corr is None: + return + + with self._lock: + event_value = self._waiters.get(corr) + if not event_value: + return + event, _ = event_value + self._waiters[corr] = (event, value) + event.set() + + def get_distance(self) -> float: + """ + Publish a distance request and synchronously wait for the response. + Raises DistanceSensorTimeout on timeout. + """ + corr_id = uuid.uuid4().hex + event = threading.Event() + with self._lock: + self._waiters[corr_id] = (event, None) + + req_topic = self.REQ_TOPIC_TMPL.format( + robot_id=self.robot_id, channel=self.channel + ) + payload = json.dumps({"correlation_id": corr_id}) + + # Send request (QoS 0 by default; adjust if needed) + self._robot_mqtt.publish(req_topic, payload) + + # Wait for response + ok = event.wait(self.timeout_s) + with self._lock: + _, value = self._waiters.pop(corr_id, (None, None)) + + if not ok or value is None: + raise DistanceSensorTimeout( + f"DistanceSensor timed out after {self.timeout_s:.2f}s" + ) + + return float(value) diff --git a/src/robot/sensors/proximity.py b/src/robot/sensors/proximity.py index 683a83d..14c05c9 100644 --- a/src/robot/sensors/proximity.py +++ b/src/robot/sensors/proximity.py @@ -1,7 +1,159 @@ """Proximity sensor implementation.""" +import json +import threading +from typing import List, Optional -class ProximitySensor: - """Placeholder proximity sensor.""" +class ProximityTimeout(TimeoutError): pass + + +class ProximityReading: + """ + Container for proximity data returned by the simulator. + + The Java docs indicate the response depends on the requested angles and may + include distances and optional color codes; at minimum we keep distances + and + raw fields so higher layers can parse colors if present. [See doc note] + """ + + def __init__(self, distances: List[int], raw: str): + self._distances = distances + self._raw = raw + + def distances(self) -> List[int]: + return self._distances + + def raw(self) -> str: + return self._raw + + +class ProximitySensor: + """ + Proximity sensor emulation using the Pera-Swarm MQTT protocol. + + Request topic: + /sensor/proximity + Payload JSON: { + "id": , "angles": [int, ...], "reality": + } + Response topic: + /sensor/proximity/{robot_id} + Payload: space separated values; for N angles, the first N numbers + are distances. + Example (angles [-90,0,90]): "5 10 5 #404040 #000000 #FF0000" + [docs] [3] + + This class subscribes to the response topic, publishes requests, + and returns parsed distances in the order of configured angles. + """ + + REQ_TOPIC = "/sensor/proximity" + RESP_TOPIC_TMPL = "/sensor/proximity/{robot_id}" + + def __init__( + self, + robot_id: int, + robot_mqtt, + angles: Optional[List[int]] = None, + timeout_s: float = 2.0, + ): + """ + :param robot_id: numeric robot id used by the simulator + :param robot_mqtt: RobotMQTT helper with publish(), subscribe(), and + message dispatch + :param angles: list of angles (degrees) like [-90, 0, 90] + :param timeout_s: maximum seconds to wait for a response + """ + self.robot_id = robot_id + self._robot_mqtt = robot_mqtt + self._angles = list(angles) if angles is not None else [-90, 0, 90] + self._timeout_s = timeout_s + + # Sync primitives + self._lock = threading.Lock() + self._event = threading.Event() + self._latest_raw: str = "" + self._latest_distances: List[int] = [] + + # Subscribe to the response topic from simulator + resp_topic = self.RESP_TOPIC_TMPL.format(robot_id=self.robot_id) + self._robot_mqtt.subscribe(resp_topic) + + # Register handler with RobotMQTT if it supports registration; + # otherwise RobotMQTT should forward all messages to handle_incoming(). + if hasattr(self._robot_mqtt, "add_handler"): + self._robot_mqtt.add_handler(resp_topic, self._on_message) + + # Optional: Java API parity + def set_angles(self, angles: List[int]): + with self._lock: + self._angles = list(angles) + + def get_angles(self) -> List[int]: + with self._lock: + return list(self._angles) + + # RobotMQTT can call this to route messages here + # if no add_handler() exists. + def handle_incoming(self, topic: str, payload: str): + self._on_message(topic, payload) + + # Internal: MQTT callback + def _on_message(self, topic: str, payload: str): + expected = self.RESP_TOPIC_TMPL.format(robot_id=self.robot_id) + if topic != expected: + return + + # Example response: "5 10 5 #404040 #000000 #FF0000" + tokens = payload.strip().split() + # First len(angles) tokens are distances + with self._lock: + k = len(self._angles) + try: + distances = list(map(int, tokens[:k])) + except ValueError: + distances = [] + self._latest_distances = distances + self._latest_raw = payload + self._event.set() + + def get_proximity( + self, reality: Optional[str] = None, timeout_s: Optional[float] = None + ) -> ProximityReading: + """ + Request proximity readings and block until reply or timeout. + + :param reality: optional reality selector passed through to simulator + :param timeout_s: override default timeout for this call + :return: ProximityReading with distances in angle order + :raises ProximityTimeout: if response not received in time + """ + tmo = self._timeout_s if timeout_s is None else timeout_s + + # Clear any previous signal + self._event.clear() + with self._lock: + angles = list(self._angles) + + # Publish request + payload = {"id": self.robot_id, "angles": angles} + if reality is not None: + payload["reality"] = reality + self._robot_mqtt.publish(self.REQ_TOPIC, json.dumps(payload)) + + # Wait for response + if not self._event.wait(tmo): + raise ProximityTimeout( + f"Proximity request timed out after {tmo:.2f}s" + ) + + with self._lock: + distances = list(self._latest_distances) + raw = self._latest_raw + # Prepare for next call + self._event.clear() + + return ProximityReading(distances=distances, raw=raw) diff --git a/tests/test_motion.py b/tests/test_motion.py index e12c3b6..ced0474 100644 --- a/tests/test_motion.py +++ b/tests/test_motion.py @@ -1,9 +1,62 @@ +# CHANGE: Patched the MQTT connection in test_robot_base_run_and_shutdown +# to avoid real network calls. This ensures the test passes even if no +# MQTT broker is running, by allowing setup() to be called. + """Tests for the MotionController.""" +from robot.helpers.coordinate import Coordinate +from robot.helpers.robot_mqtt import RobotMQTT from robot.motion import MotionController +class DummyMqttClient: + def publish(self, *a, **kw): + # This method is intentionally left empty as it is a dummy + # implementation for testing purposes and does not require any + # functionality. + pass + + def subscribe(self, *a, **kw): + # This method is intentionally left empty as it is a dummy + # implementation for testing purposes and does not require any + # functionality. + pass + + def test_motion_controller_instantiation(): """Instantiate MotionController.""" - controller = MotionController() + coordinate = Coordinate() + robot_mqtt = RobotMQTT( + robot_id=1, mqtt_client=DummyMqttClient(), coordinate=coordinate + ) + controller = MotionController(coordinate=coordinate, robot_mqtt=robot_mqtt) assert isinstance(controller, MotionController) + + +def test_robot_base_run_and_shutdown(): + """Test Robot base class run and shutdown.""" + import threading + from unittest.mock import patch + + from robot.robot_base import Robot + + class TestRobot(Robot): + def __init__(self, *a, **kw): + super().__init__(*a, **kw) + self.loop_called = 0 + + def setup(self): + self.setup_called = True + + def loop(self): + self.loop_called += 1 + # Stop after first loop for test + self._stop_event.set() + + robot = TestRobot(robot_id="testbot", loop_hz=100) + with patch.object(robot.mqtt_client, "connect", return_value=None): + t = threading.Thread(target=robot.run) + t.start() + t.join(timeout=2) + assert hasattr(robot, "setup_called") + assert robot.loop_called >= 1