diff --git a/dotbot/examples/minimum_naming_game/README.md b/dotbot/examples/minimum_naming_game/README.md new file mode 100644 index 0000000..acf6714 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/README.md @@ -0,0 +1,53 @@ +# Minimum Naming Game + +This demo runs the minimum naming game in the DotBot simulator, where the robots use local communication to converge on a single word. + +This demo includes two variants: a static setup without motion and a dynamic setup with motion. + +## Install Python packages (pip) + +Install the Python packages required to run this demo. + +```bash +pip install pyyaml scipy +``` + +## How to run + +### Specify the initial state + +Specify the initial state of the DotBots by replacing the file path for ```simulator_init_state_path``` in [config_sample.toml](config_sample.toml). + +**Static setup** (without motion) using init_state.toml: + +```toml +simulator_init_state_path = "dotbot/examples/minimum_naming_game/init_state.toml" +``` + +**Dynamic setup** (with motion) using init_state_with_motion.toml: + +```toml +simulator_init_state_path = "dotbot/examples/minimum_naming_game/init_state_with_motion.toml" +``` + +### Start the controller in simulator mode + +```bash +python -m dotbot.controller_app --config-path config_sample.toml -p dotbot-simulator -a dotbot-simulator --log-level error +``` + +### Run the minimum naming game scenario + +Open a new terminal and run the minimum naming game scenario. + +**Static setup** (without motion): + +```bash +python -m dotbot.examples.minimum_naming_game.minimum_naming_game +``` + +**Dynamic setup** (with motion) : + +```bash +python -m dotbot.examples.minimum_naming_game.minimum_naming_game_with_motion +``` \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/controller.py b/dotbot/examples/minimum_naming_game/controller.py new file mode 100644 index 0000000..cb5e467 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/controller.py @@ -0,0 +1,212 @@ +import random +from dotbot.models import ( + DotBotLH2Position, + DotBotModel, +) +from dotbot.examples.sct import SCT + +DISTINCT_COLORS = [ + (255, 0, 0), # Red + (0, 255, 0), # Lime + (0, 0, 255), # Blue + (255, 255, 0), # Yellow + (255, 0, 255), # Magenta + (0, 255, 255), # Cyan + (255, 165, 0), # Orange + (128, 0, 255), # Violet +] + + +class Controller: + def __init__(self, address: str, path: str): + self.address = address + + self.position = DotBotLH2Position(x=0.0, y=0.0, z=0.0) # initial position + self.direction = 0.0 # initial orientation + + self.neighbors: list[DotBotModel] = [] # initial empty neighbor list + self.vector = [0.0, 0.0] # initial movement vector + + # SCT initialization + self.sct = SCT(path) + self.add_callbacks() + + self.led = (0, 0, 0) # initial LED color + + # --- Naming Game Variables --- + self.counter = 0 # FOR DEBUGGING + self.last_broadcast_ticks = 0 # Tracks timing + self.max_broadcast_ticks = 5 + + # Pre-defined words (e.g., num_words = 128) + self.num_words = 8 + self.words = list(range(self.num_words)) + + # Word reception state + self.received_word = None + # self.received_word_checked = True + self.new_word_received = False + + # Global variable for the word chosen for transmission + self.w_index = 0 + + # Inventory of known words + self.inventory = set() + + + def control_step(self): + + self.counter += 1 # Increment step counter + + # Run SCT control step + self.sct.run_step() + + self.color_code() # Update LED color based on inventory state + + + # Register callback functions to the generator player + def add_callbacks(self): + + # Automatic addition of callbacks + # 1. Get list of events and list specifying whether an event is controllable or not. + # 2. For each event, check controllable or not and add callback. + + events, controllability_list = self.sct.get_events() + + for event, index in events.items(): + is_controllable = controllability_list[index] + stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_' + + if is_controllable: # Add controllable event + func_name = '_callback_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, func, None, None) + else: # Add uncontrollable event + func_name = '_check_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, None, func, None) + + + # Callback functions (controllable events) + def _callback_startTimer(self, data: any): + """ + Saves the current tick count to mark the start of the broadcast interval. + """ + # print(f'DotBot {self.address}. ACTION: startTimer') + self.last_broadcast_ticks = self.counter + + + def _callback_selectAndBroadcast(self, data: any): + """ + Selects a random word from the inventory, or invents a new one + if the inventory is empty. Sets the flag for transmission. + """ + # print(f'DotBot {self.address}. ACTION: selectAndBroadcast', end=". ") + + # Select or Invent a word + if not self.inventory: + # Inventory is empty: invent a new word from the pool + self.w_index = random.randrange(self.num_words) + # Store the word (equivalent to inventory[0] = words[w_index].data[0]) + self.inventory.add(self.words[self.w_index]) + else: + # Inventory is not empty: pick a random word from current known words + self.w_index = random.choice(list(self.inventory)) + + # Set broadcast flag for transmission + self.broadcast_word = True + + # print(f'\tinventory: {self.inventory},\tselected word: {self.w_index}') + + + def _callback_updateInventory(self, data: any): + """ + Updates the inventory based on the last received word. + If the word is known, the agent reaches a local consensus (inventory collapses). + If unknown, the word is added to the agent's vocabulary. + """ + # print(f'DotBot {self.address}. ACTION: updateInventory', end=". ") + + # Ensure we have a word to process + if self.received_word is None: + return + + # Check if the received word is within the inventory + if self.received_word in self.inventory: + # SUCCESS: word is known. + # Remove all other words (collapse inventory to just this one) + self.inventory = {self.received_word} + # print(f' removed all other words, inventory now: {self.inventory}') + else: + # FAILURE: word is unknown. + # Insert it into the inventory + self.inventory.add(self.received_word) + # print(f' added word {self.received_word}, inventory now: {self.inventory}') + + # Mark as checked + self.received_word_checked = True + + + # Callback functions (uncontrollable events) + def _check__selectAndBroadcast(self, data: any) -> bool: + """ + Checks if a new word has been received. + Returns True (1) if a word is waiting to be processed, otherwise False (0). + """ + if self.new_word_received: + # Reset the flag + self.new_word_received = False + return True + + return False + + + def _check_timeout(self, data: any) -> bool: + """ + Checks if the broadcast timer has expired. + Returns True if the current counter exceeds the last broadcast time + plus the defined interval. + """ + if self.counter > (self.last_broadcast_ticks + self.max_broadcast_ticks): + return True + + return False + + + def color_code(self): + """ + Updates the LED color based on the inventory state. + - If the robot has not reached consensus (inventory size != 1), the LED is OFF. + - If consensus is reached, the word is mapped to a specific RGB color. + """ + # 1. Check if the inventory has reached consensus (size exactly 1) + if len(self.inventory) != 1: + self.led = (0, 0, 0) # Turn LED off + return + + # 2. Extract the single word known to the agent + word = list(self.inventory)[0] + + # ------ ORIGINAL ------ + # # 3. Calculate RGB components using the original base-4 logic + # # Mapping word index (0-127) to a color space (1-64) + # color = (word % 63) + 1 + + # r = color // 16 + # rem1 = color % 16 + # g = rem1 // 4 + # b = rem1 % 4 + + # # 4. Update the LED state + # # Note: Original Kilobot RGB values are 0-3. + # # convert to range 0-255. + # self.led = (r * 85, g * 85, b * 85) + # ------------------------ + + # ------ NEW SIMPLIFIED COLOR CODING ------ + # Map the word to an index (0-7) + color_idx = word % len(DISTINCT_COLORS) + + # Assign the high-contrast color + self.led = DISTINCT_COLORS[color_idx] + # ----------------------------------------- \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/controller_with_motion.py b/dotbot/examples/minimum_naming_game/controller_with_motion.py new file mode 100644 index 0000000..ecb2199 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/controller_with_motion.py @@ -0,0 +1,232 @@ +import math +import random +from dotbot.models import ( + DotBotLH2Position, + DotBotModel, +) +from dotbot.examples.sct import SCT +from dotbot.examples.minimum_naming_game.walk_avoid import walk_avoid + +DISTINCT_COLORS = [ + (255, 0, 0), # Red + (0, 255, 0), # Lime + (0, 0, 255), # Blue + (255, 255, 0), # Yellow + (255, 0, 255), # Magenta + (0, 255, 255), # Cyan + (255, 165, 0), # Orange + (128, 0, 255), # Violet +] + + +class Controller: + def __init__(self, address: str, path: str, max_speed: float, arena_limits: tuple[float, float]): + self.address = address + self.max_speed = max_speed + self.arena_limits = arena_limits + + self.position = DotBotLH2Position(x=0.0, y=0.0, z=0.0) # initial position + self.direction = 0.0 # initial orientation + self.prev_position: DotBotLH2Position | None = None + + self.neighbors: list[DotBotModel] = [] # initial empty neighbor list + self.vector = [0.0, 0.0] # initial movement vector + + # SCT initialization + self.sct = SCT(path) + self.add_callbacks() + + self.led = (0, 0, 0) # initial LED color + + # --- Naming Game Variables --- + self.counter = 0 # FOR DEBUGGING + self.last_broadcast_ticks = 0 # Tracks timing + self.max_broadcast_ticks = 5 + + # Pre-defined words (e.g., num_words = 128) + self.num_words = 8 + self.words = list(range(self.num_words)) + + # Word reception state + self.received_word = None + # self.received_word_checked = True + self.new_word_received = False + + # Global variable for the word chosen for transmission + self.w_index = 0 + + # Inventory of known words + self.inventory = set() + + + def control_step(self): + + self.counter += 1 # Increment step counter + + # Run SCT control step + self.sct.run_step() + + self.color_code() # Update LED color based on inventory state + + self.vector = walk_avoid(self.position.x, self.position.y, self.direction, self.neighbors, self.max_speed, self.arena_limits) + # print(f'DotBot {self.address} Walk Vector: {self.vector}') + + + def update_pose(self, position: DotBotLH2Position) -> None: + if self.prev_position is not None: + dx = position.x - self.prev_position.x + dy = position.y - self.prev_position.y + if (dx * dx + dy * dy) > 1e-6: + heading_rad = math.atan2(dy, -dx) + self.direction = (math.degrees(heading_rad) + 360.0) % 360.0 + + self.prev_position = position + self.position = position + + + # Register callback functions to the generator player + def add_callbacks(self): + + # Automatic addition of callbacks + # 1. Get list of events and list specifying whether an event is controllable or not. + # 2. For each event, check controllable or not and add callback. + + events, controllability_list = self.sct.get_events() + + for event, index in events.items(): + is_controllable = controllability_list[index] + stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_' + + if is_controllable: # Add controllable event + func_name = '_callback_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, func, None, None) + else: # Add uncontrollable event + func_name = '_check_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, None, func, None) + + + # Callback functions (controllable events) + def _callback_startTimer(self, data: any): + """ + Saves the current tick count to mark the start of the broadcast interval. + """ + # print(f'DotBot {self.address}. ACTION: startTimer') + self.last_broadcast_ticks = self.counter + + + def _callback_selectAndBroadcast(self, data: any): + """ + Selects a random word from the inventory, or invents a new one + if the inventory is empty. Sets the flag for transmission. + """ + # print(f'DotBot {self.address}. ACTION: selectAndBroadcast', end=". ") + + # Select or Invent a word + if not self.inventory: + # Inventory is empty: invent a new word from the pool + self.w_index = random.randrange(self.num_words) + # Store the word (equivalent to inventory[0] = words[w_index].data[0]) + self.inventory.add(self.words[self.w_index]) + else: + # Inventory is not empty: pick a random word from current known words + self.w_index = random.choice(list(self.inventory)) + + # Set broadcast flag for transmission + self.broadcast_word = True + + # print(f'\tinventory: {self.inventory},\tselected word: {self.w_index}') + + + def _callback_updateInventory(self, data: any): + """ + Updates the inventory based on the last received word. + If the word is known, the agent reaches a local consensus (inventory collapses). + If unknown, the word is added to the agent's vocabulary. + """ + # print(f'DotBot {self.address}. ACTION: updateInventory', end=". ") + + # Ensure we have a word to process + if self.received_word is None: + return + + # Check if the received word is within the inventory + if self.received_word in self.inventory: + # SUCCESS: word is known. + # Remove all other words (collapse inventory to just this one) + self.inventory = {self.received_word} + # print(f' removed all other words, inventory now: {self.inventory}') + else: + # FAILURE: word is unknown. + # Insert it into the inventory + self.inventory.add(self.received_word) + # print(f' added word {self.received_word}, inventory now: {self.inventory}') + + # Mark as checked + self.received_word_checked = True + + + # Callback functions (uncontrollable events) + def _check__selectAndBroadcast(self, data: any) -> bool: + """ + Checks if a new word has been received. + Returns True (1) if a word is waiting to be processed, otherwise False (0). + """ + if self.new_word_received: + # Reset the flag + self.new_word_received = False + return True + + return False + + + def _check_timeout(self, data: any) -> bool: + """ + Checks if the broadcast timer has expired. + Returns True if the current counter exceeds the last broadcast time + plus the defined interval. + """ + if self.counter > (self.last_broadcast_ticks + self.max_broadcast_ticks): + return True + + return False + + + def color_code(self): + """ + Updates the LED color based on the inventory state. + - If the robot has not reached consensus (inventory size != 1), the LED is OFF. + - If consensus is reached, the word is mapped to a specific RGB color. + """ + # 1. Check if the inventory has reached consensus (size exactly 1) + if len(self.inventory) != 1: + self.led = (0, 0, 0) # Turn LED off + return + + # 2. Extract the single word known to the agent + word = list(self.inventory)[0] + + # ------ ORIGINAL ------ + # # 3. Calculate RGB components using the original base-4 logic + # # Mapping word index (0-127) to a color space (1-64) + # color = (word % 63) + 1 + + # r = color // 16 + # rem1 = color % 16 + # g = rem1 // 4 + # b = rem1 % 4 + + # # 4. Update the LED state + # # Note: Original Kilobot RGB values are 0-3. + # # convert to range 0-255. + # self.led = (r * 85, g * 85, b * 85) + # ------------------------ + + # ------ NEW SIMPLIFIED COLOR CODING ------ + # Map the word to an index (0-7) + color_idx = word % len(DISTINCT_COLORS) + + # Assign the high-contrast color + self.led = DISTINCT_COLORS[color_idx] + # ----------------------------------------- \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/gen_init_pos.py b/dotbot/examples/minimum_naming_game/gen_init_pos.py new file mode 100644 index 0000000..617cf36 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/gen_init_pos.py @@ -0,0 +1,48 @@ +import random +import math +from pathlib import Path + +def format_with_underscores(value): + """Formats an integer with underscores every three digits.""" + return f"{value:_}" + +def generate_lattice_toml(width_count, height_count, sep_x, sep_y, start_x=120, start_y=120): + output_lines = [] + + for row in range(height_count): + for col in range(width_count): + bot_id = row * width_count + col + 1 + address = f"AAAAAAAA{bot_id:08X}" + + # Calculate positions + pos_x = start_x + (col * sep_x) + pos_y = start_y + (row * sep_y) + + # Randomize theta between 0 and 2*pi + random_theta = round(random.uniform(0, 2 * math.pi), 2) + + # Manually build the TOML entry string to preserve underscores + output_lines.append("[[dotbots]]") + output_lines.append(f'address = "{address}"') + output_lines.append(f'pos_x = {pos_x:_}') + output_lines.append(f'pos_y = {pos_y:_}') + output_lines.append(f"theta = {random_theta}") + output_lines.append("") # Empty line for readability + + return "\n".join(output_lines) + +# --- Configuration --- +WIDTH_NODES = 5 # Robots per row +HEIGHT_NODES = 5 # Number of rows +SEP_X = 240 # Separation between columns +SEP_Y = 240 # Separation between rows + +# Generate +toml_string = generate_lattice_toml(WIDTH_NODES, HEIGHT_NODES, SEP_X, SEP_Y) + +# Save to file +output_path = Path(__file__).resolve().parent / "init_state.toml" +with open(output_path, "w") as f: + f.write(toml_string) + +print(f"Generated TOML file at {output_path}") \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/init_state.toml b/dotbot/examples/minimum_naming_game/init_state.toml new file mode 100644 index 0000000..c2913fb --- /dev/null +++ b/dotbot/examples/minimum_naming_game/init_state.toml @@ -0,0 +1,149 @@ +[[dotbots]] +address = "AAAAAAAA00000001" +pos_x = 120 +pos_y = 120 +theta = 4.38 + +[[dotbots]] +address = "AAAAAAAA00000002" +pos_x = 360 +pos_y = 120 +theta = 0.56 + +[[dotbots]] +address = "AAAAAAAA00000003" +pos_x = 600 +pos_y = 120 +theta = 2.12 + +[[dotbots]] +address = "AAAAAAAA00000004" +pos_x = 840 +pos_y = 120 +theta = 2.37 + +[[dotbots]] +address = "AAAAAAAA00000005" +pos_x = 1_080 +pos_y = 120 +theta = 0.69 + +[[dotbots]] +address = "AAAAAAAA00000006" +pos_x = 120 +pos_y = 360 +theta = 5.21 + +[[dotbots]] +address = "AAAAAAAA00000007" +pos_x = 360 +pos_y = 360 +theta = 5.09 + +[[dotbots]] +address = "AAAAAAAA00000008" +pos_x = 600 +pos_y = 360 +theta = 3.86 + +[[dotbots]] +address = "AAAAAAAA00000009" +pos_x = 840 +pos_y = 360 +theta = 2.66 + +[[dotbots]] +address = "AAAAAAAA0000000A" +pos_x = 1_080 +pos_y = 360 +theta = 3.44 + +[[dotbots]] +address = "AAAAAAAA0000000B" +pos_x = 120 +pos_y = 600 +theta = 2.64 + +[[dotbots]] +address = "AAAAAAAA0000000C" +pos_x = 360 +pos_y = 600 +theta = 2.15 + +[[dotbots]] +address = "AAAAAAAA0000000D" +pos_x = 600 +pos_y = 600 +theta = 5.72 + +[[dotbots]] +address = "AAAAAAAA0000000E" +pos_x = 840 +pos_y = 600 +theta = 0.24 + +[[dotbots]] +address = "AAAAAAAA0000000F" +pos_x = 1_080 +pos_y = 600 +theta = 0.07 + +[[dotbots]] +address = "AAAAAAAA00000010" +pos_x = 120 +pos_y = 840 +theta = 4.61 + +[[dotbots]] +address = "AAAAAAAA00000011" +pos_x = 360 +pos_y = 840 +theta = 2.62 + +[[dotbots]] +address = "AAAAAAAA00000012" +pos_x = 600 +pos_y = 840 +theta = 4.47 + +[[dotbots]] +address = "AAAAAAAA00000013" +pos_x = 840 +pos_y = 840 +theta = 4.52 + +[[dotbots]] +address = "AAAAAAAA00000014" +pos_x = 1_080 +pos_y = 840 +theta = 6.11 + +[[dotbots]] +address = "AAAAAAAA00000015" +pos_x = 120 +pos_y = 1_080 +theta = 2.42 + +[[dotbots]] +address = "AAAAAAAA00000016" +pos_x = 360 +pos_y = 1_080 +theta = 1.86 + +[[dotbots]] +address = "AAAAAAAA00000017" +pos_x = 600 +pos_y = 1_080 +theta = 1.56 + +[[dotbots]] +address = "AAAAAAAA00000018" +pos_x = 840 +pos_y = 1_080 +theta = 0.86 + +[[dotbots]] +address = "AAAAAAAA00000019" +pos_x = 1_080 +pos_y = 1_080 +theta = 3.28 diff --git a/dotbot/examples/minimum_naming_game/init_state_with_motion.toml b/dotbot/examples/minimum_naming_game/init_state_with_motion.toml new file mode 100644 index 0000000..c211177 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/init_state_with_motion.toml @@ -0,0 +1,53 @@ +[[dotbots]] +address = "AAAAAAAA00000001" +pos_x = 240 +pos_y = 240 +theta = 3.5 + +[[dotbots]] +address = "AAAAAAAA00000002" +pos_x = 720 +pos_y = 240 +theta = 6.0 + +[[dotbots]] +address = "AAAAAAAA00000003" +pos_x = 1_200 +pos_y = 240 +theta = 1.64 + +[[dotbots]] +address = "AAAAAAAA00000004" +pos_x = 240 +pos_y = 720 +theta = 1.23 + +[[dotbots]] +address = "AAAAAAAA00000005" +pos_x = 720 +pos_y = 720 +theta = 4.16 + +[[dotbots]] +address = "AAAAAAAA00000006" +pos_x = 1_200 +pos_y = 720 +theta = 3.81 + +[[dotbots]] +address = "AAAAAAAA00000007" +pos_x = 240 +pos_y = 1_200 +theta = 2.8 + +[[dotbots]] +address = "AAAAAAAA00000008" +pos_x = 720 +pos_y = 1_200 +theta = 1.54 + +[[dotbots]] +address = "AAAAAAAA00000009" +pos_x = 1_200 +pos_y = 1_200 +theta = 6.03 diff --git a/dotbot/examples/minimum_naming_game/minimum_naming_game.py b/dotbot/examples/minimum_naming_game/minimum_naming_game.py new file mode 100644 index 0000000..553d5e2 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/minimum_naming_game.py @@ -0,0 +1,151 @@ +import asyncio +import os +from typing import List + +from dotbot.models import ( + DotBotLH2Position, + DotBotModel, + DotBotQueryModel, + DotBotRgbLedCommandModel, + DotBotStatus, + DotBotWaypoints, + WSRgbLed, +) +from dotbot.protocol import ApplicationType +from dotbot.rest import RestClient, rest_client +from dotbot.websocket import DotBotWsClient + +from dotbot.examples.minimum_naming_game.controller import Controller + +import numpy as np +import random +from scipy.spatial import cKDTree + +COMM_RANGE=250 +THRESHOLD=50 + +# TODO: Measure these values for real dotbots +BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance + +dotbot_controllers = dict() + + +async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]: + return await client.fetch_dotbots( + query=DotBotQueryModel(status=DotBotStatus.ACTIVE) + ) + + +async def main() -> None: + url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost") + port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000") + use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False) + sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/minimum_naming_game/models/supervisor.yaml") + + async with rest_client(url, port, use_https) as client: + dotbots = await fetch_active_dotbots(client) + + # print(len(dotbots), "dotbots connected.") + + # Initialization + for dotbot in dotbots: + + # Init controller + controller = Controller(dotbot.address, sct_path) + dotbot_controllers[dotbot.address] = controller + # print(f'type of controller: {type(controller)} for DotBot {dotbot.address}') + + # 1. Extract positions into a list of [x, y] coordinates + coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots] + + # 2. Convert the list to a NumPy array + positions = np.array(coords) + + # 3. Build the KD-Tree + # This tree can now be used for fast spatial queries (like finding neighbors) + tree = cKDTree(positions) + + ws = DotBotWsClient(url, port) + await ws.connect() + try: + + counter = 0 + + while True: + print("Step", counter) + + for dotbot in dotbots: + + controller = dotbot_controllers[dotbot.address] + controller.position = dotbot.lh2_position + controller.direction = dotbot.direction + + # print(f'Controller position: {controller.position}, direction: {controller.direction}') + + # 1. Query the tree for indices of neighbors + neighbor_indices = tree.query_ball_point([dotbot.lh2_position.x, dotbot.lh2_position.y], r=COMM_RANGE) + + # 2. Convert indices back into actual DotBot objects + neighbors = [ + dotbots[idx] for idx in neighbor_indices + if dotbots[idx].address != dotbot.address + ] + + # print(f'neighbour of {dotbot.address}: {[n.address for n in neighbors]}') + + # 3. If there are neighbors broadcasting, pick ONE randomly to listen to + if neighbors: + selected_neighbor = dotbot_controllers[random.choice(neighbors).address] + + # Share the word: take the neighbor's chosen word index + if selected_neighbor.w_index != 0: + controller.received_word = selected_neighbor.w_index + + # Set the flags so the robot knows it has a new message to process + controller.new_word_received = True + controller.received_word_checked = False + + # Update controller's neighbor list + controller.neighbors = neighbors + + # Run controller + controller.control_step() # run SCT step + + # Force update + waypoints = DotBotWaypoints( + threshold=THRESHOLD, + waypoints=[ + DotBotLH2Position( + x=dotbots[0].lh2_position.x, y=dotbots[0].lh2_position.y, z=0 + ) + ], + ) + await client.send_waypoint_command( + address=dotbots[0].address, + application=ApplicationType.DotBot, + command=waypoints, + ) + + await ws.send( + WSRgbLed( + cmd="rgb_led", + address=dotbot.address, + application=ApplicationType.DotBot, + data=DotBotRgbLedCommandModel( + red=controller.led[0], + green=controller.led[1], + blue=controller.led[2], + ), + ) + ) + + # await asyncio.sleep(0.1) + counter += 1 + finally: + await ws.close() + + return None + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py b/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py new file mode 100644 index 0000000..51be125 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py @@ -0,0 +1,187 @@ +import asyncio +import math +import os +from time import time +from typing import Dict, List + +from dotbot.examples.vec2 import Vec2 +from dotbot.models import ( + DotBotLH2Position, + DotBotModel, + DotBotMoveRawCommandModel, + DotBotQueryModel, + DotBotRgbLedCommandModel, + DotBotStatus, + DotBotWaypoints, + WSRgbLed, + WSMoveRaw, + WSWaypoints, +) +from dotbot.protocol import ApplicationType +from dotbot.rest import RestClient, rest_client +from dotbot.websocket import DotBotWsClient + +from dotbot.examples.minimum_naming_game.controller_with_motion import Controller + +import numpy as np +import random +from scipy.spatial import cKDTree + +COMM_RANGE=250 +THRESHOLD=0 + +# TODO: Measure these values for real dotbots +BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance +MAX_SPEED = 300 # Maximum allowed linear speed of a bot (mm/s) + +ARENA_SIZE_X = 2000 # Width of the arena in mm +ARENA_SIZE_Y = 2000 # Height of the arena in mm + +dotbot_controllers = dict() + + +async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]: + return await client.fetch_dotbots( + query=DotBotQueryModel(status=DotBotStatus.ACTIVE) + ) + + +async def main() -> None: + url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost") + port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000") + use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False) + sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/minimum_naming_game/models/supervisor.yaml") + + async with rest_client(url, port, use_https) as client: + dotbots = await fetch_active_dotbots(client) + + # print(len(dotbots), "dotbots connected.") + + # Initialization + for dotbot in dotbots: + + # Init controller + controller = Controller(dotbot.address, sct_path, 0.9 * MAX_SPEED, arena_limits=(ARENA_SIZE_X, ARENA_SIZE_Y)) + dotbot_controllers[dotbot.address] = controller + # print(f'type of controller: {type(controller)} for DotBot {dotbot.address}') + + # 1. Extract positions into a list of [x, y] coordinates + coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots] + + # 2. Convert the list to a NumPy array + positions = np.array(coords) + + # 3. Build the KD-Tree + # This tree can now be used for fast spatial queries (like finding neighbors) + tree = cKDTree(positions) + + ws = DotBotWsClient(url, port) + await ws.connect() + try: + + counter = 0 + + while True: + print("Step", counter) + + dotbots = await fetch_active_dotbots(client) + + # 1. Extract positions into a list of [x, y] coordinates + # This loop iterates through your dotbot list and grabs the lh2_position attributes + coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots] + + # 2. Convert the list to a NumPy array + # The structure will be (N, 2), where N is the number of dotbots + positions = np.array(coords) + + # 3. Build the KD-Tree + # This tree can now be used for fast spatial queries (like finding neighbors) + tree = cKDTree(positions) + + for dotbot in dotbots: + + controller = dotbot_controllers[dotbot.address] + controller.update_pose(dotbot.lh2_position) + + # print(f'Controller position: {controller.position}, direction: {controller.direction}') + + # 1. Query the tree for indices of neighbors + neighbor_indices = tree.query_ball_point([dotbot.lh2_position.x, dotbot.lh2_position.y], r=COMM_RANGE) + + # 2. Convert indices back into actual DotBot objects + neighbors = [ + dotbots[idx] for idx in neighbor_indices + if dotbots[idx].address != dotbot.address + ] + + # print(f'neighbour of {dotbot.address}: {[n.address for n in neighbors]}') + + # 3. If there are neighbors broadcasting, pick ONE randomly to listen to + if neighbors: + selected_neighbor = dotbot_controllers[random.choice(neighbors).address] + + # Share the word: take the neighbor's chosen word index + if selected_neighbor.w_index != 0: + controller.received_word = selected_neighbor.w_index + + # Set the flags so the robot knows it has a new message to process + controller.new_word_received = True + controller.received_word_checked = False + + # Update controller's neighbor list + controller.neighbors = neighbors + + # Run controller + controller.control_step() # run SCT step + + ### TEMPORARY: The simulator does not accept negative coordinates, + # so we set it to zero and scale the positive value proportionally. + point = DotBotLH2Position(x=controller.vector[0], y=controller.vector[1], z=0.0) + if dotbot.lh2_position.x + controller.vector[0] < 0: + point.y = controller.vector[1] * (controller.vector[0] / MAX_SPEED) + point.x = 0.0 + if dotbot.lh2_position.y + controller.vector[1] < 0: + point.x = controller.vector[0] * (controller.vector[1] / MAX_SPEED) + point.y = 0.0 + ### + + waypoints = DotBotWaypoints( + threshold=THRESHOLD, + waypoints=[ + DotBotLH2Position( + x=dotbot.lh2_position.x + round(point.x, 2), + y=dotbot.lh2_position.y + round(point.y, 2), + z=0 + ) + ], + ) + + await client.send_waypoint_command( + address=dotbot.address, + application=ApplicationType.DotBot, + command=waypoints, + ) + + await ws.send( + WSRgbLed( + cmd="rgb_led", + address=dotbot.address, + application=ApplicationType.DotBot, + data=DotBotRgbLedCommandModel( + red=controller.led[0], + green=controller.led[1], + blue=controller.led[2], + ), + ) + ) + + # await asyncio.sleep(0.1) + counter += 1 + finally: + await ws.close() + + return None + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/dotbot/examples/minimum_naming_game/models/E1.xml b/dotbot/examples/minimum_naming_game/models/E1.xml new file mode 100644 index 0000000..4ca4b55 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/E1.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/E2.xml b/dotbot/examples/minimum_naming_game/models/E2.xml new file mode 100644 index 0000000..dc3f468 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/E2.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/G1.xml b/dotbot/examples/minimum_naming_game/models/G1.xml new file mode 100644 index 0000000..8f2d4f5 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/G1.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/G2.xml b/dotbot/examples/minimum_naming_game/models/G2.xml new file mode 100644 index 0000000..ec83205 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/G2.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/G3.xml b/dotbot/examples/minimum_naming_game/models/G3.xml new file mode 100644 index 0000000..a88ab9b --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/G3.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/G4.xml b/dotbot/examples/minimum_naming_game/models/G4.xml new file mode 100644 index 0000000..4663d81 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/G4.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/Sloc1.xml b/dotbot/examples/minimum_naming_game/models/Sloc1.xml new file mode 100644 index 0000000..5f136a9 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/Sloc1.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/Sloc2.xml b/dotbot/examples/minimum_naming_game/models/Sloc2.xml new file mode 100644 index 0000000..9b0c480 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/Sloc2.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/dotbot/examples/minimum_naming_game/models/script.txt b/dotbot/examples/minimum_naming_game/models/script.txt new file mode 100644 index 0000000..f36a4db --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/script.txt @@ -0,0 +1,8 @@ +Gloc1 = Sync(G1,G3) +Gloc2 = Sync(G2,G4) + +Kloc1 = Sync(Gloc1,E1) +Kloc2 = Sync(Gloc2,E2) + +Sloc1 = SupC(Gloc1,Kloc1) +Sloc2 = SupC(Gloc2,Kloc2) \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/models/supervisor.yaml b/dotbot/examples/minimum_naming_game/models/supervisor.yaml new file mode 100644 index 0000000..3abeea8 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/models/supervisor.yaml @@ -0,0 +1,11 @@ + +num_events: 5 +num_supervisors: 2 +events: [ EV_startTimer, EV_selectAndBroadcast, EV_updateInventory, EV__selectAndBroadcast, EV_timeout ] +ev_controllable: [ 1, 1, 1, 0, 0 ] +ev_public: [ 0, 1, 0, 1, 0 ] +sup_events: [ [0, 0, 1, 1, 0], [1, 1, 0, 0, 1] ] +sup_init_state: [ 1, 0 ] +sup_current_state: [ 1, 0 ] +sup_data_pos: [ 0, 11 ] +sup_data: [ 2, EV_updateInventory, 0, 1, EV__selectAndBroadcast, 0, 0, 1, EV__selectAndBroadcast, 0, 0, 1, EV_startTimer, 0, 1, 1, EV_timeout, 0, 2, 1, EV_selectAndBroadcast, 0, 0 ] \ No newline at end of file diff --git a/dotbot/examples/minimum_naming_game/walk_avoid.py b/dotbot/examples/minimum_naming_game/walk_avoid.py new file mode 100644 index 0000000..a5490c1 --- /dev/null +++ b/dotbot/examples/minimum_naming_game/walk_avoid.py @@ -0,0 +1,76 @@ +import math + +from dotbot.models import DotBotModel + + +def walk_avoid(position_x: float, + position_y: float, + direction: float, + neighbors: list[DotBotModel], + max_speed: float, + arena_limits: tuple[float, float]) -> list[float]: + """ + Walk straight while avoiding collisions and arena boundary. + Arena limits: x, y in [0.0, 1.0] + """ + UNIT_SPEED = max_speed + MARGIN = 0.1 # Trigger turn when within 10% of any edge + + # 1. Identify if any neighbor is too close + neighbor_collision = False + if neighbors: + neighbor_collision = True + + # 2. Identify if any arena boundary is violated + curr_x = position_x + curr_y = position_y + + wall_collision = (curr_x < MARGIN*arena_limits[0] or curr_x > (arena_limits[0] - MARGIN*arena_limits[0]) or + curr_y < MARGIN*arena_limits[1] or curr_y > (arena_limits[1] - MARGIN*arena_limits[1])) + + # 3. Determine "Local" movement + local_v = [0.0, 0.0] + + if neighbor_collision or wall_collision: + + if wall_collision: + # Decide direction of repulsion (Left or Right) + if (curr_x < MARGIN*arena_limits[0]): + local_v[0] += UNIT_SPEED + if (curr_x > (arena_limits[0] - MARGIN*arena_limits[0])): + local_v[0] += -UNIT_SPEED + if (curr_y < MARGIN*arena_limits[1]): + local_v[1] += UNIT_SPEED + if (curr_y > (arena_limits[1] - MARGIN*arena_limits[1])): + local_v[1] += -UNIT_SPEED + + if neighbor_collision: + avg_dx = sum(n.lh2_position.x - curr_x for n in neighbors) + avg_dy = sum(n.lh2_position.y - curr_y for n in neighbors) + + mag = math.sqrt(avg_dx**2 + avg_dy**2) + if mag > 0: + # Add "Away" vector to the existing global movement + local_v[0] -= (avg_dx / mag) * UNIT_SPEED + local_v[1] -= (avg_dy / mag) * UNIT_SPEED + # print(f"Neighbor avoidance vector: {local_v} from neighbors {[n.address for n in neighbors]}") + + # Normalize so we don't go double speed in corners + total_mag = math.sqrt(local_v[0]**2 + local_v[1]**2) + if total_mag > 0: + return ( + (local_v[0] / total_mag) * UNIT_SPEED, + (local_v[1] / total_mag) * UNIT_SPEED, + ) + return (0.0, 0.0) + + else: + local_v = [UNIT_SPEED, 0.0] # Normal forward motion + + # 4. Rotate Local Vector to Global Vector + theta_rad = math.radians(direction) + + global_vx = (local_v[0] * -math.cos(theta_rad)) - (local_v[1] * math.sin(theta_rad)) + global_vy = (local_v[0] * math.sin(theta_rad)) + (local_v[1] * -math.cos(theta_rad)) + + return (global_vx, global_vy) diff --git a/dotbot/examples/sct.py b/dotbot/examples/sct.py new file mode 100644 index 0000000..7f1432f --- /dev/null +++ b/dotbot/examples/sct.py @@ -0,0 +1,314 @@ +import random +import yaml + +class SCT: + + def __init__(self, filename): + + self.read_supervisor(filename) + + self.callback = {} + self.input_buffer = None # Clear content after timestep + self.last_events = [0] * len(self.EV) + + + def read_supervisor(self, filename): + try: + with open(filename, 'r') as stream: + self.f = yaml.safe_load(stream) + except yaml.YAMLError as e: + print(e) + + self.num_events = self.f['num_events'] + self.num_supervisors = self.f['num_supervisors'] + self.EV = {} + for i, ev in enumerate(self.f['events']): + self.EV[ev] = i + + self.ev_controllable = self.f['ev_controllable'] + self.sup_events = self.f['sup_events'] + self.sup_init_state = self.f['sup_init_state'] + self.sup_current_state = self.f['sup_current_state'] + self.sup_data_pos = self.f['sup_data_pos'] + self.sup_data = self.f['sup_data'] + + + def add_callback(self, event, clbk, ci, sup_data): + func = {} + func['callback'] = clbk + func['check_input'] = ci + func['sup_data'] = sup_data + self.callback[event] = func + + + def run_step(self): + self.input_buffer = [] # clear buffer + self.update_input() + + # Get all uncontrollable events + uce = self.input_buffer + + # Apply all the uncontrollable events + while uce: + event = uce.pop(0) + self.make_transition(event) + self.exec_callback(event) + + ce_exists, ce = self.get_next_controllable() + + # Apply the chosen controllable event + if ce_exists: + self.make_transition(ce) + self.exec_callback(ce) + + + def input_read(self, ev): + event_name = self.get_event_name(ev) + if ev < self.num_events and self.callback[event_name]: + return self.callback[event_name]['check_input'](self.callback[event_name]['sup_data']) + return False + + + def update_input(self): + for i in range(0,self.num_events): + if not self.ev_controllable[i]: # Check the UCEs only + if self.input_read(i): + self.input_buffer.append(i) + self.last_events[i] = 1 + + + def get_state_position(self, supervisor, state): + position = self.sup_data_pos[supervisor] # Jump to the start position of the supervisor + for s in range(0, state): # Keep iterating until the state is reached + en = self.sup_data[position] # The number of transitions in the state + position += en * 3 + 1 # Next state position (Number transitions * 3 + 1) + return position + + + def make_transition(self, ev): + num_transitions = None + + # Apply transition to each local supervisor + for i in range(0, self.num_supervisors): + if self.sup_events[i][ev]: # Check if the given event is part of this supervisor + + # Current state info of supervisor + position = self.get_state_position(i, self.sup_current_state[i]) + num_transitions = self.sup_data[position] + position += 1 # Point to first transition + + # Find the transition for the given event + while num_transitions: + num_transitions -= 1 + value = self.get_value(self.sup_data[position]) + if value == ev: + self.sup_current_state[i] = (self.sup_data[position + 1] * 256) + (self.sup_data[position + 2]) + break + position += 3 + + + def exec_callback(self, ev): + event_name = self.get_event_name(ev) + if ev < self.num_events and self.callback[event_name]['callback']: + self.callback[event_name]['callback'](self.callback[event_name]['sup_data']) + + + def get_next_controllable(self): + + # Get controllable events that are enabled -> events + actives = self.get_active_controllable_events() + + if not all(v == 0 for v in actives): + randomPos = random.randint(0,1000000000) % actives.count(1) + for i in range(0, self.num_events): + if not randomPos and actives[i]: + return True, i + elif actives[i]: + randomPos -= 1 + + return False, None + + + def get_active_controllable_events(self): + + events = [] + + # Disable all non controllable events + for i in range(0, self.num_events): + if not self.ev_controllable[i]: + events.append(0) + else: + events.append(1) + + # Check disabled events for all supervisors + for i in range(0, self.num_supervisors): + + # Init an array where all events are disabled + ev_disable = [1] * self.num_events + + # Enable all events that are not part of this supervisor + for j in range(0, self.num_events): + if not self.sup_events[i][j]: + ev_disable[j] = 0 + + # Get current state + position = self.get_state_position(i, self.sup_current_state[i]) + num_transitions = self.sup_data[position] + position += 1 + + # Enable all events that have a transition from the current state + while num_transitions: + num_transitions -= 1 + value = self.get_value(self.sup_data[position]) + ev_disable[value] = 0 + position += 3 + + # Remove the controllable events to disable, leaving an array of enabled controllable events + for j in range(0, self.num_events): + if ev_disable[j] == 1 and events[j]: + events[j] = 0 + + return events + + + def get_value(self, index): + if isinstance(index, str): + return self.EV[index] + return index + + + def get_event_name(self, index): + if isinstance(index, int): + return list(self.EV.keys())[list(self.EV.values()).index(index)] + return index + + + # Get function that returns event information (event names and controllability) + def get_events(self): + return self.EV, self.ev_controllable + + +class SCTPub(SCT): + + def __init__(self, filename): + super().__init__(filename) + self.ev_public = self.f['ev_public'] + + + def run_step(self): + self.input_buffer = [] # clear buffer + self.input_buffer_pub = [] + self.update_input() + + # Apply all public uncontrollable events + public_uce = self.input_buffer_pub + while public_uce: + event = public_uce.pop(0) + self.make_transition(event) + self.exec_callback(event) + + # Apply all private uncontrollable events + uce = self.input_buffer + while uce: + event = uce.pop(0) + self.make_transition(event) + self.exec_callback(event) + + ce_exists, ce = self.get_next_controllable() + + # Apply the chosen controllable event + if ce_exists: + self.make_transition(ce) + self.exec_callback(ce) + + + def update_input(self): + for i in range(0,self.num_events): + if not self.ev_controllable[i]: # Check the UCEs only + if self.input_read(i): + if self.ev_public[i]: + self.input_buffer_pub.append(i) + else: + self.input_buffer.append(i) + self.last_events[i] = 1 + +class SCTProb(SCT): + + def __init__(self, filename): + super().__init__(filename) + self.sup_data_prob_pos = self.f['sup_data_prob_pos'] + self.sup_data_prob = self.f['sup_data_prob'] + + + def get_state_position_prob(self, supervisor, state): + prob_position = self.sup_data_prob_pos[supervisor] # Jump to the start position of the supervisor + for s in range(0, state): # Keep iterating until the state is reached + en = self.sup_data_prob[prob_position] # The number of transitions in the state + prob_position += en + 1 # Next state position (Number transitions * 3 + 1) + return prob_position + + + def get_active_controllable_events_prob(self): + + events = [] + + # Disable all non controllable events + for i in range(0, self.num_events): + if not self.ev_controllable[i]: + events.append(0) + else: + events.append(1) + + # Check disabled events for all supervisors + for i in range(0, self.num_supervisors): + + # Init an array where all events are disabled + ev_disable = [1] * self.num_events + + # Enable all events that are not part of this supervisor + for j in range(0, self.num_events): + if not self.sup_events[i][j]: + ev_disable[j] = 0 + + # Get current state + position = self.get_state_position(i, self.sup_current_state[i]) + position_prob = self.get_state_position_prob(i, self.sup_current_state[i]) + num_transitions = self.sup_data[position] + position += 1 + position_prob += 1 + + # Enable all events that have a transition from the current state + while num_transitions: + num_transitions -= 1 + value = self.get_value(self.sup_data[position]) + + if self.ev_controllable[value] and self.sup_events[i][value]: + ev_disable[value] = 0 # Transition with this event, do not disable it, just calculate its probability contribution + events[value] = events[value] * self.sup_data_prob[position_prob] + position_prob += 1 + + position += 3 + + for j in range(self.num_events): + if ev_disable[j] == 1: + events[j] = 0 + + return events + + + def get_next_controllable(self): + + # Get controllable events that are enabled -> events + events = self.get_active_controllable_events_prob() + prob_sum = sum(events) + + if prob_sum > 0.0001: # If at least one event is enabled do + random_value = random.uniform(0, prob_sum) + random_sum = 0.0 + for i in range(self.num_events): + random_sum += events[i] + if (random_value < random_sum) and self.ev_controllable[i]: + return True, i + + return False, None + \ No newline at end of file diff --git a/dotbot/examples/work_and_charge/README.md b/dotbot/examples/work_and_charge/README.md new file mode 100644 index 0000000..292611c --- /dev/null +++ b/dotbot/examples/work_and_charge/README.md @@ -0,0 +1,35 @@ +# Work and Charge + +This demo shows a work-and-charge scenario in the DotBot simulator, where agents alternate moving between two regions to perform some work and return to charge. + +## Install Python packages (pip) + +Install the Python packages required to run this demo. + +```bash +pip install pyyaml +``` + +## How to run + +### Specify the initial state + +Specify the initial state of the DotBots by replacing the file path for ```simulator_init_state_path``` in [config_sample.toml](https://github.com/DotBots/PyDotBot/blob/main/config_sample.toml). + +```toml +simulator_init_state_path = "dotbot/examples/work_and_charge/init_state.toml" +``` + +### Start the controller in simulator mode + +```bash +python -m dotbot.controller_app --config-path config_sample.toml -p dotbot-simulator -a dotbot-simulator --log-level error +``` + +### Run the work-and-charge scenario + +Open a new terminal and run the work-and-charge scenario. + +```bash +python -m dotbot.examples.work_and_charge.work_and_charge +``` diff --git a/dotbot/examples/work_and_charge/controller.py b/dotbot/examples/work_and_charge/controller.py new file mode 100644 index 0000000..8be9819 --- /dev/null +++ b/dotbot/examples/work_and_charge/controller.py @@ -0,0 +1,136 @@ +import math +from dotbot.models import ( + DotBotLH2Position, +) +from dotbot.examples.sct import SCT + +class Controller: + def __init__(self, address: str, path: str): + self.address = address + + # SCT initialization + self.sct = SCT(path) + self.add_callbacks() + + self.waypoint_current = None + self.waypoint_threshold = 50 # default threshold + + self.led = (0, 0, 0) # initial LED color + self.energy = 'high' # initial energy level + + + def set_work_waypoint(self, waypoint: DotBotLH2Position): + self.waypoint_work = waypoint + + + def set_charge_waypoint(self, waypoint: DotBotLH2Position): + self.waypoint_charge = waypoint + + + def set_current_position(self, position: DotBotLH2Position): + self.position_current = position + + + def control_step(self): + + # Calculate distance to work waypoint + dx = self.waypoint_work.x - self.position_current.x + dy = self.waypoint_work.y - self.position_current.y + self.dist_work = math.sqrt(dx * dx + dy * dy) + + # Calculate distance to charge waypoint + dx = self.waypoint_charge.x - self.position_current.x + dy = self.waypoint_charge.y - self.position_current.y + self.dist_charge = math.sqrt(dx * dx + dy * dy) + + # Run SCT control step + self.sct.run_step() + + + # Register callback functions to the generator player + def add_callbacks(self): + + # Automatic addition of callbacks + # 1. Get list of events and list specifying whether an event is controllable or not. + # 2. For each event, check controllable or not and add callback. + + events, controllability_list = self.sct.get_events() + + for event, index in events.items(): + is_controllable = controllability_list[index] + stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_' + + if is_controllable: # Add controllable event + func_name = '_callback_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, func, None, None) + else: # Add uncontrollable event + func_name = '_check_{0}'.format(stripped_name) + func = getattr(self, func_name) + self.sct.add_callback(event, None, func, None) + + + # Callback functions (controllable events) + def _callback_moveToWork(self, data: any): + # print(f'DotBot {self.address}. ACTION: moveToWork') + self.waypoint_current = self.waypoint_work + self.led = (0, 255, 0) # Green LED when moving to work + + + def _callback_moveToCharge(self, data: any): + # print(f'DotBot {self.address}. ACTION: moveToCharge') + self.waypoint_current = self.waypoint_charge + self.led = (255, 0, 0) # Red LED when moving to charge + + + def _callback_work(self, data: any): + # print(f'DotBot {self.address}. ACTION: work') + self.energy = 'low' # After working, energy level goes low + + + def _callback_charge(self, data: any): + # print(f'DotBot {self.address}. ACTION: charge') + self.energy = 'high' # After charging, energy level goes high + + + # Callback functions (uncontrollable events) + def _check_atWork(self, data: any): + if self.dist_work < self.waypoint_threshold: + # print(f'DotBot {self.address}. EVENT: atWork') + return True + return False + + + def _check_notAtWork(self, data: any): + if self.dist_work >= self.waypoint_threshold: + # print(f'DotBot {self.address}. EVENT: notAtWork') + return True + return False + + + def _check_atCharger(self, data: any): + if self.dist_charge < self.waypoint_threshold: + # print(f'DotBot {self.address}. EVENT: atCharger') + return True + return False + + + def _check_notAtCharger(self, data: any): + if self.dist_charge >= self.waypoint_threshold: + # print(f'DotBot {self.address}. EVENT: notAtCharger') + return True + return False + + + def _check_lowEnergy(self, data: any): + if self.energy == 'low': + # print(f'DotBot {self.address}. EVENT: lowEnergy') + return True + return False + + + def _check_highEnergy(self, data: any): + if self.energy == 'high': + # print(f'DotBot {self.address}. EVENT: highEnergy') + return True + return False \ No newline at end of file diff --git a/dotbot/examples/work_and_charge/gen_init_pose.py b/dotbot/examples/work_and_charge/gen_init_pose.py new file mode 100644 index 0000000..2dbe876 --- /dev/null +++ b/dotbot/examples/work_and_charge/gen_init_pose.py @@ -0,0 +1,48 @@ +from pathlib import Path + +def generate_dotbot_script(): + # Configuration Constants + NUM_ROBOTS = 8 # Total robots to generate + START_ID = 1 # Start at AAAAAAAA00000001 + X_RIGHT = 800 + X_LEFT = 100 + START_Y = 200 + Y_STEP = 200 + THETA = 3.14 + FILENAME = "dotbots_config.toml" + + lines = [] + + for i in range(NUM_ROBOTS): + # 1. Address: Increments by 1 every robot + address_hex = f"AAAAAAAA{START_ID + i:08X}" + + # 2. X Position: Alternates 800, 100, 800, 100... + pos_x_val = X_RIGHT if i % 2 == 0 else X_LEFT + + # 3. Y Position: Increases by 200 for EVERY robot + pos_y_val = START_Y + (i * Y_STEP) + + # 4. Format numbers with underscores (e.g., 800_000) + pos_x = f"{pos_x_val:,}".replace(",", "_") + pos_y = f"{pos_y_val:,}".replace(",", "_") + + # Build the TOML block + block = ( + f"[[dotbots]]\n" + f"address = \"{address_hex}\"\n" + f"pos_x = {pos_x}\n" + f"pos_y = {pos_y}\n" + f"theta = {THETA}\n" + ) + lines.append(block) + + # Save to file + output_path = Path(__file__).resolve().parent / "init_state.toml" + with open(output_path, "w") as f: + f.write("\n".join(lines)) + + print(f"Generated TOML file at {output_path}") + +if __name__ == "__main__": + generate_dotbot_script() \ No newline at end of file diff --git a/dotbot/examples/work_and_charge/init_state.toml b/dotbot/examples/work_and_charge/init_state.toml new file mode 100644 index 0000000..75622c9 --- /dev/null +++ b/dotbot/examples/work_and_charge/init_state.toml @@ -0,0 +1,47 @@ +[[dotbots]] +address = "AAAAAAAA00000001" +pos_x = 800 +pos_y = 200 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000002" +pos_x = 100 +pos_y = 400 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000003" +pos_x = 800 +pos_y = 600 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000004" +pos_x = 100 +pos_y = 800 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000005" +pos_x = 800 +pos_y = 1_000 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000006" +pos_x = 100 +pos_y = 1_200 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000007" +pos_x = 800 +pos_y = 1_400 +theta = 3.14 + +[[dotbots]] +address = "AAAAAAAA00000008" +pos_x = 100 +pos_y = 1_600 +theta = 3.14 diff --git a/dotbot/examples/work_and_charge/models/E1.xml b/dotbot/examples/work_and_charge/models/E1.xml new file mode 100644 index 0000000..6e53df5 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/E1.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/E2.xml b/dotbot/examples/work_and_charge/models/E2.xml new file mode 100644 index 0000000..493542d --- /dev/null +++ b/dotbot/examples/work_and_charge/models/E2.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/E3.xml b/dotbot/examples/work_and_charge/models/E3.xml new file mode 100644 index 0000000..c6edfc0 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/E3.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/E4.xml b/dotbot/examples/work_and_charge/models/E4.xml new file mode 100644 index 0000000..64df924 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/E4.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/G1.xml b/dotbot/examples/work_and_charge/models/G1.xml new file mode 100644 index 0000000..0a1337f --- /dev/null +++ b/dotbot/examples/work_and_charge/models/G1.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/G2.xml b/dotbot/examples/work_and_charge/models/G2.xml new file mode 100644 index 0000000..2d7a2fa --- /dev/null +++ b/dotbot/examples/work_and_charge/models/G2.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/G3.xml b/dotbot/examples/work_and_charge/models/G3.xml new file mode 100644 index 0000000..53b1111 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/G3.xml @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/Sloc1.xml b/dotbot/examples/work_and_charge/models/Sloc1.xml new file mode 100644 index 0000000..325fd60 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/Sloc1.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/Sloc2.xml b/dotbot/examples/work_and_charge/models/Sloc2.xml new file mode 100644 index 0000000..a833722 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/Sloc2.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/Sloc3.xml b/dotbot/examples/work_and_charge/models/Sloc3.xml new file mode 100644 index 0000000..c7aa189 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/Sloc3.xml @@ -0,0 +1,60 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/Sloc4.xml b/dotbot/examples/work_and_charge/models/Sloc4.xml new file mode 100644 index 0000000..4b08ce8 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/Sloc4.xml @@ -0,0 +1,60 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotbot/examples/work_and_charge/models/script.txt b/dotbot/examples/work_and_charge/models/script.txt new file mode 100644 index 0000000..57d4413 --- /dev/null +++ b/dotbot/examples/work_and_charge/models/script.txt @@ -0,0 +1,14 @@ +Gloc1 = Sync(G1,G3) +Gloc2 = Sync(G1,G3) +Gloc3 = Sync(G1,G2) +Gloc4 = Sync(G1,G2) + +Kloc1 = Sync(Gloc1,E1) +Kloc2 = Sync(Gloc2,E2) +Kloc3 = Sync(Gloc3,E3) +Kloc4 = Sync(Gloc4,E4) + +Sloc1 = SupC(Gloc1,Kloc1) +Sloc2 = SupC(Gloc2,Kloc2) +Sloc3 = SupC(Gloc3,Kloc3) +Sloc4 = SupC(Gloc4,Kloc4) \ No newline at end of file diff --git a/dotbot/examples/work_and_charge/models/supervisor.yaml b/dotbot/examples/work_and_charge/models/supervisor.yaml new file mode 100644 index 0000000..1299fef --- /dev/null +++ b/dotbot/examples/work_and_charge/models/supervisor.yaml @@ -0,0 +1,10 @@ + +num_events: 10 +num_supervisors: 4 +events: [ EV_lowEnergy, EV_highEnergy, EV_atWork, EV_atCharger, EV_charge, EV_notAtWork, EV_work, EV_moveToCharge, EV_moveToWork, EV_notAtCharger ] +ev_controllable: [ 0, 0, 0, 0, 1, 0, 1, 1, 1, 0 ] +sup_events: [ [1, 1, 0, 0, 1, 0, 1, 1, 1, 0], [1, 1, 0, 0, 1, 0, 1, 1, 1, 0], [0, 0, 1, 1, 1, 1, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1, 1, 1, 1, 1] ] +sup_init_state: [ 5, 6, 3, 7 ] +sup_current_state: [ 5, 6, 3, 7 ] +sup_data_pos: [ 0, 77, 154, 279 ] +sup_data: [ 3, EV_highEnergy, 0, 0, EV_lowEnergy, 0, 3, EV_charge, 0, 1, 3, EV_moveToWork, 0, 6, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 5, 3, EV_moveToCharge, 0, 3, EV_lowEnergy, 0, 2, EV_highEnergy, 0, 7, 3, EV_charge, 0, 5, EV_lowEnergy, 0, 3, EV_highEnergy, 0, 0, 3, EV_lowEnergy, 0, 4, EV_work, 0, 2, EV_highEnergy, 0, 6, 2, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 5, 3, EV_highEnergy, 0, 6, EV_work, 0, 7, EV_lowEnergy, 0, 4, 3, EV_lowEnergy, 0, 2, EV_highEnergy, 0, 7, EV_moveToCharge, 0, 0, 3, EV_moveToWork, 0, 4, EV_lowEnergy, 0, 0, EV_highEnergy, 0, 6, 3, EV_charge, 0, 6, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 3, 2, EV_lowEnergy, 0, 5, EV_highEnergy, 0, 2, 3, EV_lowEnergy, 0, 3, EV_highEnergy, 0, 1, EV_charge, 0, 0, 3, EV_highEnergy, 0, 7, EV_work, 0, 5, EV_lowEnergy, 0, 4, 3, EV_highEnergy, 0, 2, EV_lowEnergy, 0, 5, EV_moveToCharge, 0, 3, 3, EV_moveToWork, 0, 7, EV_highEnergy, 0, 6, EV_lowEnergy, 0, 0, 3, EV_lowEnergy, 0, 4, EV_work, 0, 2, EV_highEnergy, 0, 7, 5, EV_notAtWork, 0, 7, EV_atCharger, 0, 0, EV_atWork, 0, 0, EV_charge, 0, 6, EV_notAtCharger, 0, 0, 5, EV_atCharger, 0, 1, EV_atWork, 0, 1, EV_notAtWork, 0, 2, EV_moveToCharge, 0, 0, EV_notAtCharger, 0, 1, 5, EV_notAtWork, 0, 2, EV_atWork, 0, 1, EV_atCharger, 0, 2, EV_moveToCharge, 0, 7, EV_notAtCharger, 0, 2, 5, EV_notAtCharger, 0, 3, EV_notAtWork, 0, 3, EV_moveToWork, 0, 5, EV_atWork, 0, 6, EV_atCharger, 0, 3, 5, EV_notAtWork, 0, 5, EV_atCharger, 0, 4, EV_atWork, 0, 4, EV_notAtCharger, 0, 4, EV_work, 0, 1, 4, EV_atCharger, 0, 5, EV_atWork, 0, 4, EV_notAtWork, 0, 5, EV_notAtCharger, 0, 5, 5, EV_atWork, 0, 6, EV_atCharger, 0, 6, EV_notAtCharger, 0, 6, EV_notAtWork, 0, 3, EV_moveToWork, 0, 4, 5, EV_notAtWork, 0, 7, EV_notAtCharger, 0, 7, EV_atCharger, 0, 7, EV_atWork, 0, 0, EV_charge, 0, 3, 4, EV_notAtWork, 0, 0, EV_notAtCharger, 0, 0, EV_atWork, 0, 0, EV_atCharger, 0, 5, 5, EV_atCharger, 0, 4, EV_atWork, 0, 1, EV_notAtCharger, 0, 1, EV_moveToCharge, 0, 0, EV_notAtWork, 0, 1, 5, EV_notAtCharger, 0, 2, EV_work, 0, 1, EV_atCharger, 0, 3, EV_atWork, 0, 2, EV_notAtWork, 0, 2, 5, EV_notAtWork, 0, 3, EV_atCharger, 0, 3, EV_work, 0, 4, EV_atWork, 0, 3, EV_notAtCharger, 0, 2, 5, EV_atWork, 0, 4, EV_atCharger, 0, 4, EV_moveToCharge, 0, 5, EV_notAtCharger, 0, 1, EV_notAtWork, 0, 4, 5, EV_atWork, 0, 5, EV_notAtCharger, 0, 0, EV_notAtWork, 0, 5, EV_atCharger, 0, 5, EV_charge, 0, 6, 5, EV_notAtWork, 0, 6, EV_moveToWork, 0, 3, EV_atCharger, 0, 6, EV_atWork, 0, 6, EV_notAtCharger, 0, 7, 5, EV_notAtCharger, 0, 7, EV_moveToWork, 0, 2, EV_notAtWork, 0, 7, EV_atWork, 0, 7, EV_atCharger, 0, 6 ] \ No newline at end of file diff --git a/dotbot/examples/work_and_charge/work_and_charge.py b/dotbot/examples/work_and_charge/work_and_charge.py new file mode 100644 index 0000000..ba7af8e --- /dev/null +++ b/dotbot/examples/work_and_charge/work_and_charge.py @@ -0,0 +1,326 @@ +import asyncio +import math +import os +import time +from typing import Dict, List + +from dotbot.examples.orca import ( + Agent, + OrcaParams, + compute_orca_velocity_for_agent, +) +from dotbot.examples.vec2 import Vec2 +from dotbot.models import ( + DotBotLH2Position, + DotBotModel, + DotBotMoveRawCommandModel, + DotBotQueryModel, + DotBotRgbLedCommandModel, + DotBotStatus, + DotBotWaypoints, + WSRgbLed, + WSWaypoints, +) +from dotbot.protocol import ApplicationType +from dotbot.rest import RestClient, rest_client +from dotbot.websocket import DotBotWsClient + +from dotbot.examples.sct import SCT +from dotbot.examples.work_and_charge.controller import Controller + +import numpy as np +from scipy.spatial import cKDTree + +ORCA_RANGE = 200 + +THRESHOLD = 50 # Acceptable distance error to consider a waypoint reached +DT = 0.05 # Control loop period (seconds) + +# TODO: Measure these values for real dotbots +BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance +MAX_SPEED = 300 # Maximum allowed linear speed of a bot (mm/s) + +(QUEUE_HEAD_X, QUEUE_HEAD_Y) = ( + 200, + 200, +) # World-frame (X, Y) position of the charging queue head +QUEUE_SPACING = ( + 200 # Spacing between consecutive bots in the charging queue (along X axis) +) + +CHARGE_REGION_X = QUEUE_HEAD_X +WORK_REGION_X = 1800 + +dotbot_controllers = dict() + + +async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]: + return await client.fetch_dotbots( + query=DotBotQueryModel(status=DotBotStatus.ACTIVE) + ) + + +def order_bots( + dotbots: List[DotBotModel], base_x: int, base_y: int +) -> List[DotBotModel]: + def key(bot: DotBotModel): + dx = bot.lh2_position.x - base_x + dy = bot.lh2_position.y - base_y + return (dx * dx + dy * dy, bot.address) + + return sorted(dotbots, key=key) + + +def assign_goals( + ordered: List[DotBotModel], + head_x: int, + head_y: int, + spacing: int, +) -> Dict[str, dict]: + goals = {} + for i, bot in enumerate(ordered): + + # TODO: depending on the robot's current state (moving to base or work regions), assign a different goal + + goals[bot.address] = { + "x": head_x, + "y": head_y + i * spacing, + } + return goals + + +def preferred_vel(dotbot: DotBotModel, goal: Vec2 | None) -> Vec2: + if goal is None: + return Vec2(x=0, y=0) + + dx = goal["x"] - dotbot.lh2_position.x + dy = goal["y"] - dotbot.lh2_position.y + dist = math.sqrt(dx * dx + dy * dy) + + dist1000 = dist * 1000 + # If close to goal, stop + if dist1000 < THRESHOLD: + return Vec2(x=0, y=0) + + # Right-hand rule bias + bias_angle = 0.0 + # Bot can only walk on a cone [-60, 60] in front of himself + max_deviation = math.radians(60) + + # Convert bot direction into radians + direction = direction_to_rad(dotbot.direction) + + # Angle to goal + angle_to_goal = math.atan2(dy, dx) + bias_angle + + delta = angle_to_goal - direction + # Wrap to [-π, +π] + delta = math.atan2(math.sin(delta), math.cos(delta)) + + # Clamp delta to [-MAX, +MAX] + if delta > max_deviation: + delta = max_deviation + if delta < -max_deviation: + delta = -max_deviation + + # Final allowed direction + final_angle = direction + delta + result = Vec2( + x=math.cos(final_angle) * MAX_SPEED, y=math.sin(final_angle) * MAX_SPEED + ) + return result + + +def direction_to_rad(direction: float) -> float: + rad = (direction + 90) * math.pi / 180.0 + return math.atan2(math.sin(rad), math.cos(rad)) # normalize to [-π, π] + + +async def compute_orca_velocity( + agent: Agent, + neighbors: List[Agent], + params: OrcaParams, +) -> Vec2: + return compute_orca_velocity_for_agent(agent, neighbors, params) + + +async def main() -> None: + params = OrcaParams(time_horizon=5 * DT, time_step=DT) + url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost") + port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000") + use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False) + sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/work_and_charge/models/supervisor.yaml") + async with rest_client(url, port, use_https) as client: + dotbots = await fetch_active_dotbots(client) + + # Initialization + for dotbot in dotbots: + + # Init controller + controller = Controller(dotbot.address, sct_path) + dotbot_controllers[dotbot.address] = controller + + # Cosmetic: all bots are red + await client.send_rgb_led_command( + address=dotbot.address, + command=DotBotRgbLedCommandModel(red=255, green=0, blue=0), + ) + + # Set work and charge goals for each robot + # sorted_bots = order_bots(dotbots, QUEUE_HEAD_X, QUEUE_HEAD_Y) + sorted_bots = sorted(dotbots, key=lambda bot: bot.address) + base_goals = assign_goals(sorted_bots, QUEUE_HEAD_X, QUEUE_HEAD_Y, QUEUE_SPACING) + work_goals = assign_goals(sorted_bots, WORK_REGION_X, QUEUE_HEAD_Y, QUEUE_SPACING) + + for address, controller in dotbot_controllers.items(): + goal = base_goals[address] + waypoint_charge = DotBotLH2Position(x=goal['x'], y=goal['y'], z=0) + controller.set_charge_waypoint(waypoint_charge) + + goal = work_goals[address] + waypoint_work = DotBotLH2Position(x=goal['x'], y=goal['y'], z=0) + controller.set_work_waypoint(waypoint_work) + + while True: + try: + ws = DotBotWsClient(url, port) + await ws.connect() + + while True: + dotbots = await fetch_active_dotbots(client) + + goals = dict() + agents: Dict[str, Agent] = {} + + for bot in dotbots: + + # print(f'waypoint_current for DotBot {bot.address}: {dotbot_controllers.get(bot.address).waypoint_current}') + + # Get current goals + if dotbot_controllers.get(bot.address).waypoint_current is not None: + goals[bot.address] = { + "x": dotbot_controllers[bot.address].waypoint_current.x, + "y": dotbot_controllers[bot.address].waypoint_current.y, + } + + agents[bot.address] = Agent( + id=bot.address, + position=Vec2(x=bot.lh2_position.x, y=bot.lh2_position.y), + velocity=Vec2(x=0, y=0), + radius=BOT_RADIUS, + max_speed=MAX_SPEED, + preferred_velocity=preferred_vel( + dotbot=bot, goal=goals.get(bot.address) + ), + ) + + # Prepare coordinates for all agents + # Extract [x, y] for every agent in the same order + agent_list = list(agents.values()) + positions = np.array([[a.position.x, a.position.y] for a in agent_list]) + + # Build the KD-Tree + tree = cKDTree(positions) + + # Run controller for each robot + for dotbot in dotbots: + agent = agents[dotbot.address] + pos = dotbot.lh2_position + # print(f"DotBot {dotbot.address}: Position ({pos.x:.2f}, {pos.y:.2f}), Direction {dotbot.direction:.2f}°") + + # Run controller + controller = dotbot_controllers[dotbot.address] + controller.set_current_position(pos) # update position + controller.control_step() # run SCT step + + # Get current goal + goal = controller.waypoint_current + goals[dotbot.address] = { + "x": goal.x, + "y": goal.y, + } + + ### Send goal + + # Without KD-Tree + # local_neighbors = [neighbor for neighbor in agents.values() if neighbor.id != agent.id] + + # Using KD-Tree: Prepare neighbor list using KD-Tree + neighbor_indices = tree.query_ball_point([agent.position.x, agent.position.y], r=ORCA_RANGE) + local_neighbors = [agent_list[idx] for idx in neighbor_indices if agent_list[idx].id != agent.id] + + if not local_neighbors: + orca_vel = agent.preferred_velocity + else: + orca_vel = await compute_orca_velocity( + agent, neighbors=local_neighbors, params=params + ) + step = Vec2(x=orca_vel.x, y=orca_vel.y) + + # print(f'goal for DotBot {dotbot.address}: ({goal.x:.2f}, {goal.y:.2f}), step: ({step.x:.4f}, {step.y:.4f})') + + # ---- CLAMP STEP TO GOAL DISTANCE ---- + goal = goals.get(agent.id) + if goal is not None: + dx = goal["x"] - agent.position.x + dy = goal["y"] - agent.position.y + dist_to_goal = math.hypot(dx, dy) + + step_len = math.hypot(step.x, step.y) + if step_len > dist_to_goal and step_len > 0: + scale = dist_to_goal / step_len + step = Vec2(x=step.x * scale, y=step.y * scale) + # ------------------------------------ + + ### TEMPORARY: The simulator does not accept negative coordinates, + # so we clamp the step to keep the next position non-negative. + if agent.position.x + step.x < 0: + step.y = step.y * (abs(step.x) / MAX_SPEED) + step.x = -agent.position.x + if agent.position.y + step.y < 0: + step.x = step.x * (abs(step.y) / MAX_SPEED) + step.y = -agent.position.y + ### + + waypoints = DotBotWaypoints( + threshold=THRESHOLD, + waypoints=[ + DotBotLH2Position( + x=agent.position.x + step.x, y=agent.position.y + step.y, z=0 + ) + ], + ) + await ws.send( + WSWaypoints( + cmd="waypoints", + address=agent.id, + application=ApplicationType.DotBot, + data=waypoints, + ) + ) + await ws.send( + WSRgbLed( + cmd="rgb_led", + address=agent.id, + application=ApplicationType.DotBot, + data=DotBotRgbLedCommandModel( + red=controller.led[0], + green=controller.led[1], + blue=controller.led[2], + ), + ) + ) + + await asyncio.sleep(DT) + except Exception as e: + print(f"Connection lost: {e}") + print("Retrying in 1 seconds...") + await asyncio.sleep(1) # Wait before trying to reconnect + finally: + await ws.close() + + return None + + +if __name__ == "__main__": + asyncio.run(main())