Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@
"python.analysis.typeCheckingMode": "basic",
"python.analysis.autoImportCompletions": true,
"python.defaultInterpreterPath": ".venv/bin/python",
"python.terminal.activateEnvInCurrentTerminal": true
"python.terminal.activateEnvInCurrentTerminal": true,
"cSpell.words": ["autoreconnect", "demuxes", "dtheta", "keepalive", "MQTT", "neopixel", "paho", "testbot", "TMPL"]
}
91 changes: 89 additions & 2 deletions src/robot/communication/directed_comm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,94 @@
"""Directed communication module."""

import json
import queue
import threading
from typing import Callable, Optional, Tuple


class DirectedCommunication:
"""Placeholder directed communication class."""
"""
Directed (one-to-one) inter-robot comms.

Publish (robot -> server):
Topic: /comm/out/direct
Payload: {"id": <src>, "to": <dst>, "msg": "<string>"}

Subscribe (server -> robot):
Topic: /comm/in/direct/{robot_id}
Payload: ("{\"from\": <src>, \"msg\": \"...\"}"
" or plain string per server;");
this helper normalizes to (src, msg).
"""

OUT_TOPIC = "/comm/out/direct"
IN_TOPIC_TMPL = "/comm/in/direct/{robot_id}"

def __init__(self, robot_id: int, robot_mqtt):
self.robot_id = robot_id
self._mqtt = robot_mqtt

self._in_q: "queue.Queue[Tuple[int, str]]" = queue.Queue()
self._cb: Optional[Callable[[int, str], None]] = None
self._lock = threading.Lock()

inbox = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id)
self._mqtt.subscribe(inbox)
if hasattr(self._mqtt, "add_handler"):
self._mqtt.add_handler(inbox, self._on_message)

# If RobotMQTT doesn’t provide add_handler, it should route here.
def handle_incoming(self, topic: str, payload: str):
self._on_message(topic, payload)

def _on_message(self, topic: str, payload: str):
expected = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id)
if topic != expected:
return

# Try JSON with {"from": int, "msg": "..."}; fall back to raw string
src = -1
msg = payload
try:
obj = json.loads(payload)
if isinstance(obj, dict):
src = int(obj.get("from", -1))
msg = str(obj.get("msg", ""))
except Exception:
pass

self._in_q.put((src, msg))
with self._lock:
if self._cb:
try:
self._cb(src, msg)
except Exception:
pass

# API

def set_callback(self, fn: Callable[[int, str], None]):
with self._lock:
self._cb = fn

def send(self, to_robot: int, message: str):
"""
Send directly to a specific robot via the server’s directed comm
channel.
"""
data = {"id": self.robot_id, "to": int(to_robot), "msg": message}
self._mqtt.publish(self.OUT_TOPIC, json.dumps(data))

def recv_nowait(self) -> Optional[tuple[int, str]]:
try:
return self._in_q.get_nowait()
except queue.Empty:
return None

pass
def recv(self, timeout: float | None = None) -> Optional[tuple[int, str]]:
try:
if timeout is not None:
return self._in_q.get(timeout=timeout)
return self._in_q.get()
except queue.Empty:
return None
82 changes: 80 additions & 2 deletions src/robot/communication/simple_comm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,85 @@
"""Simple communication module."""

import json
import queue
import threading
from typing import Callable, Optional


class SimpleCommunication:
"""Placeholder simple communication class."""
"""
Simple (broadcast-like) inter-robot comms.

Publish (robot -> server):
Topic: /comm/out/simple
Payload: {"id": <int>, "msg": "<string>", "dist": <optional number>}

Subscribe (server -> robot):
Topic: /comm/in/simple/{robot_id}
Payload: "<string>" (server delivers message body)
"""

OUT_TOPIC = "/comm/out/simple"
IN_TOPIC_TMPL = "/comm/in/simple/{robot_id}"

def __init__(self, robot_id: int, robot_mqtt):
self.robot_id = robot_id
self._mqtt = robot_mqtt

# inbound queue and optional callback
self._in_q: "queue.Queue[str]" = queue.Queue()
self._cb: Optional[Callable[[str], None]] = None
self._lock = threading.Lock()

# subscribe to inbox
in_topic = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id)
self._mqtt.subscribe(in_topic)
if hasattr(self._mqtt, "add_handler"):
self._mqtt.add_handler(in_topic, self._on_message)

