Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dotbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
NETWORK_ID_DEFAULT = "0000"
CONTROLLER_HTTP_PROTOCOL_DEFAULT = "http"
CONTROLLER_HTTP_HOSTNAME_DEFAULT = "localhost"
CONTROLLER_HTTP_PORT_DEFAULT = "8000"
CONTROLLER_HTTP_PORT_DEFAULT = 8000
CONTROLLER_ADAPTER_DEFAULT = "serial"
MQTT_HOST_DEFAULT = "localhost"
MQTT_PORT_DEFAULT = 1883
Expand Down
274 changes: 1 addition & 273 deletions dotbot/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
from dotbot_utils.protocol import Frame, Payload
from dotbot_utils.serial_interface import SerialInterfaceException
from fastapi import WebSocket
from pydantic import ValidationError
from pydantic.tools import parse_obj_as
from qrkey import QrkeyController, SubscriptionModel, qrkey_settings

from dotbot import (
CONTROLLER_ADAPTER_DEFAULT,
Expand Down Expand Up @@ -56,29 +53,15 @@
DotBotLH2Position,
DotBotMapSizeModel,
DotBotModel,
DotBotMoveRawCommandModel,
DotBotNotificationCommand,
DotBotNotificationModel,
DotBotNotificationUpdate,
DotBotQueryModel,
DotBotReplyModel,
DotBotRequestModel,
DotBotRequestType,
DotBotRgbLedCommandModel,
DotBotStatus,
DotBotWaypoints,
DotBotXGOActionCommandModel,
)
from dotbot.protocol import (
ApplicationType,
PayloadCommandMoveRaw,
PayloadCommandRgbLed,
PayloadCommandXgoAction,
PayloadGPSPosition,
PayloadGPSWaypoints,
PayloadLh2CalibrationHomography,
PayloadLH2Location,
PayloadLH2Waypoints,
PayloadType,
)
from dotbot.server import api
Expand All @@ -91,7 +74,6 @@
# )


CONTROLLERS = {}
INACTIVE_DELAY = 5 # seconds
LOST_DELAY = 60 # seconds
LH2_POSITION_DISTANCE_THRESHOLD = 20 # mm
Expand Down Expand Up @@ -213,243 +195,6 @@ def __init__(self, settings: ControllerSettings):
height=int(settings.map_size.split("x")[1]),
)
api.controller = self
self.qrkey = None

self.subscriptions = [
SubscriptionModel(
topic="/command/+/+/+/move_raw", callback=self.on_command_move_raw
),
SubscriptionModel(
topic="/command/+/+/+/rgb_led", callback=self.on_command_rgb_led
),
SubscriptionModel(
topic="/command/+/+/+/xgo_action", callback=self.on_command_xgo_action
),
SubscriptionModel(
topic="/command/+/+/+/waypoints", callback=self.on_command_waypoints
),
SubscriptionModel(
topic="/command/+/+/+/clear_position_history",
callback=self.on_command_clear_position_history,
),
]

def on_command_move_raw(self, topic, payload):
"""Called when a move raw command is received."""
logger = self.logger.bind(command="move_raw", topic=topic)
topic_split = topic.split("/")[2:]
if len(topic_split) != 4 or topic_split[-1] != "move_raw":
logger.warning("Invalid move_raw command topic")
return
_, address, application, _ = topic_split
try:
command = DotBotMoveRawCommandModel(**payload)
except ValidationError as exc:
self.logger.warning(f"Invalid move raw command: {exc.errors()}")
return
logger.bind(
address=address,
application=ApplicationType(int(application)).name,
**command.model_dump(),
)
if address not in self.dotbots:
logger.warning("DotBot not found")
return
payload = PayloadCommandMoveRaw(
left_x=command.left_x,
left_y=command.left_y,
right_x=command.right_x,
right_y=command.right_y,
)
logger.info(
"Sending MQTT command", address=address, command=payload.__class__.__name__
)
self.send_payload(int(address, 16), payload=payload)
self.dotbots[address].move_raw = command

