From 6e9b52bcae55eb8b508fa79097489093fa1e3d9a Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Sun, 21 Dec 2025 00:56:21 +0100 Subject: [PATCH 1/3] Match signature and docstring of `_send_command` to its implementation --- cri_lib/cri_controller.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 8bae446..9df9518 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -163,7 +163,7 @@ def _send_command( command: str, register_answer: bool = False, fixed_answer_name: str | None = None, - ) -> int | None: + ) -> int: """Sends the given command to iRC. Parameters @@ -173,9 +173,13 @@ def _send_command( Returns ------- - int | None - If the command was sent the message_id gets returned or None if there was an error. + int + The sent message_id. + Raises + ------ + CRIConnectionError + When not connected or connection was lost. """ if not self.connected or self.sock is None: logger.error("Not connected. Use connect() to establish a connection.") From a313a2f494047f3120ad56f65dad84645c131d60 Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Sun, 21 Dec 2025 01:35:04 +0100 Subject: [PATCH 2/3] Simplify by removing unreachable branches --- cri_lib/cri_controller.py | 574 ++++++++++++++++---------------------- 1 file changed, 239 insertions(+), 335 deletions(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 9df9518..28cab07 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -284,14 +284,14 @@ def _bg_receive_thread(self) -> None: continue_parsing = False def _wait_for_answer( - self, message_id: str, timeout: float | None = None + self, message_id: str | int, timeout: float | None = None ) -> None | str: """Waits for an answer to a message. The answer event will be removed after the call, even if there was a timeout. Choose timeout accordingly. Parameters ---------- - message_id : int + message_id : int or str message id of sent message of which an answer is expected timeout : float | None @@ -309,7 +309,7 @@ def _wait_for_answer( raised if no answer was received in given timeout """ - + message_id = str(message_id) with self.answer_events_lock: if message_id not in self.answer_events: return None @@ -391,18 +391,16 @@ def reset(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Reset", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in RESET command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Reset", True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in RESET command: %s", error_msg) return False + else: + return True def enable(self) -> bool: """Enable robot @@ -414,18 +412,16 @@ def enable(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Enable", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in ENABLE command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Enable", True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in ENABLE command: %s", error_msg) return False + else: + return True def disable(self) -> bool: """Disable robot @@ -436,18 +432,16 @@ def disable(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Disable", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DISABLE command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Disable", True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in DISABLE command: %s", error_msg) return False + else: + return True def set_active_control(self, active: bool) -> bool: """Acquire or return active control of robot @@ -458,25 +452,20 @@ def set_active_control(self, active: bool) -> bool: `True` acquire active control `False` return active control """ + self._send_command( + f"CMD SetActive {str(active).lower()}", + True, + f"Active_{str(active).lower()}", + ) if ( - self._send_command( - f"CMD SetActive {str(active).lower()}", - True, - f"Active_{str(active).lower()}", + error_msg := self._wait_for_answer( + f"Active_{str(active).lower()}", timeout=self.DEFAULT_ANSWER_TIMEOUT ) - is not None - ): - if ( - error_msg := self._wait_for_answer( - f"Active_{str(active).lower()}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in set active control command: %s", error_msg) - return False - else: - return True - else: + ) is not None: + logger.debug("Error in set active control command: %s", error_msg) return False + else: + return True def zero_all_joints(self) -> bool: """Set all joints to zero @@ -487,19 +476,16 @@ def zero_all_joints(self) -> bool: `True` if request was successful `False` if request was not successful """ - - if (msg_id := self._send_command("CMD SetJointsToZero", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in SetJointsToZero command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD SetJointsToZero", True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in SetJointsToZero command: %s", error_msg) return False + else: + return True def reference_all_joints(self, *, timeout: float = 30) -> bool: """Reference all joints. Long timout of 30 seconds. @@ -510,17 +496,12 @@ def reference_all_joints(self, *, timeout: float = 30) -> bool: `True` if request was successful `False` if request was not successful """ - - if (msg_id := self._send_command("CMD ReferenceAllJoints", True)) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=timeout) - ) is not None: - logger.debug("Error in ReferenceAllJoints command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD ReferenceAllJoints", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=timeout)) is not None: + logger.debug("Error in ReferenceAllJoints command: %s", error_msg) return False + else: + return True def reference_single_joint(self, joint: str, *, timeout: float = 30) -> bool: """Reference a single joint. Long timout of 30 seconds. @@ -544,18 +525,12 @@ def reference_single_joint(self, joint: str, *, timeout: float = 30) -> bool: else: return False - if ( - msg_id := self._send_command(f"CMD ReferenceSingleJoint {joint_msg}", True) - ) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=timeout) - ) is not None: - logger.debug("Error in ReferenceSingleJoint command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(f"CMD ReferenceSingleJoint {joint_msg}", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=timeout)) is not None: + logger.debug("Error in ReferenceSingleJoint command: %s", error_msg) return False + else: + return True def get_referencing_info(self): """Reference all joints. Long timout of 30 seconds. @@ -566,21 +541,16 @@ def get_referencing_info(self): `True` if request was successful `False` if request was not successful """ + self._send_command("CMD GetReferencingInfo", True, "info_referencing") if ( - self._send_command("CMD GetReferencingInfo", True, "info_referencing") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_referencing", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetReferencingInfo command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer( + "info_referencing", timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in GetReferencingInfo command: %s", error_msg) return False + else: + return True def wait_for_kinematics_ready(self, timeout: float = 30) -> bool: """Wait until drive state is indicated as ready. @@ -659,26 +629,20 @@ def move_joints( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move Joints command: %s", error_msg) return False - - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move Joints command: %s", error_msg) - return False - - return True - - else: - return False + return True def move_joints_relative( self, @@ -729,28 +693,22 @@ def move_joints_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug( + "Exec Error in Move Joints Relative command: %s", error_msg + ) return False - - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug( - "Exec Error in Move Joints Relative command: %s", error_msg - ) - return False - - return True - - else: - return False + return True def move_cartesian( self, @@ -807,26 +765,21 @@ def move_cartesian( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move Cartesian command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move Cartesian command: %s", error_msg) - return False - - return True - - else: - return False + return True def move_base_relative( self, @@ -882,28 +835,21 @@ def move_base_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move BaseRelative command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug( - "Exec Error in Move BaseRelative command: %s", error_msg - ) - return False - - return True - - else: - return False + return True def move_tool_relative( self, @@ -959,26 +905,21 @@ def move_tool_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move BaseTool command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move BaseTool command: %s", error_msg) - return False - - return True - - else: - return False + return True def stop_move(self) -> bool: """Stop movement @@ -990,16 +931,12 @@ def stop_move(self) -> bool: `False` if request was not successful """ - if (msg_id := self._send_command("CMD Move Stop", True)) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=5.0) - ) is not None: - logger.debug("Error in Move Stop command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Move Stop", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=5.0)) is not None: + logger.debug("Error in Move Stop command: %s", error_msg) return False + else: + return True def start_jog(self): """starts live jog. Set speeds via set_jog_values""" @@ -1071,18 +1008,16 @@ def set_motion_type(self, motion_type: MotionType): """ command = f"CMD MotionType{motion_type.value}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in MotionType command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in MotionType command: %s", error_msg) return False + else: + return True def set_override(self, override: float): """Set override @@ -1100,18 +1035,16 @@ def set_override(self, override: float): """ command = f"CMD Override {override}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in Override command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in Override command: %s", error_msg) return False + else: + return True def set_dout(self, id: int, value: bool): """Set digital out @@ -1135,18 +1068,16 @@ def set_dout(self, id: int, value: bool): command = f"CMD DOUT {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DOUT command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in DOUT command: %s", error_msg) return False + else: + return True def set_din(self, id: int, value: bool): """Set digital inout, only available in simulation @@ -1170,18 +1101,16 @@ def set_din(self, id: int, value: bool): command = f"CMD DIN {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DIN command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in DIN command: %s", error_msg) return False + else: + return True def set_global_signal(self, id: int, value: bool): """Set global signal @@ -1205,18 +1134,16 @@ def set_global_signal(self, id: int, value: bool): command = f"CMD GSIG {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DIN command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in DIN command: %s", error_msg) return False + else: + return True def load_programm(self, program_name: str) -> bool: """Load a program file from disk into the robot controller @@ -1234,18 +1161,16 @@ def load_programm(self, program_name: str) -> bool: """ command = f"CMD LoadProgram {program_name}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in load_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in load_program command: %s", error_msg) return False + else: + return True def load_logic_programm(self, program_name: str) -> bool: """Load a logic program file from disk into the robot controller @@ -1263,18 +1188,16 @@ def load_logic_programm(self, program_name: str) -> bool: """ command = f"CMD LoadLogicProgram {program_name}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in load_logic_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in load_logic_program command: %s", error_msg) return False + else: + return True def start_programm(self) -> bool: """Start currently loaded Program @@ -1287,18 +1210,16 @@ def start_programm(self) -> bool: """ command = "CMD StartProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in start_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in start_program command: %s", error_msg) return False + else: + return True def stop_programm(self) -> bool: """Stop currently running Program @@ -1311,18 +1232,16 @@ def stop_programm(self) -> bool: """ command = "CMD StopProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in stop_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in stop_program command: %s", error_msg) return False + else: + return True def pause_programm(self) -> bool: """Pause currently running Program @@ -1335,18 +1254,16 @@ def pause_programm(self) -> bool: """ command = "CMD PauseProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in pause_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if ( + error_msg := self._wait_for_answer( + msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in pause_program command: %s", error_msg) return False + else: + return True def upload_file(self, path: str | Path, target_directory: str) -> bool: """Uploads file to iRC into `/Data/` @@ -1386,19 +1303,16 @@ def upload_file(self, path: str | Path, target_directory: str) -> bool: command = f"CMD UploadFileInit {target_directory + '/' + str(file_path.name)} {len(lines)} 0" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) for line in lines: command = f"CMD UploadFileLine {line.rstrip()}" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) command = "CMD UploadFileFinish" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) return True def enable_can_bridge(self, enabled: bool) -> None: @@ -1473,21 +1387,16 @@ def get_board_temperatures( timeout: float | None timeout for waiting in seconds or None for infinite waiting """ + self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") if ( - self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_boardtemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetBoardTemp command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer( + "info_boardtemp", timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in GetBoardTemp command: %s", error_msg) return False + else: + return True def get_motor_temperatures( self, blocking: bool = True, timeout: float | None = None @@ -1502,21 +1411,16 @@ def get_motor_temperatures( timeout: float | None timeout for waiting in seconds or None for infinite waiting """ + self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") if ( - self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_motortemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetMotorTemp command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer( + "info_motortemp", timeout=self.DEFAULT_ANSWER_TIMEOUT + ) + ) is not None: + logger.debug("Error in GetMotorTemp command: %s", error_msg) return False + else: + return True # Monkey patch to maintain backward compatibility From 46874dd79599156864508fd7ae00c4b5905fbe1c Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Sun, 21 Dec 2025 13:03:18 +0100 Subject: [PATCH 3/3] Make `_wait_for_answer` use `DEFAULT_ANSWER_TIMEOUT` by default Also fixes `get_board/motor_temperatures` functions ignoring their `timeout` parameter. --- cri_lib/cri_controller.py | 128 +++++++++++--------------------------- 1 file changed, 38 insertions(+), 90 deletions(-) diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 28cab07..3f8f135 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -15,6 +15,10 @@ logger = logging.getLogger(__name__) +DEFAULT = object() +"""Placeholder for defaulting a parameter to runtime-configurable default values.""" + + class MotionType(Enum): """Robot Motion Type for Jogging""" @@ -284,7 +288,9 @@ def _bg_receive_thread(self) -> None: continue_parsing = False def _wait_for_answer( - self, message_id: str | int, timeout: float | None = None + self, + message_id: str | int, + timeout: float | None = DEFAULT, # type: ignore ) -> None | str: """Waits for an answer to a message. The answer event will be removed after the call, even if there was a timeout. Choose timeout accordingly. @@ -294,8 +300,10 @@ def _wait_for_answer( message_id : int or str message id of sent message of which an answer is expected - timeout : float | None - timeout for wait in seconds. `None` will wait indefinetly + timeout : float | DEFAULT | None + timeout for wait in seconds. + - `DEFAULT` uses `self.DEFAULT_ANSWER_TIMEOUT` + - `None` will wait indefinetly Returns ------- @@ -315,12 +323,14 @@ def _wait_for_answer( return None wait_event = self.answer_events[message_id] - # prevent deadlock through answer_events_lock + if timeout is DEFAULT: + timeout = self.DEFAULT_ANSWER_TIMEOUT success = wait_event.wait(timeout=timeout) if not success: raise CRICommandTimeOutError() + # prevent deadlock through answer_events_lock with self.answer_events_lock: del self.answer_events[message_id] @@ -392,11 +402,7 @@ def reset(self) -> bool: `False` if request was not successful """ msg_id = self._send_command("CMD Reset", True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in RESET command: %s", error_msg) return False else: @@ -413,11 +419,7 @@ def enable(self) -> bool: `False` if request was not successful """ msg_id = self._send_command("CMD Enable", True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in ENABLE command: %s", error_msg) return False else: @@ -433,11 +435,7 @@ def disable(self) -> bool: `False` if request was not successful """ msg_id = self._send_command("CMD Disable", True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in DISABLE command: %s", error_msg) return False else: @@ -458,9 +456,7 @@ def set_active_control(self, active: bool) -> bool: f"Active_{str(active).lower()}", ) if ( - error_msg := self._wait_for_answer( - f"Active_{str(active).lower()}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) + error_msg := self._wait_for_answer(f"Active_{str(active).lower()}") ) is not None: logger.debug("Error in set active control command: %s", error_msg) return False @@ -477,11 +473,7 @@ def zero_all_joints(self) -> bool: `False` if request was not successful """ msg_id = self._send_command("CMD SetJointsToZero", True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in SetJointsToZero command: %s", error_msg) return False else: @@ -542,11 +534,7 @@ def get_referencing_info(self): `False` if request was not successful """ self._send_command("CMD GetReferencingInfo", True, "info_referencing") - if ( - error_msg := self._wait_for_answer( - "info_referencing", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer("info_referencing")) is not None: logger.debug("Error in GetReferencingInfo command: %s", error_msg) return False else: @@ -1009,11 +997,7 @@ def set_motion_type(self, motion_type: MotionType): command = f"CMD MotionType{motion_type.value}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in MotionType command: %s", error_msg) return False else: @@ -1036,11 +1020,7 @@ def set_override(self, override: float): command = f"CMD Override {override}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in Override command: %s", error_msg) return False else: @@ -1069,11 +1049,7 @@ def set_dout(self, id: int, value: bool): command = f"CMD DOUT {id} {str(value).lower()}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in DOUT command: %s", error_msg) return False else: @@ -1102,11 +1078,7 @@ def set_din(self, id: int, value: bool): command = f"CMD DIN {id} {str(value).lower()}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in DIN command: %s", error_msg) return False else: @@ -1135,11 +1107,7 @@ def set_global_signal(self, id: int, value: bool): command = f"CMD GSIG {id} {str(value).lower()}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in DIN command: %s", error_msg) return False else: @@ -1162,11 +1130,7 @@ def load_programm(self, program_name: str) -> bool: command = f"CMD LoadProgram {program_name}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in load_program command: %s", error_msg) return False else: @@ -1189,11 +1153,7 @@ def load_logic_programm(self, program_name: str) -> bool: command = f"CMD LoadLogicProgram {program_name}" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in load_logic_program command: %s", error_msg) return False else: @@ -1211,11 +1171,7 @@ def start_programm(self) -> bool: command = "CMD StartProgram" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in start_program command: %s", error_msg) return False else: @@ -1233,11 +1189,7 @@ def stop_programm(self) -> bool: command = "CMD StopProgram" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in stop_program command: %s", error_msg) return False else: @@ -1255,11 +1207,7 @@ def pause_programm(self) -> bool: command = "CMD PauseProgram" msg_id = self._send_command(command, True) - if ( - error_msg := self._wait_for_answer( - msg_id, timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: + if (error_msg := self._wait_for_answer(msg_id)) is not None: logger.debug("Error in pause_program command: %s", error_msg) return False else: @@ -1375,7 +1323,9 @@ def can_receive( return item def get_board_temperatures( - self, blocking: bool = True, timeout: float | None = None + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore ) -> bool: """Receive motor controller PCB temperatures and save in robot state @@ -1389,9 +1339,7 @@ def get_board_temperatures( """ self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") if ( - error_msg := self._wait_for_answer( - "info_boardtemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) + error_msg := self._wait_for_answer("info_boardtemp", timeout=timeout) ) is not None: logger.debug("Error in GetBoardTemp command: %s", error_msg) return False @@ -1399,7 +1347,9 @@ def get_board_temperatures( return True def get_motor_temperatures( - self, blocking: bool = True, timeout: float | None = None + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore ) -> bool: """Receive motor temperatures and save in robot state @@ -1413,9 +1363,7 @@ def get_motor_temperatures( """ self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") if ( - error_msg := self._wait_for_answer( - "info_motortemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) + error_msg := self._wait_for_answer("info_motortemp", timeout=timeout) ) is not None: logger.debug("Error in GetMotorTemp command: %s", error_msg) return False