# If RobotMQTT doesn’t provide add_handler, it should forward to this
def handle_incoming(self, topic: str, payload: str):
self._on_message(topic, payload)

def _on_message(self, topic: str, payload: str):
expected = self.IN_TOPIC_TMPL.format(robot_id=self.robot_id)
if topic != expected:
return
self._in_q.put(payload)
with self._lock:
if self._cb:
try:
self._cb(payload)
except Exception:
pass

# API

def set_callback(self, fn: Callable[[str], None]):
with self._lock:
self._cb = fn

def send(self, message: str, distance: float | None = None):
"""
Send a message via the simple protocol; server decides receivers.
"""
data = {"id": self.robot_id, "msg": message}
if distance is not None:
data["dist"] = distance
self._mqtt.publish(self.OUT_TOPIC, json.dumps(data))

def recv_nowait(self) -> Optional[str]:
try:
return self._in_q.get_nowait()
except queue.Empty:
return None

pass
def recv(self, timeout: float | None = None) -> Optional[str]:
try:
return (
self._in_q.get(timeout=timeout)
if timeout is not None
else self._in_q.get()
)
except queue.Empty:
return None
92 changes: 90 additions & 2 deletions src/robot/helpers/coordinate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,95 @@
"""Coordinate tracking helper."""

import math
from dataclasses import dataclass
from typing import Tuple, Optional


def _wrap_deg(angle_deg: float) -> float:
# Normalize to [0, 360)
a = angle_deg % 360.0
if a < 0:
a += 360.0
return a


@dataclass
class Coordinate:
"""Placeholder coordinate tracker."""
"""
Pose container and odometry utilities.
Units: centimeters for x,y; degrees for heading (0..360).
Heading 0 deg = +X axis; positive angles rotate CCW.
"""
x: float = 0.0
y: float = 0.0
heading: float = 0.0 # degrees

# ------------- Odometry updates -------------

def update_from_wheel_speeds(
self, v_l: float, v_r: float, dt: float, wheel_base: float
):
"""
Integrate pose from left/right linear speeds (cm/s) over dt (s).
wheel_base is wheel separation (cm).
"""
theta = math.radians(self.heading)
if abs(v_l - v_r) < 1e-9:
v = 0.5 * (v_l + v_r)
self.x += v * math.cos(theta) * dt
self.y += v * math.sin(theta) * dt
# heading unchanged
else:
R = (wheel_base / 2.0) * (v_l + v_r) / (v_r - v_l)
omega = (v_r - v_l) / wheel_base
dtheta = omega * dt
self.x += R * (math.sin(theta + dtheta) - math.sin(theta))
self.y += -R * (math.cos(theta + dtheta) - math.cos(theta))
self.heading = _wrap_deg(math.degrees(theta + dtheta))
# If straight case, ensure heading normalized too
self.heading = _wrap_deg(self.heading)
# Matches canonical differential-drive odometry for ICC motion. [4][1]

def set_pose(self, x: float, y: float, heading_deg: float):
self.x = float(x)
self.y = float(y)
self.heading = _wrap_deg(float(heading_deg))

def apply_delta(self, dx: float, dy: float, dtheta_deg: float):
"""Apply a small world-frame delta to the pose."""
self.x += float(dx)
self.y += float(dy)
self.heading = _wrap_deg(self.heading + float(dtheta_deg))

# ------------- Frame conversions -------------

def world_to_robot(self, wx: float, wy: float) -> Tuple[float, float]:
"""
Convert a world point to robot-centric coordinates.
"""
dx = wx - self.x
dy = wy - self.y
theta = math.radians(self.heading)
rx = dx * math.cos(-theta) - dy * math.sin(-theta)
ry = dx * math.sin(-theta) + dy * math.cos(-theta)
return rx, ry

def robot_to_world(self, rx: float, ry: float) -> Tuple[float, float]:
"""
Convert a robot-centric point to world coordinates.
"""
theta = math.radians(self.heading)
wx = rx * math.cos(theta) - ry * math.sin(theta) + self.x
wy = rx * math.sin(theta) + ry * math.cos(theta) + self.y
return wx, wy

# ------------- Publishing -------------

pass
def publish(self, robot_mqtt, robot_id: Optional[int] = None):
"""
Publish pose via the RobotMQTT helper. If a different robot_id is
required by the application, pass it; otherwise RobotMQTT will use
its configured id.
"""
# RobotMQTT.publish_coordinate reads self.x/y/heading by default.
robot_mqtt.publish_coordinate(self)
Loading