From 417e0a9f36912a1224bd9fcf320b3cf779774d67 Mon Sep 17 00:00:00 2001 From: Alexandre Abadie Date: Mon, 16 Feb 2026 09:17:15 +0100 Subject: [PATCH] dotbot/dotbot_simulator: rework threading --- doc/conf.py | 1 + dotbot/dotbot_simulator.py | 143 +++++++++++++++++++++---------------- 2 files changed, 81 insertions(+), 63 deletions(-) diff --git a/doc/conf.py b/doc/conf.py index 2a0ac4e..88e624b 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -45,6 +45,7 @@ ("py:class", r"dotbot.models.Annotated"), ("py:class", r"Query"), ("py:class", r"PydanticUndefined"), + ("py:class", r"queue.Queue"), ] # -- Options for HTML output ------------------------------------------------- diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index a15c625..358cc70 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -6,6 +6,7 @@ """Dotbot simulator for the DotBot project.""" +import queue import random import threading import time @@ -76,11 +77,10 @@ class InitStateToml(BaseModel): network: SimulatedNetworkSettings = SimulatedNetworkSettings() -class DotBotSimulator(threading.Thread): +class DotBotSimulator: """Simulator class for the dotbot.""" - def __init__(self, settings: SimulatedDotBotSettings): - super().__init__(daemon=True) + def __init__(self, settings: SimulatedDotBotSettings, tx_queue: queue.Queue): self.address = settings.address.lower() self.pos_x = settings.pos_x self.pos_y = settings.pos_y @@ -97,10 +97,18 @@ def __init__(self, settings: SimulatedDotBotSettings): self.waypoints = [] self.waypoint_index = 0 + self._lock = threading.Lock() + self.tx_queue = tx_queue + self.queue = queue.Queue() + self.advertise_thread = threading.Thread(target=self.advertise, daemon=True) + self.rx_thread = threading.Thread(target=self.rx_frame, daemon=True) + self.main_thread = threading.Thread(target=self.run, daemon=True) self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL self.logger = LOGGER.bind(context=__name__, address=self.address) - self.running = True - self.start() + self._stop_event = threading.Event() + self.rx_thread.start() + self.advertise_thread.start() + self.main_thread.start() @property def header(self): @@ -191,94 +199,103 @@ def update(self, dt: float): def advertise(self): """Send an advertisement message to the gateway.""" - payload = Frame( - header=self.header, - packet=Packet.from_payload( - PayloadDotBotAdvertisement( - calibrated=self.calibrated, - direction=int(self.theta * 180 / pi + 90), - pos_x=int(self.pos_x) if self.pos_x >= 0 else 0, - pos_y=int(self.pos_y) if self.pos_y >= 0 else 0, - pos_z=0, - battery=battery_discharge_model(self.time_elapsed_s), - ) - ), - ) - return payload + while self._stop_event.is_set() is False: + payload = Frame( + header=self.header, + packet=Packet.from_payload( + PayloadDotBotAdvertisement( + calibrated=self.calibrated, + direction=int(self.theta * 180 / pi + 90), + pos_x=int(self.pos_x) if self.pos_x >= 0 else 0, + pos_y=int(self.pos_y) if self.pos_y >= 0 else 0, + pos_z=0, + battery=battery_discharge_model(self.time_elapsed_s), + ) + ), + ) + self.tx_queue.put(payload) + time.sleep(0.5) - def handle_frame(self, frame: Frame): + def rx_frame(self): """Decode the serial input received from the gateway.""" - if self.address == hex(frame.header.destination)[2:]: - if frame.payload_type == PayloadType.CMD_MOVE_RAW: - self.controller_mode = DotBotSimulatorMode.MANUAL - self.v_left = frame.packet.payload.left_y - self.v_right = frame.packet.payload.right_y - - if self.v_left > 127: - self.v_left = self.v_left - 256 - if self.v_right > 127: - self.v_right = self.v_right - 256 - - elif frame.payload_type == PayloadType.LH2_WAYPOINTS: - self.v_left = 0 - self.v_right = 0 - self.controller_mode = DotBotSimulatorMode.MANUAL - self.waypoint_threshold = frame.packet.payload.threshold - self.waypoints = frame.packet.payload.waypoints - if self.waypoints: - self.controller_mode = DotBotSimulatorMode.AUTOMATIC - - def stop(self): - self.logger.info("Stopping DotBot simulator...") - self.running = False - self.join() + while self._stop_event.is_set() is False: + frame = self.queue.get() + if frame is None: + break + with self._lock: + if self.address == hex(frame.header.destination)[2:]: + if frame.payload_type == PayloadType.CMD_MOVE_RAW: + self.controller_mode = DotBotSimulatorMode.MANUAL + self.v_left = frame.packet.payload.left_y + self.v_right = frame.packet.payload.right_y + + if self.v_left > 127: + self.v_left = self.v_left - 256 + if self.v_right > 127: + self.v_right = self.v_right - 256 + + elif frame.payload_type == PayloadType.LH2_WAYPOINTS: + self.v_left = 0 + self.v_right = 0 + self.controller_mode = DotBotSimulatorMode.MANUAL + self.waypoint_threshold = frame.packet.payload.threshold + self.waypoints = frame.packet.payload.waypoints + if self.waypoints: + self.controller_mode = DotBotSimulatorMode.AUTOMATIC def run(self): """Update the state of the dotbot simulator.""" - while self.running is True: - self.update(0.1) + while self._stop_event.is_set() is False: + with self._lock: + self.update(0.1) + + def stop(self): + self.logger.info(f"Stopping DotBot {self.address} simulator...") + self._stop_event.set() + self.queue.put(None) # unblock the rx_thread if waiting on the queue + self.advertise_thread.join() + self.rx_thread.join() + self.main_thread.join() -class DotBotSimulatorCommunicationInterface(threading.Thread): +class DotBotSimulatorCommunicationInterface: """Bidirectional serial interface to control simulated robots""" def __init__(self, on_frame_received: Callable, simulator_init_state_path: str): + self.queue = queue.Queue() self.on_frame_received = on_frame_received - self.running = True - super().__init__(daemon=True) + self._stp_event = threading.Event() + self.main_thread = threading.Thread(target=self.run, daemon=True) init_state = InitStateToml(**toml.load(simulator_init_state_path)) self.network_pdr = init_state.network.pdr self.dotbots = [ DotBotSimulator( settings=dotbot_settings, + tx_queue=self.queue, ) for dotbot_settings in init_state.dotbots ] - self.start() + self.main_thread.start() self.logger = LOGGER.bind(context=__name__) self.logger.info("DotBot Simulation Started") def run(self): """Listen continuously at each byte received on the fake serial interface.""" - for dotbot in self.dotbots: - self.on_frame_received(dotbot.advertise()) - time.sleep(0.1) - - while self.running: - for dotbot in self.dotbots: - self.handle_dotbot_frame(dotbot.advertise()) - time.sleep( - 0.5 - PayloadDotBotAdvertisement().size * len(self.dotbots) * 0.000001 - ) + while self._stp_event.is_set() is False: + frame = self.queue.get() + if frame is None: + break + self.handle_dotbot_frame(frame) def stop(self): self.logger.info("Stopping DotBot Simulation...") - self.running = False + self._stp_event.set() + self.queue.put(None) # unblock the run thread if waiting on the queue for dotbot in self.dotbots: dotbot.stop() - self.join() + self.main_thread.join() def flush(self): """Flush fake serial output.""" @@ -304,4 +321,4 @@ def write(self, bytes_): f"Packet to DotBot {dotbot.address} lost in simulation" ) continue - dotbot.handle_frame(Frame.from_bytes(bytes_)) + dotbot.queue.put(Frame.from_bytes(bytes_))