def on_command_rgb_led(self, topic, payload):
"""Called when an rgb led command is received."""
logger = self.logger.bind(command="rgb_led", topic=topic)
topic_split = topic.split("/")[2:]
if len(topic_split) != 4 or topic_split[-1] != "rgb_led":
logger.warning("Invalid rgb_led command topic")
return
_, address, application, _ = topic_split
try:
command = DotBotRgbLedCommandModel(**payload)
except ValidationError as exc:
LOGGER.warning(f"Invalid rgb led command: {exc.errors()}")
return
logger = logger.bind(
address=address,
application=ApplicationType(int(application)).name,
**command.model_dump(),
)
if address not in self.dotbots:
logger.warning("DotBot not found")
return
payload = PayloadCommandRgbLed(
red=command.red, green=command.green, blue=command.blue
)
logger.info(
"Sending MQTT command", address=address, command=payload.__class__.__name__
)
self.send_payload(int(address, 16), payload=payload)
self.dotbots[address].rgb_led = command
self.qrkey.publish(
"/notify",
DotBotNotificationModel(cmd=DotBotNotificationCommand.RELOAD).model_dump(
exclude_none=True
),
)

def on_command_xgo_action(self, topic, payload):
"""Called when an rgb led command is received."""
logger = self.logger.bind(command="xgo_action", topic=topic)
topic_split = topic.split("/")[2:]
if len(topic_split) != 4 or topic_split[-1] != "xgo_action":
logger.warning("Invalid xgo_action command topic")
return
_, address, application, _ = topic_split
try:
command = DotBotXGOActionCommandModel(**payload)
except ValidationError as exc:
LOGGER.warning(f"Invalid rgb led command: {exc.errors()}")
return
logger = logger.bind(
address=address,
application=ApplicationType(int(application)).name,
**command.model_dump(),
)
if address not in self.dotbots:
logger.warning("DotBot not found")
return
payload = PayloadCommandXgoAction(action=command.action)
logger.info(
"Sending MQTT command", address=address, command=payload.__class__.__name__
)
self.send_payload(int(address, 16), payload=payload)

def on_command_waypoints(self, topic, payload):
"""Called when a list of waypoints is received."""
logger = self.logger.bind(command="waypoints", topic=topic)
topic_split = topic.split("/")[2:]
if len(topic_split) != 4 or topic_split[-1] != "waypoints":
logger.warning("Invalid waypoints command topic")
return
_, address, application, _ = topic_split
command = parse_obj_as(DotBotWaypoints, payload)
logger = logger.bind(
address=address,
application=ApplicationType(int(application)).name,
threshold=command.threshold,
length=len(command.waypoints),
)
if address not in self.dotbots:
logger.warning("DotBot not found")
return
waypoints_list = command.waypoints
if ApplicationType(int(application)) == ApplicationType.SailBot:
if self.dotbots[address].gps_position is not None:
waypoints_list = [
self.dotbots[address].gps_position
] + command.waypoints
payload = PayloadGPSWaypoints(
threshold=command.threshold,
count=len(command.waypoints),
waypoints=[
PayloadGPSPosition(
latitude=int(waypoint.latitude * 1e6),
longitude=int(waypoint.longitude * 1e6),
)
for waypoint in command.waypoints
],
)
else: # DotBot application
if self.dotbots[address].lh2_position is not None:
waypoints_list = [
self.dotbots[address].lh2_position
] + command.waypoints
payload = PayloadLH2Waypoints(
threshold=command.threshold,
count=len(command.waypoints),
waypoints=[
PayloadLH2Location(
pos_x=int(waypoint.x),
pos_y=int(waypoint.y),
pos_z=int(waypoint.z),
)
for waypoint in command.waypoints
],
)
logger.info(
"Sending MQTT command", address=address, command=payload.__class__.__name__
)
self.send_payload(int(address, 16), payload=payload)
self.dotbots[address].waypoints = waypoints_list
self.dotbots[address].waypoints_threshold = command.threshold
self.qrkey.publish(
"/notify",
DotBotNotificationModel(cmd=DotBotNotificationCommand.RELOAD).model_dump(
exclude_none=True
),
)

