diff --git a/utama_core/config/settings.py b/utama_core/config/settings.py index e2a2bce3..7ecba5b4 100644 --- a/utama_core/config/settings.py +++ b/utama_core/config/settings.py @@ -30,7 +30,6 @@ ### REAL CONTROLLER SETTINGS ### BAUD_RATE = 115200 PORT = "/dev/ttyUSB0" -TIMEOUT = 0.1 KICKER_COOLDOWN_TIME = 10 # in seconds to prevent kicker from being actuated too frequently KICKER_COOLDOWN_TIMESTEPS = int(KICKER_COOLDOWN_TIME * CONTROL_FREQUENCY) # in timesteps KICKER_PERSIST_TIMESTEPS = 10 # in timesteps to persist the kick command diff --git a/utama_core/team_controller/src/controllers/common/robot_controller_abstract.py b/utama_core/team_controller/src/controllers/common/robot_controller_abstract.py index 1d7509ba..9e9041e8 100644 --- a/utama_core/team_controller/src/controllers/common/robot_controller_abstract.py +++ b/utama_core/team_controller/src/controllers/common/robot_controller_abstract.py @@ -55,5 +55,11 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None: @abc.abstractmethod def get_robots_responses(self) -> Optional[List[RobotResponse]]: - """Returns the robot response from the last sent commands.""" - return self._robots_info.popleft() if self._robots_info else None + """ + Pull the latest robot response. + + Returns: + A list of RobotResponse objects (each containing the robot ID + and its ball-contact state), or None if no response was received. + """ + pass diff --git a/utama_core/team_controller/src/controllers/real/real_robot_controller.py b/utama_core/team_controller/src/controllers/real/real_robot_controller.py index c93ff98d..3c1f70e9 100644 --- a/utama_core/team_controller/src/controllers/real/real_robot_controller.py +++ b/utama_core/team_controller/src/controllers/real/real_robot_controller.py @@ -13,7 +13,6 @@ KICKER_COOLDOWN_TIMESTEPS, KICKER_PERSIST_TIMESTEPS, PORT, - TIMEOUT, TIMESTEP, ) from utama_core.entities.data.command import RobotCommand, RobotResponse @@ -24,7 +23,6 @@ logger = logging.getLogger(__name__) -# NB: A major assumption is that the robot IDs are 0-5 for the friendly team. MAX_VEL = REAL_PARAMS.MAX_VEL MAX_ANGULAR_VEL = REAL_PARAMS.MAX_ANGULAR_VEL @@ -44,23 +42,59 @@ class RealRobotController(AbstractRobotController): n_robots (int): The number of robots in the team. Directly affects output buffer size. Default is 6. """ + HEADER = 0xAA + TAIL = 0x55 + def __init__(self, is_team_yellow: bool, n_friendly: int): super().__init__(is_team_yellow, n_friendly) self._serial_port = self._init_serial() - self._rbt_cmd_size = 10 # packet size for one robot + self._rx_buffer = bytearray() + self._rbt_cmd_size = 10 # packet size (bytes) for one robot + self._robot_info_size = 2 # number of bytes in feedback packet for all robots self._out_packet = self._empty_command() self._in_packet_size = 1 # size of the feedback packet received from the robots - self._robots_info: List[RobotResponse] = [None] * self._n_friendly - logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}") + logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE}.") self._assigned_mapping = {} # mapping of robot_id to index in the out_packet # track last kick time for each robot to transmit kick as HIGH for n timesteps after command self._kicker_tracker: Dict[int, KickTrackerEntry] = {} - def get_robots_responses(self) -> Optional[List[RobotResponse]]: - ### TODO: Not implemented yet + def get_robots_responses(self) -> Optional[list[RobotResponse]]: + # For now, we are not processing feedback from the robots, but this method can be implemented similarly to the GRSimRobotController if needed in the future. + self._serial_port.read_all() # Clear the input buffer to avoid overflow return None + # TODO: implement when feedback is actually working + def get_robots_responses_real(self) -> Optional[list[RobotResponse]]: + FRAME_SIZE = 1 + self._robot_info_size + 1 + + # --- Step 1: Non-blocking read --- + waiting = self._serial_port.in_waiting + if waiting: + self._rx_buffer.extend(self._serial_port.read(waiting)) + + latest_payload = None + + # --- Step 2: Forward scan for complete frames --- + while len(self._rx_buffer) >= FRAME_SIZE: + if self._rx_buffer[0] != self.HEADER: + next_header = self._rx_buffer.find(bytes([self.HEADER])) + if next_header == -1: + self._rx_buffer.clear() + break + self._rx_buffer = self._rx_buffer[next_header:] + continue + + if self._rx_buffer[FRAME_SIZE - 1] == self.TAIL: + latest_payload = self._rx_buffer[1 : FRAME_SIZE - 1] + self._rx_buffer = self._rx_buffer[FRAME_SIZE:] + else: + # Partial frame — keep it in buffer until next read + break + + latest_payload + return None # TODO: parse latest_payload into list of RobotResponse and return. + def send_robot_commands(self) -> None: """Sends the robot commands to the appropriate team (yellow or blue).""" # print(list(self.out_packet)) @@ -71,10 +105,6 @@ def send_robot_commands(self) -> None: f"Only {len(self._assigned_mapping)} out of {self._n_friendly} robots have been assigned commands. Sending incomplete command packet." ) self._serial_port.write(self._out_packet) - self._serial_port.read_all() - # data_in = self._serial.read_all() - # print(data_in) - # TODO: add receiving feedback from the robots ### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS ### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period @@ -145,18 +175,6 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None: is_kick=False, ) - # def _populate_robots_info(self, data_in: bytes) -> None: - # """ - # Populates the robots_info list with the data received from the robots. - # """ - # for i in range(self._n_friendly): - # has_ball = False - # if data_in[0] & 0b10000000: - # has_ball = True - # info = RobotInfo(has_ball=has_ball) - # self._robots_info[i] = info - # data_in = data_in << 1 # shift to the next robot's data - def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> bytes: """Generates the command buffer to be sent to the robot.""" assert robot_id < 6, "Invalid robot_id. Must be between 0 and 5." @@ -272,7 +290,7 @@ def _empty_command(self) -> bytearray: # * first byte is robot ID (here INVALID_RBT_ID) # * remaining (self._rbt_cmd_size - 1) bytes are zeros # - 0x55: end byte - cmd = bytearray([0xAA] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [0x55]) + cmd = bytearray([self.HEADER] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [self.TAIL]) commands.extend(cmd) self._cached_empty_command = commands return self._cached_empty_command.copy() @@ -287,7 +305,7 @@ def _init_serial(self) -> Serial: bytesize=EIGHTBITS, # 8 data bits parity=PARITY_EVEN, # Even parity (makes it 9 bits total) stopbits=STOPBITS_TWO, # 2 stop bits - timeout=0.1, + timeout=0, ) return serial_port except Exception as e: diff --git a/utama_core/team_controller/src/controllers/sim/grsim_robot_controller.py b/utama_core/team_controller/src/controllers/sim/grsim_robot_controller.py index c81954ba..62338ff4 100644 --- a/utama_core/team_controller/src/controllers/sim/grsim_robot_controller.py +++ b/utama_core/team_controller/src/controllers/sim/grsim_robot_controller.py @@ -110,7 +110,7 @@ def _update_robot_info(self, data: bytes) -> None: # ignore robot_info for robots that are not expected (ie deactivated since the start of the game) self._robots_info.append(robots_info) - def get_robots_responses(self) -> Optional[RobotResponse]: + def get_robots_responses(self) -> Optional[list[RobotResponse]]: if self._robots_info is None or len(self._robots_info) == 0: for i in range(self._n_friendly): self.add_robot_commands(RobotCommand(0, 0, 0, False, False, False), i)