Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion utama_core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
### REAL CONTROLLER SETTINGS ###
BAUD_RATE = 115200
PORT = "/dev/ttyUSB0"
TIMEOUT = 0.1
KICKER_COOLDOWN_TIME = 10 # in seconds to prevent kicker from being actuated too frequently
KICKER_COOLDOWN_TIMESTEPS = int(KICKER_COOLDOWN_TIME * CONTROL_FREQUENCY) # in timesteps
KICKER_PERSIST_TIMESTEPS = 10 # in timesteps to persist the kick command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,11 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None:

@abc.abstractmethod
def get_robots_responses(self) -> Optional[List[RobotResponse]]:
"""Returns the robot response from the last sent commands."""
return self._robots_info.popleft() if self._robots_info else None
"""
Pull the latest robot response.

Returns:
A list of RobotResponse objects (each containing the robot ID
and its ball-contact state), or None if no response was received.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
KICKER_COOLDOWN_TIMESTEPS,
KICKER_PERSIST_TIMESTEPS,
PORT,
TIMEOUT,
TIMESTEP,
)
from utama_core.entities.data.command import RobotCommand, RobotResponse
Expand All @@ -24,7 +23,6 @@

logger = logging.getLogger(__name__)

# NB: A major assumption is that the robot IDs are 0-5 for the friendly team.
MAX_VEL = REAL_PARAMS.MAX_VEL
MAX_ANGULAR_VEL = REAL_PARAMS.MAX_ANGULAR_VEL

Expand All @@ -44,23 +42,59 @@ class RealRobotController(AbstractRobotController):
n_robots (int): The number of robots in the team. Directly affects output buffer size. Default is 6.
"""

HEADER = 0xAA
TAIL = 0x55

def __init__(self, is_team_yellow: bool, n_friendly: int):
super().__init__(is_team_yellow, n_friendly)
self._serial_port = self._init_serial()
self._rbt_cmd_size = 10 # packet size for one robot
self._rx_buffer = bytearray()
self._rbt_cmd_size = 10 # packet size (bytes) for one robot
self._robot_info_size = 2 # number of bytes in feedback packet for all robots
self._out_packet = self._empty_command()
self._in_packet_size = 1 # size of the feedback packet received from the robots
self._robots_info: List[RobotResponse] = [None] * self._n_friendly
logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}")
logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE}.")
self._assigned_mapping = {} # mapping of robot_id to index in the out_packet

# track last kick time for each robot to transmit kick as HIGH for n timesteps after command
self._kicker_tracker: Dict[int, KickTrackerEntry] = {}

def get_robots_responses(self) -> Optional[List[RobotResponse]]:
### TODO: Not implemented yet
def get_robots_responses(self) -> Optional[list[RobotResponse]]:
# For now, we are not processing feedback from the robots, but this method can be implemented similarly to the GRSimRobotController if needed in the future.
self._serial_port.read_all() # Clear the input buffer to avoid overflow
return None

# TODO: implement when feedback is actually working
def get_robots_responses_real(self) -> Optional[list[RobotResponse]]:
FRAME_SIZE = 1 + self._robot_info_size + 1

# --- Step 1: Non-blocking read ---
waiting = self._serial_port.in_waiting
if waiting:
self._rx_buffer.extend(self._serial_port.read(waiting))

latest_payload = None

# --- Step 2: Forward scan for complete frames ---
while len(self._rx_buffer) >= FRAME_SIZE:
if self._rx_buffer[0] != self.HEADER:
next_header = self._rx_buffer.find(bytes([self.HEADER]))
if next_header == -1:
self._rx_buffer.clear()
break
self._rx_buffer = self._rx_buffer[next_header:]
continue

if self._rx_buffer[FRAME_SIZE - 1] == self.TAIL:
latest_payload = self._rx_buffer[1 : FRAME_SIZE - 1]
self._rx_buffer = self._rx_buffer[FRAME_SIZE:]
else:
# Partial frame — keep it in buffer until next read
break

latest_payload
return None # TODO: parse latest_payload into list of RobotResponse and return.

def send_robot_commands(self) -> None:
"""Sends the robot commands to the appropriate team (yellow or blue)."""
# print(list(self.out_packet))
Expand All @@ -71,10 +105,6 @@ def send_robot_commands(self) -> None:
f"Only {len(self._assigned_mapping)} out of {self._n_friendly} robots have been assigned commands. Sending incomplete command packet."
)
self._serial_port.write(self._out_packet)
self._serial_port.read_all()
# data_in = self._serial.read_all()
# print(data_in)
# TODO: add receiving feedback from the robots

### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS
### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period
Expand Down Expand Up @@ -145,18 +175,6 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None:
is_kick=False,
)

# def _populate_robots_info(self, data_in: bytes) -> None:
# """
# Populates the robots_info list with the data received from the robots.
# """
# for i in range(self._n_friendly):
# has_ball = False
# if data_in[0] & 0b10000000:
# has_ball = True
# info = RobotInfo(has_ball=has_ball)
# self._robots_info[i] = info
# data_in = data_in << 1 # shift to the next robot's data

def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> bytes:
"""Generates the command buffer to be sent to the robot."""
assert robot_id < 6, "Invalid robot_id. Must be between 0 and 5."
Expand Down Expand Up @@ -272,7 +290,7 @@ def _empty_command(self) -> bytearray:
# * first byte is robot ID (here INVALID_RBT_ID)
# * remaining (self._rbt_cmd_size - 1) bytes are zeros
# - 0x55: end byte
cmd = bytearray([0xAA] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [0x55])
cmd = bytearray([self.HEADER] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [self.TAIL])
commands.extend(cmd)
self._cached_empty_command = commands
return self._cached_empty_command.copy()
Expand All @@ -287,7 +305,7 @@ def _init_serial(self) -> Serial:
bytesize=EIGHTBITS, # 8 data bits
parity=PARITY_EVEN, # Even parity (makes it 9 bits total)
stopbits=STOPBITS_TWO, # 2 stop bits
timeout=0.1,
timeout=0,
)
return serial_port
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _update_robot_info(self, data: bytes) -> None:
# ignore robot_info for robots that are not expected (ie deactivated since the start of the game)
self._robots_info.append(robots_info)

def get_robots_responses(self) -> Optional[RobotResponse]:
def get_robots_responses(self) -> Optional[list[RobotResponse]]:
if self._robots_info is None or len(self._robots_info) == 0:
for i in range(self._n_friendly):
self.add_robot_commands(RobotCommand(0, 0, 0, False, False, False), i)
Expand Down