diff --git a/dotbot/examples/minimum_naming_game/README.md b/dotbot/examples/minimum_naming_game/README.md
new file mode 100644
index 0000000..acf6714
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/README.md
@@ -0,0 +1,53 @@
+# Minimum Naming Game
+
+This demo runs the minimum naming game in the DotBot simulator, where the robots use local communication to converge on a single word.
+
+This demo includes two variants: a static setup without motion and a dynamic setup with motion.
+
+## Install Python packages (pip)
+
+Install the Python packages required to run this demo.
+
+```bash
+pip install pyyaml scipy
+```
+
+## How to run
+
+### Specify the initial state
+
+Specify the initial state of the DotBots by replacing the file path for ```simulator_init_state_path``` in [config_sample.toml](config_sample.toml).
+
+**Static setup** (without motion) using init_state.toml:
+
+```toml
+simulator_init_state_path = "dotbot/examples/minimum_naming_game/init_state.toml"
+```
+
+**Dynamic setup** (with motion) using init_state_with_motion.toml:
+
+```toml
+simulator_init_state_path = "dotbot/examples/minimum_naming_game/init_state_with_motion.toml"
+```
+
+### Start the controller in simulator mode
+
+```bash
+python -m dotbot.controller_app --config-path config_sample.toml -p dotbot-simulator -a dotbot-simulator --log-level error
+```
+
+### Run the minimum naming game scenario
+
+Open a new terminal and run the minimum naming game scenario.
+
+**Static setup** (without motion):
+
+```bash
+python -m dotbot.examples.minimum_naming_game.minimum_naming_game
+```
+
+**Dynamic setup** (with motion) :
+
+```bash
+python -m dotbot.examples.minimum_naming_game.minimum_naming_game_with_motion
+```
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/controller.py b/dotbot/examples/minimum_naming_game/controller.py
new file mode 100644
index 0000000..cb5e467
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/controller.py
@@ -0,0 +1,212 @@
+import random
+from dotbot.models import (
+ DotBotLH2Position,
+ DotBotModel,
+)
+from dotbot.examples.sct import SCT
+
+DISTINCT_COLORS = [
+ (255, 0, 0), # Red
+ (0, 255, 0), # Lime
+ (0, 0, 255), # Blue
+ (255, 255, 0), # Yellow
+ (255, 0, 255), # Magenta
+ (0, 255, 255), # Cyan
+ (255, 165, 0), # Orange
+ (128, 0, 255), # Violet
+]
+
+
+class Controller:
+ def __init__(self, address: str, path: str):
+ self.address = address
+
+ self.position = DotBotLH2Position(x=0.0, y=0.0, z=0.0) # initial position
+ self.direction = 0.0 # initial orientation
+
+ self.neighbors: list[DotBotModel] = [] # initial empty neighbor list
+ self.vector = [0.0, 0.0] # initial movement vector
+
+ # SCT initialization
+ self.sct = SCT(path)
+ self.add_callbacks()
+
+ self.led = (0, 0, 0) # initial LED color
+
+ # --- Naming Game Variables ---
+ self.counter = 0 # FOR DEBUGGING
+ self.last_broadcast_ticks = 0 # Tracks timing
+ self.max_broadcast_ticks = 5
+
+ # Pre-defined words (e.g., num_words = 128)
+ self.num_words = 8
+ self.words = list(range(self.num_words))
+
+ # Word reception state
+ self.received_word = None
+ # self.received_word_checked = True
+ self.new_word_received = False
+
+ # Global variable for the word chosen for transmission
+ self.w_index = 0
+
+ # Inventory of known words
+ self.inventory = set()
+
+
+ def control_step(self):
+
+ self.counter += 1 # Increment step counter
+
+ # Run SCT control step
+ self.sct.run_step()
+
+ self.color_code() # Update LED color based on inventory state
+
+
+ # Register callback functions to the generator player
+ def add_callbacks(self):
+
+ # Automatic addition of callbacks
+ # 1. Get list of events and list specifying whether an event is controllable or not.
+ # 2. For each event, check controllable or not and add callback.
+
+ events, controllability_list = self.sct.get_events()
+
+ for event, index in events.items():
+ is_controllable = controllability_list[index]
+ stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_'
+
+ if is_controllable: # Add controllable event
+ func_name = '_callback_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, func, None, None)
+ else: # Add uncontrollable event
+ func_name = '_check_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, None, func, None)
+
+
+ # Callback functions (controllable events)
+ def _callback_startTimer(self, data: any):
+ """
+ Saves the current tick count to mark the start of the broadcast interval.
+ """
+ # print(f'DotBot {self.address}. ACTION: startTimer')
+ self.last_broadcast_ticks = self.counter
+
+
+ def _callback_selectAndBroadcast(self, data: any):
+ """
+ Selects a random word from the inventory, or invents a new one
+ if the inventory is empty. Sets the flag for transmission.
+ """
+ # print(f'DotBot {self.address}. ACTION: selectAndBroadcast', end=". ")
+
+ # Select or Invent a word
+ if not self.inventory:
+ # Inventory is empty: invent a new word from the pool
+ self.w_index = random.randrange(self.num_words)
+ # Store the word (equivalent to inventory[0] = words[w_index].data[0])
+ self.inventory.add(self.words[self.w_index])
+ else:
+ # Inventory is not empty: pick a random word from current known words
+ self.w_index = random.choice(list(self.inventory))
+
+ # Set broadcast flag for transmission
+ self.broadcast_word = True
+
+ # print(f'\tinventory: {self.inventory},\tselected word: {self.w_index}')
+
+
+ def _callback_updateInventory(self, data: any):
+ """
+ Updates the inventory based on the last received word.
+ If the word is known, the agent reaches a local consensus (inventory collapses).
+ If unknown, the word is added to the agent's vocabulary.
+ """
+ # print(f'DotBot {self.address}. ACTION: updateInventory', end=". ")
+
+ # Ensure we have a word to process
+ if self.received_word is None:
+ return
+
+ # Check if the received word is within the inventory
+ if self.received_word in self.inventory:
+ # SUCCESS: word is known.
+ # Remove all other words (collapse inventory to just this one)
+ self.inventory = {self.received_word}
+ # print(f' removed all other words, inventory now: {self.inventory}')
+ else:
+ # FAILURE: word is unknown.
+ # Insert it into the inventory
+ self.inventory.add(self.received_word)
+ # print(f' added word {self.received_word}, inventory now: {self.inventory}')
+
+ # Mark as checked
+ self.received_word_checked = True
+
+
+ # Callback functions (uncontrollable events)
+ def _check__selectAndBroadcast(self, data: any) -> bool:
+ """
+ Checks if a new word has been received.
+ Returns True (1) if a word is waiting to be processed, otherwise False (0).
+ """
+ if self.new_word_received:
+ # Reset the flag
+ self.new_word_received = False
+ return True
+
+ return False
+
+
+ def _check_timeout(self, data: any) -> bool:
+ """
+ Checks if the broadcast timer has expired.
+ Returns True if the current counter exceeds the last broadcast time
+ plus the defined interval.
+ """
+ if self.counter > (self.last_broadcast_ticks + self.max_broadcast_ticks):
+ return True
+
+ return False
+
+
+ def color_code(self):
+ """
+ Updates the LED color based on the inventory state.
+ - If the robot has not reached consensus (inventory size != 1), the LED is OFF.
+ - If consensus is reached, the word is mapped to a specific RGB color.
+ """
+ # 1. Check if the inventory has reached consensus (size exactly 1)
+ if len(self.inventory) != 1:
+ self.led = (0, 0, 0) # Turn LED off
+ return
+
+ # 2. Extract the single word known to the agent
+ word = list(self.inventory)[0]
+
+ # ------ ORIGINAL ------
+ # # 3. Calculate RGB components using the original base-4 logic
+ # # Mapping word index (0-127) to a color space (1-64)
+ # color = (word % 63) + 1
+
+ # r = color // 16
+ # rem1 = color % 16
+ # g = rem1 // 4
+ # b = rem1 % 4
+
+ # # 4. Update the LED state
+ # # Note: Original Kilobot RGB values are 0-3.
+ # # convert to range 0-255.
+ # self.led = (r * 85, g * 85, b * 85)
+ # ------------------------
+
+ # ------ NEW SIMPLIFIED COLOR CODING ------
+ # Map the word to an index (0-7)
+ color_idx = word % len(DISTINCT_COLORS)
+
+ # Assign the high-contrast color
+ self.led = DISTINCT_COLORS[color_idx]
+ # -----------------------------------------
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/controller_with_motion.py b/dotbot/examples/minimum_naming_game/controller_with_motion.py
new file mode 100644
index 0000000..ecb2199
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/controller_with_motion.py
@@ -0,0 +1,232 @@
+import math
+import random
+from dotbot.models import (
+ DotBotLH2Position,
+ DotBotModel,
+)
+from dotbot.examples.sct import SCT
+from dotbot.examples.minimum_naming_game.walk_avoid import walk_avoid
+
+DISTINCT_COLORS = [
+ (255, 0, 0), # Red
+ (0, 255, 0), # Lime
+ (0, 0, 255), # Blue
+ (255, 255, 0), # Yellow
+ (255, 0, 255), # Magenta
+ (0, 255, 255), # Cyan
+ (255, 165, 0), # Orange
+ (128, 0, 255), # Violet
+]
+
+
+class Controller:
+ def __init__(self, address: str, path: str, max_speed: float, arena_limits: tuple[float, float]):
+ self.address = address
+ self.max_speed = max_speed
+ self.arena_limits = arena_limits
+
+ self.position = DotBotLH2Position(x=0.0, y=0.0, z=0.0) # initial position
+ self.direction = 0.0 # initial orientation
+ self.prev_position: DotBotLH2Position | None = None
+
+ self.neighbors: list[DotBotModel] = [] # initial empty neighbor list
+ self.vector = [0.0, 0.0] # initial movement vector
+
+ # SCT initialization
+ self.sct = SCT(path)
+ self.add_callbacks()
+
+ self.led = (0, 0, 0) # initial LED color
+
+ # --- Naming Game Variables ---
+ self.counter = 0 # FOR DEBUGGING
+ self.last_broadcast_ticks = 0 # Tracks timing
+ self.max_broadcast_ticks = 5
+
+ # Pre-defined words (e.g., num_words = 128)
+ self.num_words = 8
+ self.words = list(range(self.num_words))
+
+ # Word reception state
+ self.received_word = None
+ # self.received_word_checked = True
+ self.new_word_received = False
+
+ # Global variable for the word chosen for transmission
+ self.w_index = 0
+
+ # Inventory of known words
+ self.inventory = set()
+
+
+ def control_step(self):
+
+ self.counter += 1 # Increment step counter
+
+ # Run SCT control step
+ self.sct.run_step()
+
+ self.color_code() # Update LED color based on inventory state
+
+ self.vector = walk_avoid(self.position.x, self.position.y, self.direction, self.neighbors, self.max_speed, self.arena_limits)
+ # print(f'DotBot {self.address} Walk Vector: {self.vector}')
+
+
+ def update_pose(self, position: DotBotLH2Position) -> None:
+ if self.prev_position is not None:
+ dx = position.x - self.prev_position.x
+ dy = position.y - self.prev_position.y
+ if (dx * dx + dy * dy) > 1e-6:
+ heading_rad = math.atan2(dy, -dx)
+ self.direction = (math.degrees(heading_rad) + 360.0) % 360.0
+
+ self.prev_position = position
+ self.position = position
+
+
+ # Register callback functions to the generator player
+ def add_callbacks(self):
+
+ # Automatic addition of callbacks
+ # 1. Get list of events and list specifying whether an event is controllable or not.
+ # 2. For each event, check controllable or not and add callback.
+
+ events, controllability_list = self.sct.get_events()
+
+ for event, index in events.items():
+ is_controllable = controllability_list[index]
+ stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_'
+
+ if is_controllable: # Add controllable event
+ func_name = '_callback_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, func, None, None)
+ else: # Add uncontrollable event
+ func_name = '_check_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, None, func, None)
+
+
+ # Callback functions (controllable events)
+ def _callback_startTimer(self, data: any):
+ """
+ Saves the current tick count to mark the start of the broadcast interval.
+ """
+ # print(f'DotBot {self.address}. ACTION: startTimer')
+ self.last_broadcast_ticks = self.counter
+
+
+ def _callback_selectAndBroadcast(self, data: any):
+ """
+ Selects a random word from the inventory, or invents a new one
+ if the inventory is empty. Sets the flag for transmission.
+ """
+ # print(f'DotBot {self.address}. ACTION: selectAndBroadcast', end=". ")
+
+ # Select or Invent a word
+ if not self.inventory:
+ # Inventory is empty: invent a new word from the pool
+ self.w_index = random.randrange(self.num_words)
+ # Store the word (equivalent to inventory[0] = words[w_index].data[0])
+ self.inventory.add(self.words[self.w_index])
+ else:
+ # Inventory is not empty: pick a random word from current known words
+ self.w_index = random.choice(list(self.inventory))
+
+ # Set broadcast flag for transmission
+ self.broadcast_word = True
+
+ # print(f'\tinventory: {self.inventory},\tselected word: {self.w_index}')
+
+
+ def _callback_updateInventory(self, data: any):
+ """
+ Updates the inventory based on the last received word.
+ If the word is known, the agent reaches a local consensus (inventory collapses).
+ If unknown, the word is added to the agent's vocabulary.
+ """
+ # print(f'DotBot {self.address}. ACTION: updateInventory', end=". ")
+
+ # Ensure we have a word to process
+ if self.received_word is None:
+ return
+
+ # Check if the received word is within the inventory
+ if self.received_word in self.inventory:
+ # SUCCESS: word is known.
+ # Remove all other words (collapse inventory to just this one)
+ self.inventory = {self.received_word}
+ # print(f' removed all other words, inventory now: {self.inventory}')
+ else:
+ # FAILURE: word is unknown.
+ # Insert it into the inventory
+ self.inventory.add(self.received_word)
+ # print(f' added word {self.received_word}, inventory now: {self.inventory}')
+
+ # Mark as checked
+ self.received_word_checked = True
+
+
+ # Callback functions (uncontrollable events)
+ def _check__selectAndBroadcast(self, data: any) -> bool:
+ """
+ Checks if a new word has been received.
+ Returns True (1) if a word is waiting to be processed, otherwise False (0).
+ """
+ if self.new_word_received:
+ # Reset the flag
+ self.new_word_received = False
+ return True
+
+ return False
+
+
+ def _check_timeout(self, data: any) -> bool:
+ """
+ Checks if the broadcast timer has expired.
+ Returns True if the current counter exceeds the last broadcast time
+ plus the defined interval.
+ """
+ if self.counter > (self.last_broadcast_ticks + self.max_broadcast_ticks):
+ return True
+
+ return False
+
+
+ def color_code(self):
+ """
+ Updates the LED color based on the inventory state.
+ - If the robot has not reached consensus (inventory size != 1), the LED is OFF.
+ - If consensus is reached, the word is mapped to a specific RGB color.
+ """
+ # 1. Check if the inventory has reached consensus (size exactly 1)
+ if len(self.inventory) != 1:
+ self.led = (0, 0, 0) # Turn LED off
+ return
+
+ # 2. Extract the single word known to the agent
+ word = list(self.inventory)[0]
+
+ # ------ ORIGINAL ------
+ # # 3. Calculate RGB components using the original base-4 logic
+ # # Mapping word index (0-127) to a color space (1-64)
+ # color = (word % 63) + 1
+
+ # r = color // 16
+ # rem1 = color % 16
+ # g = rem1 // 4
+ # b = rem1 % 4
+
+ # # 4. Update the LED state
+ # # Note: Original Kilobot RGB values are 0-3.
+ # # convert to range 0-255.
+ # self.led = (r * 85, g * 85, b * 85)
+ # ------------------------
+
+ # ------ NEW SIMPLIFIED COLOR CODING ------
+ # Map the word to an index (0-7)
+ color_idx = word % len(DISTINCT_COLORS)
+
+ # Assign the high-contrast color
+ self.led = DISTINCT_COLORS[color_idx]
+ # -----------------------------------------
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/gen_init_pos.py b/dotbot/examples/minimum_naming_game/gen_init_pos.py
new file mode 100644
index 0000000..617cf36
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/gen_init_pos.py
@@ -0,0 +1,48 @@
+import random
+import math
+from pathlib import Path
+
+def format_with_underscores(value):
+ """Formats an integer with underscores every three digits."""
+ return f"{value:_}"
+
+def generate_lattice_toml(width_count, height_count, sep_x, sep_y, start_x=120, start_y=120):
+ output_lines = []
+
+ for row in range(height_count):
+ for col in range(width_count):
+ bot_id = row * width_count + col + 1
+ address = f"AAAAAAAA{bot_id:08X}"
+
+ # Calculate positions
+ pos_x = start_x + (col * sep_x)
+ pos_y = start_y + (row * sep_y)
+
+ # Randomize theta between 0 and 2*pi
+ random_theta = round(random.uniform(0, 2 * math.pi), 2)
+
+ # Manually build the TOML entry string to preserve underscores
+ output_lines.append("[[dotbots]]")
+ output_lines.append(f'address = "{address}"')
+ output_lines.append(f'pos_x = {pos_x:_}')
+ output_lines.append(f'pos_y = {pos_y:_}')
+ output_lines.append(f"theta = {random_theta}")
+ output_lines.append("") # Empty line for readability
+
+ return "\n".join(output_lines)
+
+# --- Configuration ---
+WIDTH_NODES = 5 # Robots per row
+HEIGHT_NODES = 5 # Number of rows
+SEP_X = 240 # Separation between columns
+SEP_Y = 240 # Separation between rows
+
+# Generate
+toml_string = generate_lattice_toml(WIDTH_NODES, HEIGHT_NODES, SEP_X, SEP_Y)
+
+# Save to file
+output_path = Path(__file__).resolve().parent / "init_state.toml"
+with open(output_path, "w") as f:
+ f.write(toml_string)
+
+print(f"Generated TOML file at {output_path}")
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/init_state.toml b/dotbot/examples/minimum_naming_game/init_state.toml
new file mode 100644
index 0000000..c2913fb
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/init_state.toml
@@ -0,0 +1,149 @@
+[[dotbots]]
+address = "AAAAAAAA00000001"
+pos_x = 120
+pos_y = 120
+theta = 4.38
+
+[[dotbots]]
+address = "AAAAAAAA00000002"
+pos_x = 360
+pos_y = 120
+theta = 0.56
+
+[[dotbots]]
+address = "AAAAAAAA00000003"
+pos_x = 600
+pos_y = 120
+theta = 2.12
+
+[[dotbots]]
+address = "AAAAAAAA00000004"
+pos_x = 840
+pos_y = 120
+theta = 2.37
+
+[[dotbots]]
+address = "AAAAAAAA00000005"
+pos_x = 1_080
+pos_y = 120
+theta = 0.69
+
+[[dotbots]]
+address = "AAAAAAAA00000006"
+pos_x = 120
+pos_y = 360
+theta = 5.21
+
+[[dotbots]]
+address = "AAAAAAAA00000007"
+pos_x = 360
+pos_y = 360
+theta = 5.09
+
+[[dotbots]]
+address = "AAAAAAAA00000008"
+pos_x = 600
+pos_y = 360
+theta = 3.86
+
+[[dotbots]]
+address = "AAAAAAAA00000009"
+pos_x = 840
+pos_y = 360
+theta = 2.66
+
+[[dotbots]]
+address = "AAAAAAAA0000000A"
+pos_x = 1_080
+pos_y = 360
+theta = 3.44
+
+[[dotbots]]
+address = "AAAAAAAA0000000B"
+pos_x = 120
+pos_y = 600
+theta = 2.64
+
+[[dotbots]]
+address = "AAAAAAAA0000000C"
+pos_x = 360
+pos_y = 600
+theta = 2.15
+
+[[dotbots]]
+address = "AAAAAAAA0000000D"
+pos_x = 600
+pos_y = 600
+theta = 5.72
+
+[[dotbots]]
+address = "AAAAAAAA0000000E"
+pos_x = 840
+pos_y = 600
+theta = 0.24
+
+[[dotbots]]
+address = "AAAAAAAA0000000F"
+pos_x = 1_080
+pos_y = 600
+theta = 0.07
+
+[[dotbots]]
+address = "AAAAAAAA00000010"
+pos_x = 120
+pos_y = 840
+theta = 4.61
+
+[[dotbots]]
+address = "AAAAAAAA00000011"
+pos_x = 360
+pos_y = 840
+theta = 2.62
+
+[[dotbots]]
+address = "AAAAAAAA00000012"
+pos_x = 600
+pos_y = 840
+theta = 4.47
+
+[[dotbots]]
+address = "AAAAAAAA00000013"
+pos_x = 840
+pos_y = 840
+theta = 4.52
+
+[[dotbots]]
+address = "AAAAAAAA00000014"
+pos_x = 1_080
+pos_y = 840
+theta = 6.11
+
+[[dotbots]]
+address = "AAAAAAAA00000015"
+pos_x = 120
+pos_y = 1_080
+theta = 2.42
+
+[[dotbots]]
+address = "AAAAAAAA00000016"
+pos_x = 360
+pos_y = 1_080
+theta = 1.86
+
+[[dotbots]]
+address = "AAAAAAAA00000017"
+pos_x = 600
+pos_y = 1_080
+theta = 1.56
+
+[[dotbots]]
+address = "AAAAAAAA00000018"
+pos_x = 840
+pos_y = 1_080
+theta = 0.86
+
+[[dotbots]]
+address = "AAAAAAAA00000019"
+pos_x = 1_080
+pos_y = 1_080
+theta = 3.28
diff --git a/dotbot/examples/minimum_naming_game/init_state_with_motion.toml b/dotbot/examples/minimum_naming_game/init_state_with_motion.toml
new file mode 100644
index 0000000..c211177
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/init_state_with_motion.toml
@@ -0,0 +1,53 @@
+[[dotbots]]
+address = "AAAAAAAA00000001"
+pos_x = 240
+pos_y = 240
+theta = 3.5
+
+[[dotbots]]
+address = "AAAAAAAA00000002"
+pos_x = 720
+pos_y = 240
+theta = 6.0
+
+[[dotbots]]
+address = "AAAAAAAA00000003"
+pos_x = 1_200
+pos_y = 240
+theta = 1.64
+
+[[dotbots]]
+address = "AAAAAAAA00000004"
+pos_x = 240
+pos_y = 720
+theta = 1.23
+
+[[dotbots]]
+address = "AAAAAAAA00000005"
+pos_x = 720
+pos_y = 720
+theta = 4.16
+
+[[dotbots]]
+address = "AAAAAAAA00000006"
+pos_x = 1_200
+pos_y = 720
+theta = 3.81
+
+[[dotbots]]
+address = "AAAAAAAA00000007"
+pos_x = 240
+pos_y = 1_200
+theta = 2.8
+
+[[dotbots]]
+address = "AAAAAAAA00000008"
+pos_x = 720
+pos_y = 1_200
+theta = 1.54
+
+[[dotbots]]
+address = "AAAAAAAA00000009"
+pos_x = 1_200
+pos_y = 1_200
+theta = 6.03
diff --git a/dotbot/examples/minimum_naming_game/minimum_naming_game.py b/dotbot/examples/minimum_naming_game/minimum_naming_game.py
new file mode 100644
index 0000000..553d5e2
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/minimum_naming_game.py
@@ -0,0 +1,151 @@
+import asyncio
+import os
+from typing import List
+
+from dotbot.models import (
+ DotBotLH2Position,
+ DotBotModel,
+ DotBotQueryModel,
+ DotBotRgbLedCommandModel,
+ DotBotStatus,
+ DotBotWaypoints,
+ WSRgbLed,
+)
+from dotbot.protocol import ApplicationType
+from dotbot.rest import RestClient, rest_client
+from dotbot.websocket import DotBotWsClient
+
+from dotbot.examples.minimum_naming_game.controller import Controller
+
+import numpy as np
+import random
+from scipy.spatial import cKDTree
+
+COMM_RANGE=250
+THRESHOLD=50
+
+# TODO: Measure these values for real dotbots
+BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance
+
+dotbot_controllers = dict()
+
+
+async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]:
+ return await client.fetch_dotbots(
+ query=DotBotQueryModel(status=DotBotStatus.ACTIVE)
+ )
+
+
+async def main() -> None:
+ url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost")
+ port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000")
+ use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False)
+ sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/minimum_naming_game/models/supervisor.yaml")
+
+ async with rest_client(url, port, use_https) as client:
+ dotbots = await fetch_active_dotbots(client)
+
+ # print(len(dotbots), "dotbots connected.")
+
+ # Initialization
+ for dotbot in dotbots:
+
+ # Init controller
+ controller = Controller(dotbot.address, sct_path)
+ dotbot_controllers[dotbot.address] = controller
+ # print(f'type of controller: {type(controller)} for DotBot {dotbot.address}')
+
+ # 1. Extract positions into a list of [x, y] coordinates
+ coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots]
+
+ # 2. Convert the list to a NumPy array
+ positions = np.array(coords)
+
+ # 3. Build the KD-Tree
+ # This tree can now be used for fast spatial queries (like finding neighbors)
+ tree = cKDTree(positions)
+
+ ws = DotBotWsClient(url, port)
+ await ws.connect()
+ try:
+
+ counter = 0
+
+ while True:
+ print("Step", counter)
+
+ for dotbot in dotbots:
+
+ controller = dotbot_controllers[dotbot.address]
+ controller.position = dotbot.lh2_position
+ controller.direction = dotbot.direction
+
+ # print(f'Controller position: {controller.position}, direction: {controller.direction}')
+
+ # 1. Query the tree for indices of neighbors
+ neighbor_indices = tree.query_ball_point([dotbot.lh2_position.x, dotbot.lh2_position.y], r=COMM_RANGE)
+
+ # 2. Convert indices back into actual DotBot objects
+ neighbors = [
+ dotbots[idx] for idx in neighbor_indices
+ if dotbots[idx].address != dotbot.address
+ ]
+
+ # print(f'neighbour of {dotbot.address}: {[n.address for n in neighbors]}')
+
+ # 3. If there are neighbors broadcasting, pick ONE randomly to listen to
+ if neighbors:
+ selected_neighbor = dotbot_controllers[random.choice(neighbors).address]
+
+ # Share the word: take the neighbor's chosen word index
+ if selected_neighbor.w_index != 0:
+ controller.received_word = selected_neighbor.w_index
+
+ # Set the flags so the robot knows it has a new message to process
+ controller.new_word_received = True
+ controller.received_word_checked = False
+
+ # Update controller's neighbor list
+ controller.neighbors = neighbors
+
+ # Run controller
+ controller.control_step() # run SCT step
+
+ # Force update
+ waypoints = DotBotWaypoints(
+ threshold=THRESHOLD,
+ waypoints=[
+ DotBotLH2Position(
+ x=dotbots[0].lh2_position.x, y=dotbots[0].lh2_position.y, z=0
+ )
+ ],
+ )
+ await client.send_waypoint_command(
+ address=dotbots[0].address,
+ application=ApplicationType.DotBot,
+ command=waypoints,
+ )
+
+ await ws.send(
+ WSRgbLed(
+ cmd="rgb_led",
+ address=dotbot.address,
+ application=ApplicationType.DotBot,
+ data=DotBotRgbLedCommandModel(
+ red=controller.led[0],
+ green=controller.led[1],
+ blue=controller.led[2],
+ ),
+ )
+ )
+
+ # await asyncio.sleep(0.1)
+ counter += 1
+ finally:
+ await ws.close()
+
+ return None
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py b/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py
new file mode 100644
index 0000000..51be125
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/minimum_naming_game_with_motion.py
@@ -0,0 +1,187 @@
+import asyncio
+import math
+import os
+from time import time
+from typing import Dict, List
+
+from dotbot.examples.vec2 import Vec2
+from dotbot.models import (
+ DotBotLH2Position,
+ DotBotModel,
+ DotBotMoveRawCommandModel,
+ DotBotQueryModel,
+ DotBotRgbLedCommandModel,
+ DotBotStatus,
+ DotBotWaypoints,
+ WSRgbLed,
+ WSMoveRaw,
+ WSWaypoints,
+)
+from dotbot.protocol import ApplicationType
+from dotbot.rest import RestClient, rest_client
+from dotbot.websocket import DotBotWsClient
+
+from dotbot.examples.minimum_naming_game.controller_with_motion import Controller
+
+import numpy as np
+import random
+from scipy.spatial import cKDTree
+
+COMM_RANGE=250
+THRESHOLD=0
+
+# TODO: Measure these values for real dotbots
+BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance
+MAX_SPEED = 300 # Maximum allowed linear speed of a bot (mm/s)
+
+ARENA_SIZE_X = 2000 # Width of the arena in mm
+ARENA_SIZE_Y = 2000 # Height of the arena in mm
+
+dotbot_controllers = dict()
+
+
+async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]:
+ return await client.fetch_dotbots(
+ query=DotBotQueryModel(status=DotBotStatus.ACTIVE)
+ )
+
+
+async def main() -> None:
+ url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost")
+ port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000")
+ use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False)
+ sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/minimum_naming_game/models/supervisor.yaml")
+
+ async with rest_client(url, port, use_https) as client:
+ dotbots = await fetch_active_dotbots(client)
+
+ # print(len(dotbots), "dotbots connected.")
+
+ # Initialization
+ for dotbot in dotbots:
+
+ # Init controller
+ controller = Controller(dotbot.address, sct_path, 0.9 * MAX_SPEED, arena_limits=(ARENA_SIZE_X, ARENA_SIZE_Y))
+ dotbot_controllers[dotbot.address] = controller
+ # print(f'type of controller: {type(controller)} for DotBot {dotbot.address}')
+
+ # 1. Extract positions into a list of [x, y] coordinates
+ coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots]
+
+ # 2. Convert the list to a NumPy array
+ positions = np.array(coords)
+
+ # 3. Build the KD-Tree
+ # This tree can now be used for fast spatial queries (like finding neighbors)
+ tree = cKDTree(positions)
+
+ ws = DotBotWsClient(url, port)
+ await ws.connect()
+ try:
+
+ counter = 0
+
+ while True:
+ print("Step", counter)
+
+ dotbots = await fetch_active_dotbots(client)
+
+ # 1. Extract positions into a list of [x, y] coordinates
+ # This loop iterates through your dotbot list and grabs the lh2_position attributes
+ coords = [[dotbot.lh2_position.x, dotbot.lh2_position.y] for dotbot in dotbots]
+
+ # 2. Convert the list to a NumPy array
+ # The structure will be (N, 2), where N is the number of dotbots
+ positions = np.array(coords)
+
+ # 3. Build the KD-Tree
+ # This tree can now be used for fast spatial queries (like finding neighbors)
+ tree = cKDTree(positions)
+
+ for dotbot in dotbots:
+
+ controller = dotbot_controllers[dotbot.address]
+ controller.update_pose(dotbot.lh2_position)
+
+ # print(f'Controller position: {controller.position}, direction: {controller.direction}')
+
+ # 1. Query the tree for indices of neighbors
+ neighbor_indices = tree.query_ball_point([dotbot.lh2_position.x, dotbot.lh2_position.y], r=COMM_RANGE)
+
+ # 2. Convert indices back into actual DotBot objects
+ neighbors = [
+ dotbots[idx] for idx in neighbor_indices
+ if dotbots[idx].address != dotbot.address
+ ]
+
+ # print(f'neighbour of {dotbot.address}: {[n.address for n in neighbors]}')
+
+ # 3. If there are neighbors broadcasting, pick ONE randomly to listen to
+ if neighbors:
+ selected_neighbor = dotbot_controllers[random.choice(neighbors).address]
+
+ # Share the word: take the neighbor's chosen word index
+ if selected_neighbor.w_index != 0:
+ controller.received_word = selected_neighbor.w_index
+
+ # Set the flags so the robot knows it has a new message to process
+ controller.new_word_received = True
+ controller.received_word_checked = False
+
+ # Update controller's neighbor list
+ controller.neighbors = neighbors
+
+ # Run controller
+ controller.control_step() # run SCT step
+
+ ### TEMPORARY: The simulator does not accept negative coordinates,
+ # so we set it to zero and scale the positive value proportionally.
+ point = DotBotLH2Position(x=controller.vector[0], y=controller.vector[1], z=0.0)
+ if dotbot.lh2_position.x + controller.vector[0] < 0:
+ point.y = controller.vector[1] * (controller.vector[0] / MAX_SPEED)
+ point.x = 0.0
+ if dotbot.lh2_position.y + controller.vector[1] < 0:
+ point.x = controller.vector[0] * (controller.vector[1] / MAX_SPEED)
+ point.y = 0.0
+ ###
+
+ waypoints = DotBotWaypoints(
+ threshold=THRESHOLD,
+ waypoints=[
+ DotBotLH2Position(
+ x=dotbot.lh2_position.x + round(point.x, 2),
+ y=dotbot.lh2_position.y + round(point.y, 2),
+ z=0
+ )
+ ],
+ )
+
+ await client.send_waypoint_command(
+ address=dotbot.address,
+ application=ApplicationType.DotBot,
+ command=waypoints,
+ )
+
+ await ws.send(
+ WSRgbLed(
+ cmd="rgb_led",
+ address=dotbot.address,
+ application=ApplicationType.DotBot,
+ data=DotBotRgbLedCommandModel(
+ red=controller.led[0],
+ green=controller.led[1],
+ blue=controller.led[2],
+ ),
+ )
+ )
+
+ # await asyncio.sleep(0.1)
+ counter += 1
+ finally:
+ await ws.close()
+
+ return None
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/dotbot/examples/minimum_naming_game/models/E1.xml b/dotbot/examples/minimum_naming_game/models/E1.xml
new file mode 100644
index 0000000..4ca4b55
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/E1.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/E2.xml b/dotbot/examples/minimum_naming_game/models/E2.xml
new file mode 100644
index 0000000..dc3f468
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/E2.xml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/G1.xml b/dotbot/examples/minimum_naming_game/models/G1.xml
new file mode 100644
index 0000000..8f2d4f5
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/G1.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/G2.xml b/dotbot/examples/minimum_naming_game/models/G2.xml
new file mode 100644
index 0000000..ec83205
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/G2.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/G3.xml b/dotbot/examples/minimum_naming_game/models/G3.xml
new file mode 100644
index 0000000..a88ab9b
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/G3.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/G4.xml b/dotbot/examples/minimum_naming_game/models/G4.xml
new file mode 100644
index 0000000..4663d81
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/G4.xml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/Sloc1.xml b/dotbot/examples/minimum_naming_game/models/Sloc1.xml
new file mode 100644
index 0000000..5f136a9
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/Sloc1.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/Sloc2.xml b/dotbot/examples/minimum_naming_game/models/Sloc2.xml
new file mode 100644
index 0000000..9b0c480
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/Sloc2.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/minimum_naming_game/models/script.txt b/dotbot/examples/minimum_naming_game/models/script.txt
new file mode 100644
index 0000000..f36a4db
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/script.txt
@@ -0,0 +1,8 @@
+Gloc1 = Sync(G1,G3)
+Gloc2 = Sync(G2,G4)
+
+Kloc1 = Sync(Gloc1,E1)
+Kloc2 = Sync(Gloc2,E2)
+
+Sloc1 = SupC(Gloc1,Kloc1)
+Sloc2 = SupC(Gloc2,Kloc2)
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/models/supervisor.yaml b/dotbot/examples/minimum_naming_game/models/supervisor.yaml
new file mode 100644
index 0000000..3abeea8
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/models/supervisor.yaml
@@ -0,0 +1,11 @@
+
+num_events: 5
+num_supervisors: 2
+events: [ EV_startTimer, EV_selectAndBroadcast, EV_updateInventory, EV__selectAndBroadcast, EV_timeout ]
+ev_controllable: [ 1, 1, 1, 0, 0 ]
+ev_public: [ 0, 1, 0, 1, 0 ]
+sup_events: [ [0, 0, 1, 1, 0], [1, 1, 0, 0, 1] ]
+sup_init_state: [ 1, 0 ]
+sup_current_state: [ 1, 0 ]
+sup_data_pos: [ 0, 11 ]
+sup_data: [ 2, EV_updateInventory, 0, 1, EV__selectAndBroadcast, 0, 0, 1, EV__selectAndBroadcast, 0, 0, 1, EV_startTimer, 0, 1, 1, EV_timeout, 0, 2, 1, EV_selectAndBroadcast, 0, 0 ]
\ No newline at end of file
diff --git a/dotbot/examples/minimum_naming_game/walk_avoid.py b/dotbot/examples/minimum_naming_game/walk_avoid.py
new file mode 100644
index 0000000..a5490c1
--- /dev/null
+++ b/dotbot/examples/minimum_naming_game/walk_avoid.py
@@ -0,0 +1,76 @@
+import math
+
+from dotbot.models import DotBotModel
+
+
+def walk_avoid(position_x: float,
+ position_y: float,
+ direction: float,
+ neighbors: list[DotBotModel],
+ max_speed: float,
+ arena_limits: tuple[float, float]) -> list[float]:
+ """
+ Walk straight while avoiding collisions and arena boundary.
+ Arena limits: x, y in [0.0, 1.0]
+ """
+ UNIT_SPEED = max_speed
+ MARGIN = 0.1 # Trigger turn when within 10% of any edge
+
+ # 1. Identify if any neighbor is too close
+ neighbor_collision = False
+ if neighbors:
+ neighbor_collision = True
+
+ # 2. Identify if any arena boundary is violated
+ curr_x = position_x
+ curr_y = position_y
+
+ wall_collision = (curr_x < MARGIN*arena_limits[0] or curr_x > (arena_limits[0] - MARGIN*arena_limits[0]) or
+ curr_y < MARGIN*arena_limits[1] or curr_y > (arena_limits[1] - MARGIN*arena_limits[1]))
+
+ # 3. Determine "Local" movement
+ local_v = [0.0, 0.0]
+
+ if neighbor_collision or wall_collision:
+
+ if wall_collision:
+ # Decide direction of repulsion (Left or Right)
+ if (curr_x < MARGIN*arena_limits[0]):
+ local_v[0] += UNIT_SPEED
+ if (curr_x > (arena_limits[0] - MARGIN*arena_limits[0])):
+ local_v[0] += -UNIT_SPEED
+ if (curr_y < MARGIN*arena_limits[1]):
+ local_v[1] += UNIT_SPEED
+ if (curr_y > (arena_limits[1] - MARGIN*arena_limits[1])):
+ local_v[1] += -UNIT_SPEED
+
+ if neighbor_collision:
+ avg_dx = sum(n.lh2_position.x - curr_x for n in neighbors)
+ avg_dy = sum(n.lh2_position.y - curr_y for n in neighbors)
+
+ mag = math.sqrt(avg_dx**2 + avg_dy**2)
+ if mag > 0:
+ # Add "Away" vector to the existing global movement
+ local_v[0] -= (avg_dx / mag) * UNIT_SPEED
+ local_v[1] -= (avg_dy / mag) * UNIT_SPEED
+ # print(f"Neighbor avoidance vector: {local_v} from neighbors {[n.address for n in neighbors]}")
+
+ # Normalize so we don't go double speed in corners
+ total_mag = math.sqrt(local_v[0]**2 + local_v[1]**2)
+ if total_mag > 0:
+ return (
+ (local_v[0] / total_mag) * UNIT_SPEED,
+ (local_v[1] / total_mag) * UNIT_SPEED,
+ )
+ return (0.0, 0.0)
+
+ else:
+ local_v = [UNIT_SPEED, 0.0] # Normal forward motion
+
+ # 4. Rotate Local Vector to Global Vector
+ theta_rad = math.radians(direction)
+
+ global_vx = (local_v[0] * -math.cos(theta_rad)) - (local_v[1] * math.sin(theta_rad))
+ global_vy = (local_v[0] * math.sin(theta_rad)) + (local_v[1] * -math.cos(theta_rad))
+
+ return (global_vx, global_vy)
diff --git a/dotbot/examples/sct.py b/dotbot/examples/sct.py
new file mode 100644
index 0000000..7f1432f
--- /dev/null
+++ b/dotbot/examples/sct.py
@@ -0,0 +1,314 @@
+import random
+import yaml
+
+class SCT:
+
+ def __init__(self, filename):
+
+ self.read_supervisor(filename)
+
+ self.callback = {}
+ self.input_buffer = None # Clear content after timestep
+ self.last_events = [0] * len(self.EV)
+
+
+ def read_supervisor(self, filename):
+ try:
+ with open(filename, 'r') as stream:
+ self.f = yaml.safe_load(stream)
+ except yaml.YAMLError as e:
+ print(e)
+
+ self.num_events = self.f['num_events']
+ self.num_supervisors = self.f['num_supervisors']
+ self.EV = {}
+ for i, ev in enumerate(self.f['events']):
+ self.EV[ev] = i
+
+ self.ev_controllable = self.f['ev_controllable']
+ self.sup_events = self.f['sup_events']
+ self.sup_init_state = self.f['sup_init_state']
+ self.sup_current_state = self.f['sup_current_state']
+ self.sup_data_pos = self.f['sup_data_pos']
+ self.sup_data = self.f['sup_data']
+
+
+ def add_callback(self, event, clbk, ci, sup_data):
+ func = {}
+ func['callback'] = clbk
+ func['check_input'] = ci
+ func['sup_data'] = sup_data
+ self.callback[event] = func
+
+
+ def run_step(self):
+ self.input_buffer = [] # clear buffer
+ self.update_input()
+
+ # Get all uncontrollable events
+ uce = self.input_buffer
+
+ # Apply all the uncontrollable events
+ while uce:
+ event = uce.pop(0)
+ self.make_transition(event)
+ self.exec_callback(event)
+
+ ce_exists, ce = self.get_next_controllable()
+
+ # Apply the chosen controllable event
+ if ce_exists:
+ self.make_transition(ce)
+ self.exec_callback(ce)
+
+
+ def input_read(self, ev):
+ event_name = self.get_event_name(ev)
+ if ev < self.num_events and self.callback[event_name]:
+ return self.callback[event_name]['check_input'](self.callback[event_name]['sup_data'])
+ return False
+
+
+ def update_input(self):
+ for i in range(0,self.num_events):
+ if not self.ev_controllable[i]: # Check the UCEs only
+ if self.input_read(i):
+ self.input_buffer.append(i)
+ self.last_events[i] = 1
+
+
+ def get_state_position(self, supervisor, state):
+ position = self.sup_data_pos[supervisor] # Jump to the start position of the supervisor
+ for s in range(0, state): # Keep iterating until the state is reached
+ en = self.sup_data[position] # The number of transitions in the state
+ position += en * 3 + 1 # Next state position (Number transitions * 3 + 1)
+ return position
+
+
+ def make_transition(self, ev):
+ num_transitions = None
+
+ # Apply transition to each local supervisor
+ for i in range(0, self.num_supervisors):
+ if self.sup_events[i][ev]: # Check if the given event is part of this supervisor
+
+ # Current state info of supervisor
+ position = self.get_state_position(i, self.sup_current_state[i])
+ num_transitions = self.sup_data[position]
+ position += 1 # Point to first transition
+
+ # Find the transition for the given event
+ while num_transitions:
+ num_transitions -= 1
+ value = self.get_value(self.sup_data[position])
+ if value == ev:
+ self.sup_current_state[i] = (self.sup_data[position + 1] * 256) + (self.sup_data[position + 2])
+ break
+ position += 3
+
+
+ def exec_callback(self, ev):
+ event_name = self.get_event_name(ev)
+ if ev < self.num_events and self.callback[event_name]['callback']:
+ self.callback[event_name]['callback'](self.callback[event_name]['sup_data'])
+
+
+ def get_next_controllable(self):
+
+ # Get controllable events that are enabled -> events
+ actives = self.get_active_controllable_events()
+
+ if not all(v == 0 for v in actives):
+ randomPos = random.randint(0,1000000000) % actives.count(1)
+ for i in range(0, self.num_events):
+ if not randomPos and actives[i]:
+ return True, i
+ elif actives[i]:
+ randomPos -= 1
+
+ return False, None
+
+
+ def get_active_controllable_events(self):
+
+ events = []
+
+ # Disable all non controllable events
+ for i in range(0, self.num_events):
+ if not self.ev_controllable[i]:
+ events.append(0)
+ else:
+ events.append(1)
+
+ # Check disabled events for all supervisors
+ for i in range(0, self.num_supervisors):
+
+ # Init an array where all events are disabled
+ ev_disable = [1] * self.num_events
+
+ # Enable all events that are not part of this supervisor
+ for j in range(0, self.num_events):
+ if not self.sup_events[i][j]:
+ ev_disable[j] = 0
+
+ # Get current state
+ position = self.get_state_position(i, self.sup_current_state[i])
+ num_transitions = self.sup_data[position]
+ position += 1
+
+ # Enable all events that have a transition from the current state
+ while num_transitions:
+ num_transitions -= 1
+ value = self.get_value(self.sup_data[position])
+ ev_disable[value] = 0
+ position += 3
+
+ # Remove the controllable events to disable, leaving an array of enabled controllable events
+ for j in range(0, self.num_events):
+ if ev_disable[j] == 1 and events[j]:
+ events[j] = 0
+
+ return events
+
+
+ def get_value(self, index):
+ if isinstance(index, str):
+ return self.EV[index]
+ return index
+
+
+ def get_event_name(self, index):
+ if isinstance(index, int):
+ return list(self.EV.keys())[list(self.EV.values()).index(index)]
+ return index
+
+
+ # Get function that returns event information (event names and controllability)
+ def get_events(self):
+ return self.EV, self.ev_controllable
+
+
+class SCTPub(SCT):
+
+ def __init__(self, filename):
+ super().__init__(filename)
+ self.ev_public = self.f['ev_public']
+
+
+ def run_step(self):
+ self.input_buffer = [] # clear buffer
+ self.input_buffer_pub = []
+ self.update_input()
+
+ # Apply all public uncontrollable events
+ public_uce = self.input_buffer_pub
+ while public_uce:
+ event = public_uce.pop(0)
+ self.make_transition(event)
+ self.exec_callback(event)
+
+ # Apply all private uncontrollable events
+ uce = self.input_buffer
+ while uce:
+ event = uce.pop(0)
+ self.make_transition(event)
+ self.exec_callback(event)
+
+ ce_exists, ce = self.get_next_controllable()
+
+ # Apply the chosen controllable event
+ if ce_exists:
+ self.make_transition(ce)
+ self.exec_callback(ce)
+
+
+ def update_input(self):
+ for i in range(0,self.num_events):
+ if not self.ev_controllable[i]: # Check the UCEs only
+ if self.input_read(i):
+ if self.ev_public[i]:
+ self.input_buffer_pub.append(i)
+ else:
+ self.input_buffer.append(i)
+ self.last_events[i] = 1
+
+class SCTProb(SCT):
+
+ def __init__(self, filename):
+ super().__init__(filename)
+ self.sup_data_prob_pos = self.f['sup_data_prob_pos']
+ self.sup_data_prob = self.f['sup_data_prob']
+
+
+ def get_state_position_prob(self, supervisor, state):
+ prob_position = self.sup_data_prob_pos[supervisor] # Jump to the start position of the supervisor
+ for s in range(0, state): # Keep iterating until the state is reached
+ en = self.sup_data_prob[prob_position] # The number of transitions in the state
+ prob_position += en + 1 # Next state position (Number transitions * 3 + 1)
+ return prob_position
+
+
+ def get_active_controllable_events_prob(self):
+
+ events = []
+
+ # Disable all non controllable events
+ for i in range(0, self.num_events):
+ if not self.ev_controllable[i]:
+ events.append(0)
+ else:
+ events.append(1)
+
+ # Check disabled events for all supervisors
+ for i in range(0, self.num_supervisors):
+
+ # Init an array where all events are disabled
+ ev_disable = [1] * self.num_events
+
+ # Enable all events that are not part of this supervisor
+ for j in range(0, self.num_events):
+ if not self.sup_events[i][j]:
+ ev_disable[j] = 0
+
+ # Get current state
+ position = self.get_state_position(i, self.sup_current_state[i])
+ position_prob = self.get_state_position_prob(i, self.sup_current_state[i])
+ num_transitions = self.sup_data[position]
+ position += 1
+ position_prob += 1
+
+ # Enable all events that have a transition from the current state
+ while num_transitions:
+ num_transitions -= 1
+ value = self.get_value(self.sup_data[position])
+
+ if self.ev_controllable[value] and self.sup_events[i][value]:
+ ev_disable[value] = 0 # Transition with this event, do not disable it, just calculate its probability contribution
+ events[value] = events[value] * self.sup_data_prob[position_prob]
+ position_prob += 1
+
+ position += 3
+
+ for j in range(self.num_events):
+ if ev_disable[j] == 1:
+ events[j] = 0
+
+ return events
+
+
+ def get_next_controllable(self):
+
+ # Get controllable events that are enabled -> events
+ events = self.get_active_controllable_events_prob()
+ prob_sum = sum(events)
+
+ if prob_sum > 0.0001: # If at least one event is enabled do
+ random_value = random.uniform(0, prob_sum)
+ random_sum = 0.0
+ for i in range(self.num_events):
+ random_sum += events[i]
+ if (random_value < random_sum) and self.ev_controllable[i]:
+ return True, i
+
+ return False, None
+
\ No newline at end of file
diff --git a/dotbot/examples/work_and_charge/README.md b/dotbot/examples/work_and_charge/README.md
new file mode 100644
index 0000000..292611c
--- /dev/null
+++ b/dotbot/examples/work_and_charge/README.md
@@ -0,0 +1,35 @@
+# Work and Charge
+
+This demo shows a work-and-charge scenario in the DotBot simulator, where agents alternate moving between two regions to perform some work and return to charge.
+
+## Install Python packages (pip)
+
+Install the Python packages required to run this demo.
+
+```bash
+pip install pyyaml
+```
+
+## How to run
+
+### Specify the initial state
+
+Specify the initial state of the DotBots by replacing the file path for ```simulator_init_state_path``` in [config_sample.toml](https://github.com/DotBots/PyDotBot/blob/main/config_sample.toml).
+
+```toml
+simulator_init_state_path = "dotbot/examples/work_and_charge/init_state.toml"
+```
+
+### Start the controller in simulator mode
+
+```bash
+python -m dotbot.controller_app --config-path config_sample.toml -p dotbot-simulator -a dotbot-simulator --log-level error
+```
+
+### Run the work-and-charge scenario
+
+Open a new terminal and run the work-and-charge scenario.
+
+```bash
+python -m dotbot.examples.work_and_charge.work_and_charge
+```
diff --git a/dotbot/examples/work_and_charge/controller.py b/dotbot/examples/work_and_charge/controller.py
new file mode 100644
index 0000000..8be9819
--- /dev/null
+++ b/dotbot/examples/work_and_charge/controller.py
@@ -0,0 +1,136 @@
+import math
+from dotbot.models import (
+ DotBotLH2Position,
+)
+from dotbot.examples.sct import SCT
+
+class Controller:
+ def __init__(self, address: str, path: str):
+ self.address = address
+
+ # SCT initialization
+ self.sct = SCT(path)
+ self.add_callbacks()
+
+ self.waypoint_current = None
+ self.waypoint_threshold = 50 # default threshold
+
+ self.led = (0, 0, 0) # initial LED color
+ self.energy = 'high' # initial energy level
+
+
+ def set_work_waypoint(self, waypoint: DotBotLH2Position):
+ self.waypoint_work = waypoint
+
+
+ def set_charge_waypoint(self, waypoint: DotBotLH2Position):
+ self.waypoint_charge = waypoint
+
+
+ def set_current_position(self, position: DotBotLH2Position):
+ self.position_current = position
+
+
+ def control_step(self):
+
+ # Calculate distance to work waypoint
+ dx = self.waypoint_work.x - self.position_current.x
+ dy = self.waypoint_work.y - self.position_current.y
+ self.dist_work = math.sqrt(dx * dx + dy * dy)
+
+ # Calculate distance to charge waypoint
+ dx = self.waypoint_charge.x - self.position_current.x
+ dy = self.waypoint_charge.y - self.position_current.y
+ self.dist_charge = math.sqrt(dx * dx + dy * dy)
+
+ # Run SCT control step
+ self.sct.run_step()
+
+
+ # Register callback functions to the generator player
+ def add_callbacks(self):
+
+ # Automatic addition of callbacks
+ # 1. Get list of events and list specifying whether an event is controllable or not.
+ # 2. For each event, check controllable or not and add callback.
+
+ events, controllability_list = self.sct.get_events()
+
+ for event, index in events.items():
+ is_controllable = controllability_list[index]
+ stripped_name = event.split('EV_', 1)[1] # Strip preceding string 'EV_'
+
+ if is_controllable: # Add controllable event
+ func_name = '_callback_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, func, None, None)
+ else: # Add uncontrollable event
+ func_name = '_check_{0}'.format(stripped_name)
+ func = getattr(self, func_name)
+ self.sct.add_callback(event, None, func, None)
+
+
+ # Callback functions (controllable events)
+ def _callback_moveToWork(self, data: any):
+ # print(f'DotBot {self.address}. ACTION: moveToWork')
+ self.waypoint_current = self.waypoint_work
+ self.led = (0, 255, 0) # Green LED when moving to work
+
+
+ def _callback_moveToCharge(self, data: any):
+ # print(f'DotBot {self.address}. ACTION: moveToCharge')
+ self.waypoint_current = self.waypoint_charge
+ self.led = (255, 0, 0) # Red LED when moving to charge
+
+
+ def _callback_work(self, data: any):
+ # print(f'DotBot {self.address}. ACTION: work')
+ self.energy = 'low' # After working, energy level goes low
+
+
+ def _callback_charge(self, data: any):
+ # print(f'DotBot {self.address}. ACTION: charge')
+ self.energy = 'high' # After charging, energy level goes high
+
+
+ # Callback functions (uncontrollable events)
+ def _check_atWork(self, data: any):
+ if self.dist_work < self.waypoint_threshold:
+ # print(f'DotBot {self.address}. EVENT: atWork')
+ return True
+ return False
+
+
+ def _check_notAtWork(self, data: any):
+ if self.dist_work >= self.waypoint_threshold:
+ # print(f'DotBot {self.address}. EVENT: notAtWork')
+ return True
+ return False
+
+
+ def _check_atCharger(self, data: any):
+ if self.dist_charge < self.waypoint_threshold:
+ # print(f'DotBot {self.address}. EVENT: atCharger')
+ return True
+ return False
+
+
+ def _check_notAtCharger(self, data: any):
+ if self.dist_charge >= self.waypoint_threshold:
+ # print(f'DotBot {self.address}. EVENT: notAtCharger')
+ return True
+ return False
+
+
+ def _check_lowEnergy(self, data: any):
+ if self.energy == 'low':
+ # print(f'DotBot {self.address}. EVENT: lowEnergy')
+ return True
+ return False
+
+
+ def _check_highEnergy(self, data: any):
+ if self.energy == 'high':
+ # print(f'DotBot {self.address}. EVENT: highEnergy')
+ return True
+ return False
\ No newline at end of file
diff --git a/dotbot/examples/work_and_charge/gen_init_pose.py b/dotbot/examples/work_and_charge/gen_init_pose.py
new file mode 100644
index 0000000..2dbe876
--- /dev/null
+++ b/dotbot/examples/work_and_charge/gen_init_pose.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+def generate_dotbot_script():
+ # Configuration Constants
+ NUM_ROBOTS = 8 # Total robots to generate
+ START_ID = 1 # Start at AAAAAAAA00000001
+ X_RIGHT = 800
+ X_LEFT = 100
+ START_Y = 200
+ Y_STEP = 200
+ THETA = 3.14
+ FILENAME = "dotbots_config.toml"
+
+ lines = []
+
+ for i in range(NUM_ROBOTS):
+ # 1. Address: Increments by 1 every robot
+ address_hex = f"AAAAAAAA{START_ID + i:08X}"
+
+ # 2. X Position: Alternates 800, 100, 800, 100...
+ pos_x_val = X_RIGHT if i % 2 == 0 else X_LEFT
+
+ # 3. Y Position: Increases by 200 for EVERY robot
+ pos_y_val = START_Y + (i * Y_STEP)
+
+ # 4. Format numbers with underscores (e.g., 800_000)
+ pos_x = f"{pos_x_val:,}".replace(",", "_")
+ pos_y = f"{pos_y_val:,}".replace(",", "_")
+
+ # Build the TOML block
+ block = (
+ f"[[dotbots]]\n"
+ f"address = \"{address_hex}\"\n"
+ f"pos_x = {pos_x}\n"
+ f"pos_y = {pos_y}\n"
+ f"theta = {THETA}\n"
+ )
+ lines.append(block)
+
+ # Save to file
+ output_path = Path(__file__).resolve().parent / "init_state.toml"
+ with open(output_path, "w") as f:
+ f.write("\n".join(lines))
+
+ print(f"Generated TOML file at {output_path}")
+
+if __name__ == "__main__":
+ generate_dotbot_script()
\ No newline at end of file
diff --git a/dotbot/examples/work_and_charge/init_state.toml b/dotbot/examples/work_and_charge/init_state.toml
new file mode 100644
index 0000000..75622c9
--- /dev/null
+++ b/dotbot/examples/work_and_charge/init_state.toml
@@ -0,0 +1,47 @@
+[[dotbots]]
+address = "AAAAAAAA00000001"
+pos_x = 800
+pos_y = 200
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000002"
+pos_x = 100
+pos_y = 400
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000003"
+pos_x = 800
+pos_y = 600
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000004"
+pos_x = 100
+pos_y = 800
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000005"
+pos_x = 800
+pos_y = 1_000
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000006"
+pos_x = 100
+pos_y = 1_200
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000007"
+pos_x = 800
+pos_y = 1_400
+theta = 3.14
+
+[[dotbots]]
+address = "AAAAAAAA00000008"
+pos_x = 100
+pos_y = 1_600
+theta = 3.14
diff --git a/dotbot/examples/work_and_charge/models/E1.xml b/dotbot/examples/work_and_charge/models/E1.xml
new file mode 100644
index 0000000..6e53df5
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/E1.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/E2.xml b/dotbot/examples/work_and_charge/models/E2.xml
new file mode 100644
index 0000000..493542d
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/E2.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/E3.xml b/dotbot/examples/work_and_charge/models/E3.xml
new file mode 100644
index 0000000..c6edfc0
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/E3.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/E4.xml b/dotbot/examples/work_and_charge/models/E4.xml
new file mode 100644
index 0000000..64df924
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/E4.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/G1.xml b/dotbot/examples/work_and_charge/models/G1.xml
new file mode 100644
index 0000000..0a1337f
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/G1.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/G2.xml b/dotbot/examples/work_and_charge/models/G2.xml
new file mode 100644
index 0000000..2d7a2fa
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/G2.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/G3.xml b/dotbot/examples/work_and_charge/models/G3.xml
new file mode 100644
index 0000000..53b1111
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/G3.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/Sloc1.xml b/dotbot/examples/work_and_charge/models/Sloc1.xml
new file mode 100644
index 0000000..325fd60
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/Sloc1.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/Sloc2.xml b/dotbot/examples/work_and_charge/models/Sloc2.xml
new file mode 100644
index 0000000..a833722
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/Sloc2.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/Sloc3.xml b/dotbot/examples/work_and_charge/models/Sloc3.xml
new file mode 100644
index 0000000..c7aa189
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/Sloc3.xml
@@ -0,0 +1,60 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/Sloc4.xml b/dotbot/examples/work_and_charge/models/Sloc4.xml
new file mode 100644
index 0000000..4b08ce8
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/Sloc4.xml
@@ -0,0 +1,60 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dotbot/examples/work_and_charge/models/script.txt b/dotbot/examples/work_and_charge/models/script.txt
new file mode 100644
index 0000000..57d4413
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/script.txt
@@ -0,0 +1,14 @@
+Gloc1 = Sync(G1,G3)
+Gloc2 = Sync(G1,G3)
+Gloc3 = Sync(G1,G2)
+Gloc4 = Sync(G1,G2)
+
+Kloc1 = Sync(Gloc1,E1)
+Kloc2 = Sync(Gloc2,E2)
+Kloc3 = Sync(Gloc3,E3)
+Kloc4 = Sync(Gloc4,E4)
+
+Sloc1 = SupC(Gloc1,Kloc1)
+Sloc2 = SupC(Gloc2,Kloc2)
+Sloc3 = SupC(Gloc3,Kloc3)
+Sloc4 = SupC(Gloc4,Kloc4)
\ No newline at end of file
diff --git a/dotbot/examples/work_and_charge/models/supervisor.yaml b/dotbot/examples/work_and_charge/models/supervisor.yaml
new file mode 100644
index 0000000..1299fef
--- /dev/null
+++ b/dotbot/examples/work_and_charge/models/supervisor.yaml
@@ -0,0 +1,10 @@
+
+num_events: 10
+num_supervisors: 4
+events: [ EV_lowEnergy, EV_highEnergy, EV_atWork, EV_atCharger, EV_charge, EV_notAtWork, EV_work, EV_moveToCharge, EV_moveToWork, EV_notAtCharger ]
+ev_controllable: [ 0, 0, 0, 0, 1, 0, 1, 1, 1, 0 ]
+sup_events: [ [1, 1, 0, 0, 1, 0, 1, 1, 1, 0], [1, 1, 0, 0, 1, 0, 1, 1, 1, 0], [0, 0, 1, 1, 1, 1, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1, 1, 1, 1, 1] ]
+sup_init_state: [ 5, 6, 3, 7 ]
+sup_current_state: [ 5, 6, 3, 7 ]
+sup_data_pos: [ 0, 77, 154, 279 ]
+sup_data: [ 3, EV_highEnergy, 0, 0, EV_lowEnergy, 0, 3, EV_charge, 0, 1, 3, EV_moveToWork, 0, 6, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 5, 3, EV_moveToCharge, 0, 3, EV_lowEnergy, 0, 2, EV_highEnergy, 0, 7, 3, EV_charge, 0, 5, EV_lowEnergy, 0, 3, EV_highEnergy, 0, 0, 3, EV_lowEnergy, 0, 4, EV_work, 0, 2, EV_highEnergy, 0, 6, 2, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 5, 3, EV_highEnergy, 0, 6, EV_work, 0, 7, EV_lowEnergy, 0, 4, 3, EV_lowEnergy, 0, 2, EV_highEnergy, 0, 7, EV_moveToCharge, 0, 0, 3, EV_moveToWork, 0, 4, EV_lowEnergy, 0, 0, EV_highEnergy, 0, 6, 3, EV_charge, 0, 6, EV_highEnergy, 0, 1, EV_lowEnergy, 0, 3, 2, EV_lowEnergy, 0, 5, EV_highEnergy, 0, 2, 3, EV_lowEnergy, 0, 3, EV_highEnergy, 0, 1, EV_charge, 0, 0, 3, EV_highEnergy, 0, 7, EV_work, 0, 5, EV_lowEnergy, 0, 4, 3, EV_highEnergy, 0, 2, EV_lowEnergy, 0, 5, EV_moveToCharge, 0, 3, 3, EV_moveToWork, 0, 7, EV_highEnergy, 0, 6, EV_lowEnergy, 0, 0, 3, EV_lowEnergy, 0, 4, EV_work, 0, 2, EV_highEnergy, 0, 7, 5, EV_notAtWork, 0, 7, EV_atCharger, 0, 0, EV_atWork, 0, 0, EV_charge, 0, 6, EV_notAtCharger, 0, 0, 5, EV_atCharger, 0, 1, EV_atWork, 0, 1, EV_notAtWork, 0, 2, EV_moveToCharge, 0, 0, EV_notAtCharger, 0, 1, 5, EV_notAtWork, 0, 2, EV_atWork, 0, 1, EV_atCharger, 0, 2, EV_moveToCharge, 0, 7, EV_notAtCharger, 0, 2, 5, EV_notAtCharger, 0, 3, EV_notAtWork, 0, 3, EV_moveToWork, 0, 5, EV_atWork, 0, 6, EV_atCharger, 0, 3, 5, EV_notAtWork, 0, 5, EV_atCharger, 0, 4, EV_atWork, 0, 4, EV_notAtCharger, 0, 4, EV_work, 0, 1, 4, EV_atCharger, 0, 5, EV_atWork, 0, 4, EV_notAtWork, 0, 5, EV_notAtCharger, 0, 5, 5, EV_atWork, 0, 6, EV_atCharger, 0, 6, EV_notAtCharger, 0, 6, EV_notAtWork, 0, 3, EV_moveToWork, 0, 4, 5, EV_notAtWork, 0, 7, EV_notAtCharger, 0, 7, EV_atCharger, 0, 7, EV_atWork, 0, 0, EV_charge, 0, 3, 4, EV_notAtWork, 0, 0, EV_notAtCharger, 0, 0, EV_atWork, 0, 0, EV_atCharger, 0, 5, 5, EV_atCharger, 0, 4, EV_atWork, 0, 1, EV_notAtCharger, 0, 1, EV_moveToCharge, 0, 0, EV_notAtWork, 0, 1, 5, EV_notAtCharger, 0, 2, EV_work, 0, 1, EV_atCharger, 0, 3, EV_atWork, 0, 2, EV_notAtWork, 0, 2, 5, EV_notAtWork, 0, 3, EV_atCharger, 0, 3, EV_work, 0, 4, EV_atWork, 0, 3, EV_notAtCharger, 0, 2, 5, EV_atWork, 0, 4, EV_atCharger, 0, 4, EV_moveToCharge, 0, 5, EV_notAtCharger, 0, 1, EV_notAtWork, 0, 4, 5, EV_atWork, 0, 5, EV_notAtCharger, 0, 0, EV_notAtWork, 0, 5, EV_atCharger, 0, 5, EV_charge, 0, 6, 5, EV_notAtWork, 0, 6, EV_moveToWork, 0, 3, EV_atCharger, 0, 6, EV_atWork, 0, 6, EV_notAtCharger, 0, 7, 5, EV_notAtCharger, 0, 7, EV_moveToWork, 0, 2, EV_notAtWork, 0, 7, EV_atWork, 0, 7, EV_atCharger, 0, 6 ]
\ No newline at end of file
diff --git a/dotbot/examples/work_and_charge/work_and_charge.py b/dotbot/examples/work_and_charge/work_and_charge.py
new file mode 100644
index 0000000..ba7af8e
--- /dev/null
+++ b/dotbot/examples/work_and_charge/work_and_charge.py
@@ -0,0 +1,326 @@
+import asyncio
+import math
+import os
+import time
+from typing import Dict, List
+
+from dotbot.examples.orca import (
+ Agent,
+ OrcaParams,
+ compute_orca_velocity_for_agent,
+)
+from dotbot.examples.vec2 import Vec2
+from dotbot.models import (
+ DotBotLH2Position,
+ DotBotModel,
+ DotBotMoveRawCommandModel,
+ DotBotQueryModel,
+ DotBotRgbLedCommandModel,
+ DotBotStatus,
+ DotBotWaypoints,
+ WSRgbLed,
+ WSWaypoints,
+)
+from dotbot.protocol import ApplicationType
+from dotbot.rest import RestClient, rest_client
+from dotbot.websocket import DotBotWsClient
+
+from dotbot.examples.sct import SCT
+from dotbot.examples.work_and_charge.controller import Controller
+
+import numpy as np
+from scipy.spatial import cKDTree
+
+ORCA_RANGE = 200
+
+THRESHOLD = 50 # Acceptable distance error to consider a waypoint reached
+DT = 0.05 # Control loop period (seconds)
+
+# TODO: Measure these values for real dotbots
+BOT_RADIUS = 40 # Physical radius of a DotBot (unit), used for collision avoidance
+MAX_SPEED = 300 # Maximum allowed linear speed of a bot (mm/s)
+
+(QUEUE_HEAD_X, QUEUE_HEAD_Y) = (
+ 200,
+ 200,
+) # World-frame (X, Y) position of the charging queue head
+QUEUE_SPACING = (
+ 200 # Spacing between consecutive bots in the charging queue (along X axis)
+)
+
+CHARGE_REGION_X = QUEUE_HEAD_X
+WORK_REGION_X = 1800
+
+dotbot_controllers = dict()
+
+
+async def fetch_active_dotbots(client: RestClient) -> List[DotBotModel]:
+ return await client.fetch_dotbots(
+ query=DotBotQueryModel(status=DotBotStatus.ACTIVE)
+ )
+
+
+def order_bots(
+ dotbots: List[DotBotModel], base_x: int, base_y: int
+) -> List[DotBotModel]:
+ def key(bot: DotBotModel):
+ dx = bot.lh2_position.x - base_x
+ dy = bot.lh2_position.y - base_y
+ return (dx * dx + dy * dy, bot.address)
+
+ return sorted(dotbots, key=key)
+
+
+def assign_goals(
+ ordered: List[DotBotModel],
+ head_x: int,
+ head_y: int,
+ spacing: int,
+) -> Dict[str, dict]:
+ goals = {}
+ for i, bot in enumerate(ordered):
+
+ # TODO: depending on the robot's current state (moving to base or work regions), assign a different goal
+
+ goals[bot.address] = {
+ "x": head_x,
+ "y": head_y + i * spacing,
+ }
+ return goals
+
+
+def preferred_vel(dotbot: DotBotModel, goal: Vec2 | None) -> Vec2:
+ if goal is None:
+ return Vec2(x=0, y=0)
+
+ dx = goal["x"] - dotbot.lh2_position.x
+ dy = goal["y"] - dotbot.lh2_position.y
+ dist = math.sqrt(dx * dx + dy * dy)
+
+ dist1000 = dist * 1000
+ # If close to goal, stop
+ if dist1000 < THRESHOLD:
+ return Vec2(x=0, y=0)
+
+ # Right-hand rule bias
+ bias_angle = 0.0
+ # Bot can only walk on a cone [-60, 60] in front of himself
+ max_deviation = math.radians(60)
+
+ # Convert bot direction into radians
+ direction = direction_to_rad(dotbot.direction)
+
+ # Angle to goal
+ angle_to_goal = math.atan2(dy, dx) + bias_angle
+
+ delta = angle_to_goal - direction
+ # Wrap to [-π, +π]
+ delta = math.atan2(math.sin(delta), math.cos(delta))
+
+ # Clamp delta to [-MAX, +MAX]
+ if delta > max_deviation:
+ delta = max_deviation
+ if delta < -max_deviation:
+ delta = -max_deviation
+
+ # Final allowed direction
+ final_angle = direction + delta
+ result = Vec2(
+ x=math.cos(final_angle) * MAX_SPEED, y=math.sin(final_angle) * MAX_SPEED
+ )
+ return result
+
+
+def direction_to_rad(direction: float) -> float:
+ rad = (direction + 90) * math.pi / 180.0
+ return math.atan2(math.sin(rad), math.cos(rad)) # normalize to [-π, π]
+
+
+async def compute_orca_velocity(
+ agent: Agent,
+ neighbors: List[Agent],
+ params: OrcaParams,
+) -> Vec2:
+ return compute_orca_velocity_for_agent(agent, neighbors, params)
+
+
+async def main() -> None:
+ params = OrcaParams(time_horizon=5 * DT, time_step=DT)
+ url = os.getenv("DOTBOT_CONTROLLER_URL", "localhost")
+ port = os.getenv("DOTBOT_CONTROLLER_PORT", "8000")
+ use_https = os.getenv("DOTBOT_CONTROLLER_USE_HTTPS", False)
+ sct_path = os.getenv("DOTBOT_SCT_PATH", "dotbot/examples/work_and_charge/models/supervisor.yaml")
+ async with rest_client(url, port, use_https) as client:
+ dotbots = await fetch_active_dotbots(client)
+
+ # Initialization
+ for dotbot in dotbots:
+
+ # Init controller
+ controller = Controller(dotbot.address, sct_path)
+ dotbot_controllers[dotbot.address] = controller
+
+ # Cosmetic: all bots are red
+ await client.send_rgb_led_command(
+ address=dotbot.address,
+ command=DotBotRgbLedCommandModel(red=255, green=0, blue=0),
+ )
+
+ # Set work and charge goals for each robot
+ # sorted_bots = order_bots(dotbots, QUEUE_HEAD_X, QUEUE_HEAD_Y)
+ sorted_bots = sorted(dotbots, key=lambda bot: bot.address)
+ base_goals = assign_goals(sorted_bots, QUEUE_HEAD_X, QUEUE_HEAD_Y, QUEUE_SPACING)
+ work_goals = assign_goals(sorted_bots, WORK_REGION_X, QUEUE_HEAD_Y, QUEUE_SPACING)
+
+ for address, controller in dotbot_controllers.items():
+ goal = base_goals[address]
+ waypoint_charge = DotBotLH2Position(x=goal['x'], y=goal['y'], z=0)
+ controller.set_charge_waypoint(waypoint_charge)
+
+ goal = work_goals[address]
+ waypoint_work = DotBotLH2Position(x=goal['x'], y=goal['y'], z=0)
+ controller.set_work_waypoint(waypoint_work)
+
+ while True:
+ try:
+ ws = DotBotWsClient(url, port)
+ await ws.connect()
+
+ while True:
+ dotbots = await fetch_active_dotbots(client)
+
+ goals = dict()
+ agents: Dict[str, Agent] = {}
+
+ for bot in dotbots:
+
+ # print(f'waypoint_current for DotBot {bot.address}: {dotbot_controllers.get(bot.address).waypoint_current}')
+
+ # Get current goals
+ if dotbot_controllers.get(bot.address).waypoint_current is not None:
+ goals[bot.address] = {
+ "x": dotbot_controllers[bot.address].waypoint_current.x,
+ "y": dotbot_controllers[bot.address].waypoint_current.y,
+ }
+
+ agents[bot.address] = Agent(
+ id=bot.address,
+ position=Vec2(x=bot.lh2_position.x, y=bot.lh2_position.y),
+ velocity=Vec2(x=0, y=0),
+ radius=BOT_RADIUS,
+ max_speed=MAX_SPEED,
+ preferred_velocity=preferred_vel(
+ dotbot=bot, goal=goals.get(bot.address)
+ ),
+ )
+
+ # Prepare coordinates for all agents
+ # Extract [x, y] for every agent in the same order
+ agent_list = list(agents.values())
+ positions = np.array([[a.position.x, a.position.y] for a in agent_list])
+
+ # Build the KD-Tree
+ tree = cKDTree(positions)
+
+ # Run controller for each robot
+ for dotbot in dotbots:
+ agent = agents[dotbot.address]
+ pos = dotbot.lh2_position
+ # print(f"DotBot {dotbot.address}: Position ({pos.x:.2f}, {pos.y:.2f}), Direction {dotbot.direction:.2f}°")
+
+ # Run controller
+ controller = dotbot_controllers[dotbot.address]
+ controller.set_current_position(pos) # update position
+ controller.control_step() # run SCT step
+
+ # Get current goal
+ goal = controller.waypoint_current
+ goals[dotbot.address] = {
+ "x": goal.x,
+ "y": goal.y,
+ }
+
+ ### Send goal
+
+ # Without KD-Tree
+ # local_neighbors = [neighbor for neighbor in agents.values() if neighbor.id != agent.id]
+
+ # Using KD-Tree: Prepare neighbor list using KD-Tree
+ neighbor_indices = tree.query_ball_point([agent.position.x, agent.position.y], r=ORCA_RANGE)
+ local_neighbors = [agent_list[idx] for idx in neighbor_indices if agent_list[idx].id != agent.id]
+
+ if not local_neighbors:
+ orca_vel = agent.preferred_velocity
+ else:
+ orca_vel = await compute_orca_velocity(
+ agent, neighbors=local_neighbors, params=params
+ )
+ step = Vec2(x=orca_vel.x, y=orca_vel.y)
+
+ # print(f'goal for DotBot {dotbot.address}: ({goal.x:.2f}, {goal.y:.2f}), step: ({step.x:.4f}, {step.y:.4f})')
+
+ # ---- CLAMP STEP TO GOAL DISTANCE ----
+ goal = goals.get(agent.id)
+ if goal is not None:
+ dx = goal["x"] - agent.position.x
+ dy = goal["y"] - agent.position.y
+ dist_to_goal = math.hypot(dx, dy)
+
+ step_len = math.hypot(step.x, step.y)
+ if step_len > dist_to_goal and step_len > 0:
+ scale = dist_to_goal / step_len
+ step = Vec2(x=step.x * scale, y=step.y * scale)
+ # ------------------------------------
+
+ ### TEMPORARY: The simulator does not accept negative coordinates,
+ # so we clamp the step to keep the next position non-negative.
+ if agent.position.x + step.x < 0:
+ step.y = step.y * (abs(step.x) / MAX_SPEED)
+ step.x = -agent.position.x
+ if agent.position.y + step.y < 0:
+ step.x = step.x * (abs(step.y) / MAX_SPEED)
+ step.y = -agent.position.y
+ ###
+
+ waypoints = DotBotWaypoints(
+ threshold=THRESHOLD,
+ waypoints=[
+ DotBotLH2Position(
+ x=agent.position.x + step.x, y=agent.position.y + step.y, z=0
+ )
+ ],
+ )
+ await ws.send(
+ WSWaypoints(
+ cmd="waypoints",
+ address=agent.id,
+ application=ApplicationType.DotBot,
+ data=waypoints,
+ )
+ )
+ await ws.send(
+ WSRgbLed(
+ cmd="rgb_led",
+ address=agent.id,
+ application=ApplicationType.DotBot,
+ data=DotBotRgbLedCommandModel(
+ red=controller.led[0],
+ green=controller.led[1],
+ blue=controller.led[2],
+ ),
+ )
+ )
+
+ await asyncio.sleep(DT)
+ except Exception as e:
+ print(f"Connection lost: {e}")
+ print("Retrying in 1 seconds...")
+ await asyncio.sleep(1) # Wait before trying to reconnect
+ finally:
+ await ws.close()
+
+ return None
+
+
+if __name__ == "__main__":
+ asyncio.run(main())