def on_command_clear_position_history(self, topic, _):
"""Called when a clear position history command is received."""
logger = self.logger.bind(command="clear_position_history", topic=topic)
topic_split = topic.split("/")[2:]
if len(topic_split) != 4 or topic_split[-1] != "clear_position_history":
logger.warning("Invalid clear_position_history command topic")
return
_, address, application, _ = topic_split
logger = logger.bind(
address=address,
application=ApplicationType(int(application)).name,
)
if address not in self.dotbots:
logger.warning("DotBot not found")
return
logger.info("Notify clear command", address=address)
self.dotbots[address].position_history = []
self.qrkey.publish(
"/notify",
DotBotNotificationModel(cmd=DotBotNotificationCommand.RELOAD).model_dump(
exclude_none=True
),
)

def on_request(self, payload):
logger = LOGGER.bind(topic="/request")
logger.info("Request received", **payload)
try:
request = DotBotRequestModel(**payload)
except ValidationError as exc:
logger.warning(f"Invalid request: {exc.errors()}")
return

reply_topic = f"/reply/{request.reply}"
if request.request == DotBotRequestType.DOTBOTS:
logger.info("Publish dotbots")
data = [
dotbot.model_dump(exclude_none=True)
for dotbot in self.get_dotbots(DotBotQueryModel())
]
message = DotBotReplyModel(
request=DotBotRequestType.DOTBOTS,
data=data,
).model_dump(exclude_none=True)
self.qrkey.publish(reply_topic, message)
elif request.request == DotBotRequestType.MAP_SIZE:
logger.info("Publish map size")
data = self.map_size.model_dump(exclude_none=True)
message = DotBotReplyModel(
request=DotBotRequestType.MAP_SIZE,
data=data,
).model_dump(exclude_none=True)
self.qrkey.publish(reply_topic, message)
else:
logger.warning("Unsupported request command")

async def _open_webbrowser(self):
"""Wait until the server is ready before opening a web browser."""
Expand All @@ -463,18 +208,7 @@ async def _open_webbrowser(self):
else:
writer.close()
break
url = (
f"http://localhost:{self.settings.controller_http_port}/PyDotBot?"
f"pin={self.qrkey.pin_code}&"
f"mqtt_host={qrkey_settings.mqtt_host}&"
f"mqtt_port={qrkey_settings.mqtt_ws_port}&"
f"mqtt_version={qrkey_settings.mqtt_version}&"
f"mqtt_use_ssl={qrkey_settings.mqtt_use_ssl}"
)
if qrkey_settings.mqtt_username is not None:
url += f"&mqtt_username={qrkey_settings.mqtt_username}"
if qrkey_settings.mqtt_password is not None:
url += f"&mqtt_password={qrkey_settings.mqtt_password}"
url = f"http://localhost:{self.settings.controller_http_port}/PyDotBot"
self.logger.debug("Using frontend URL", url=url)
if self.settings.webbrowser is True:
self.logger.info("Opening webbrowser", url=url)
Expand Down Expand Up @@ -712,7 +446,6 @@ async def notify_clients(self, notification):
for websocket in self.websockets
]
)
self.qrkey.publish("/notify", notification.model_dump(exclude_none=True))

def send_payload(self, destination: int, payload: Payload):
"""Sends a command in an HDLC frame over serial."""
Expand Down Expand Up @@ -841,13 +574,8 @@ async def _start_adapter(self):
async def run(self):
"""Launch the controller."""
tasks = []
self.qrkey = QrkeyController(self.on_request, LOGGER, root_topic="/pydotbot")
try:
tasks = [
asyncio.create_task(
name="QrKey controller",
coro=self.qrkey.start(subscriptions=self.subscriptions),
),
asyncio.create_task(name="Web server", coro=self.web()),
asyncio.create_task(name="Web browser", coro=self._open_webbrowser()),
asyncio.create_task(
Expand Down
2 changes: 1 addition & 1 deletion dotbot/dotbot_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def flush(self):
pass

def _packet_delivered(self):
return random.randint(0, 100) < self.network_pdr
return random.randint(0, 100) <= self.network_pdr

def handle_dotbot_frame(self, frame):
"""Send bytes to the fake serial, similar to the real gateway."""
Expand Down
Loading