diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 8bae446..3f8f135 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -15,6 +15,10 @@ logger = logging.getLogger(__name__) +DEFAULT = object() +"""Placeholder for defaulting a parameter to runtime-configurable default values.""" + + class MotionType(Enum): """Robot Motion Type for Jogging""" @@ -163,7 +167,7 @@ def _send_command( command: str, register_answer: bool = False, fixed_answer_name: str | None = None, - ) -> int | None: + ) -> int: """Sends the given command to iRC. Parameters @@ -173,9 +177,13 @@ def _send_command( Returns ------- - int | None - If the command was sent the message_id gets returned or None if there was an error. + int + The sent message_id. + Raises + ------ + CRIConnectionError + When not connected or connection was lost. """ if not self.connected or self.sock is None: logger.error("Not connected. Use connect() to establish a connection.") @@ -280,18 +288,22 @@ def _bg_receive_thread(self) -> None: continue_parsing = False def _wait_for_answer( - self, message_id: str, timeout: float | None = None + self, + message_id: str | int, + timeout: float | None = DEFAULT, # type: ignore ) -> None | str: """Waits for an answer to a message. The answer event will be removed after the call, even if there was a timeout. Choose timeout accordingly. Parameters ---------- - message_id : int + message_id : int or str message id of sent message of which an answer is expected - timeout : float | None - timeout for wait in seconds. `None` will wait indefinetly + timeout : float | DEFAULT | None + timeout for wait in seconds. + - `DEFAULT` uses `self.DEFAULT_ANSWER_TIMEOUT` + - `None` will wait indefinetly Returns ------- @@ -305,18 +317,20 @@ def _wait_for_answer( raised if no answer was received in given timeout """ - + message_id = str(message_id) with self.answer_events_lock: if message_id not in self.answer_events: return None wait_event = self.answer_events[message_id] - # prevent deadlock through answer_events_lock + if timeout is DEFAULT: + timeout = self.DEFAULT_ANSWER_TIMEOUT success = wait_event.wait(timeout=timeout) if not success: raise CRICommandTimeOutError() + # prevent deadlock through answer_events_lock with self.answer_events_lock: del self.answer_events[message_id] @@ -387,18 +401,12 @@ def reset(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Reset", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in RESET command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Reset", True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in RESET command: %s", error_msg) return False + else: + return True def enable(self) -> bool: """Enable robot @@ -410,18 +418,12 @@ def enable(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Enable", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in ENABLE command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Enable", True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in ENABLE command: %s", error_msg) return False + else: + return True def disable(self) -> bool: """Disable robot @@ -432,18 +434,12 @@ def disable(self) -> bool: `True` if request was successful `False` if request was not successful """ - if (msg_id := self._send_command("CMD Disable", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DISABLE command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Disable", True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in DISABLE command: %s", error_msg) return False + else: + return True def set_active_control(self, active: bool) -> bool: """Acquire or return active control of robot @@ -454,25 +450,18 @@ def set_active_control(self, active: bool) -> bool: `True` acquire active control `False` return active control """ + self._send_command( + f"CMD SetActive {str(active).lower()}", + True, + f"Active_{str(active).lower()}", + ) if ( - self._send_command( - f"CMD SetActive {str(active).lower()}", - True, - f"Active_{str(active).lower()}", - ) - is not None - ): - if ( - error_msg := self._wait_for_answer( - f"Active_{str(active).lower()}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in set active control command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer(f"Active_{str(active).lower()}") + ) is not None: + logger.debug("Error in set active control command: %s", error_msg) return False + else: + return True def zero_all_joints(self) -> bool: """Set all joints to zero @@ -483,19 +472,12 @@ def zero_all_joints(self) -> bool: `True` if request was successful `False` if request was not successful """ - - if (msg_id := self._send_command("CMD SetJointsToZero", True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in SetJointsToZero command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD SetJointsToZero", True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in SetJointsToZero command: %s", error_msg) return False + else: + return True def reference_all_joints(self, *, timeout: float = 30) -> bool: """Reference all joints. Long timout of 30 seconds. @@ -506,17 +488,12 @@ def reference_all_joints(self, *, timeout: float = 30) -> bool: `True` if request was successful `False` if request was not successful """ - - if (msg_id := self._send_command("CMD ReferenceAllJoints", True)) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=timeout) - ) is not None: - logger.debug("Error in ReferenceAllJoints command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD ReferenceAllJoints", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=timeout)) is not None: + logger.debug("Error in ReferenceAllJoints command: %s", error_msg) return False + else: + return True def reference_single_joint(self, joint: str, *, timeout: float = 30) -> bool: """Reference a single joint. Long timout of 30 seconds. @@ -540,18 +517,12 @@ def reference_single_joint(self, joint: str, *, timeout: float = 30) -> bool: else: return False - if ( - msg_id := self._send_command(f"CMD ReferenceSingleJoint {joint_msg}", True) - ) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=timeout) - ) is not None: - logger.debug("Error in ReferenceSingleJoint command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(f"CMD ReferenceSingleJoint {joint_msg}", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=timeout)) is not None: + logger.debug("Error in ReferenceSingleJoint command: %s", error_msg) return False + else: + return True def get_referencing_info(self): """Reference all joints. Long timout of 30 seconds. @@ -562,21 +533,12 @@ def get_referencing_info(self): `True` if request was successful `False` if request was not successful """ - if ( - self._send_command("CMD GetReferencingInfo", True, "info_referencing") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_referencing", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetReferencingInfo command: %s", error_msg) - return False - else: - return True - else: + self._send_command("CMD GetReferencingInfo", True, "info_referencing") + if (error_msg := self._wait_for_answer("info_referencing")) is not None: + logger.debug("Error in GetReferencingInfo command: %s", error_msg) return False + else: + return True def wait_for_kinematics_ready(self, timeout: float = 30) -> bool: """Wait until drive state is indicated as ready. @@ -655,26 +617,20 @@ def move_joints( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move Joints command: %s", error_msg) return False - - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move Joints command: %s", error_msg) - return False - - return True - - else: - return False + return True def move_joints_relative( self, @@ -725,28 +681,22 @@ def move_joints_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug( + "Exec Error in Move Joints Relative command: %s", error_msg + ) return False - - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug( - "Exec Error in Move Joints Relative command: %s", error_msg - ) - return False - - return True - - else: - return False + return True def move_cartesian( self, @@ -803,26 +753,21 @@ def move_cartesian( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move Cartesian command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move Cartesian command: %s", error_msg) - return False - - return True - - else: - return False + return True def move_base_relative( self, @@ -878,28 +823,21 @@ def move_base_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move BaseRelative command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug( - "Exec Error in Move BaseRelative command: %s", error_msg - ) - return False - - return True - - else: - return False + return True def move_tool_relative( self, @@ -955,26 +893,21 @@ def move_tool_relative( if wait_move_finished: self._register_answer("EXECEND") - if (msg_id := self._send_command(command, True)) is not None: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id, timeout=30.0)) is not None: + logger.debug("Error in Move Joints command: %s", error_msg) + return False + + if wait_move_finished: if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=30.0) + error_msg := self._wait_for_answer( + "EXECEND", timeout=move_finished_timeout + ) ) is not None: - logger.debug("Error in Move Joints command: %s", error_msg) + logger.debug("Exec Error in Move BaseTool command: %s", error_msg) return False - if wait_move_finished: - if ( - error_msg := self._wait_for_answer( - "EXECEND", timeout=move_finished_timeout - ) - ) is not None: - logger.debug("Exec Error in Move BaseTool command: %s", error_msg) - return False - - return True - - else: - return False + return True def stop_move(self) -> bool: """Stop movement @@ -986,16 +919,12 @@ def stop_move(self) -> bool: `False` if request was not successful """ - if (msg_id := self._send_command("CMD Move Stop", True)) is not None: - if ( - error_msg := self._wait_for_answer(f"{msg_id}", timeout=5.0) - ) is not None: - logger.debug("Error in Move Stop command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command("CMD Move Stop", True) + if (error_msg := self._wait_for_answer(msg_id, timeout=5.0)) is not None: + logger.debug("Error in Move Stop command: %s", error_msg) return False + else: + return True def start_jog(self): """starts live jog. Set speeds via set_jog_values""" @@ -1067,18 +996,12 @@ def set_motion_type(self, motion_type: MotionType): """ command = f"CMD MotionType{motion_type.value}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in MotionType command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in MotionType command: %s", error_msg) return False + else: + return True def set_override(self, override: float): """Set override @@ -1096,18 +1019,12 @@ def set_override(self, override: float): """ command = f"CMD Override {override}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in Override command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in Override command: %s", error_msg) return False + else: + return True def set_dout(self, id: int, value: bool): """Set digital out @@ -1131,18 +1048,12 @@ def set_dout(self, id: int, value: bool): command = f"CMD DOUT {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DOUT command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in DOUT command: %s", error_msg) return False + else: + return True def set_din(self, id: int, value: bool): """Set digital inout, only available in simulation @@ -1166,18 +1077,12 @@ def set_din(self, id: int, value: bool): command = f"CMD DIN {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DIN command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in DIN command: %s", error_msg) return False + else: + return True def set_global_signal(self, id: int, value: bool): """Set global signal @@ -1201,18 +1106,12 @@ def set_global_signal(self, id: int, value: bool): command = f"CMD GSIG {id} {str(value).lower()}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in DIN command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in DIN command: %s", error_msg) return False + else: + return True def load_programm(self, program_name: str) -> bool: """Load a program file from disk into the robot controller @@ -1230,18 +1129,12 @@ def load_programm(self, program_name: str) -> bool: """ command = f"CMD LoadProgram {program_name}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in load_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in load_program command: %s", error_msg) return False + else: + return True def load_logic_programm(self, program_name: str) -> bool: """Load a logic program file from disk into the robot controller @@ -1259,18 +1152,12 @@ def load_logic_programm(self, program_name: str) -> bool: """ command = f"CMD LoadLogicProgram {program_name}" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in load_logic_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in load_logic_program command: %s", error_msg) return False + else: + return True def start_programm(self) -> bool: """Start currently loaded Program @@ -1283,18 +1170,12 @@ def start_programm(self) -> bool: """ command = "CMD StartProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in start_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in start_program command: %s", error_msg) return False + else: + return True def stop_programm(self) -> bool: """Stop currently running Program @@ -1307,18 +1188,12 @@ def stop_programm(self) -> bool: """ command = "CMD StopProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in stop_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in stop_program command: %s", error_msg) return False + else: + return True def pause_programm(self) -> bool: """Pause currently running Program @@ -1331,18 +1206,12 @@ def pause_programm(self) -> bool: """ command = "CMD PauseProgram" - if (msg_id := self._send_command(command, True)) is not None: - if ( - error_msg := self._wait_for_answer( - f"{msg_id}", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in pause_program command: %s", error_msg) - return False - else: - return True - else: + msg_id = self._send_command(command, True) + if (error_msg := self._wait_for_answer(msg_id)) is not None: + logger.debug("Error in pause_program command: %s", error_msg) return False + else: + return True def upload_file(self, path: str | Path, target_directory: str) -> bool: """Uploads file to iRC into `/Data/` @@ -1382,19 +1251,16 @@ def upload_file(self, path: str | Path, target_directory: str) -> bool: command = f"CMD UploadFileInit {target_directory + '/' + str(file_path.name)} {len(lines)} 0" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) for line in lines: command = f"CMD UploadFileLine {line.rstrip()}" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) command = "CMD UploadFileFinish" - if self._send_command(command, True) is None: - return False + self._send_command(command, True) return True def enable_can_bridge(self, enabled: bool) -> None: @@ -1457,7 +1323,9 @@ def can_receive( return item def get_board_temperatures( - self, blocking: bool = True, timeout: float | None = None + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore ) -> bool: """Receive motor controller PCB temperatures and save in robot state @@ -1469,24 +1337,19 @@ def get_board_temperatures( timeout: float | None timeout for waiting in seconds or None for infinite waiting """ + self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") if ( - self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_boardtemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetBoardTemp command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer("info_boardtemp", timeout=timeout) + ) is not None: + logger.debug("Error in GetBoardTemp command: %s", error_msg) return False + else: + return True def get_motor_temperatures( - self, blocking: bool = True, timeout: float | None = None + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore ) -> bool: """Receive motor temperatures and save in robot state @@ -1498,21 +1361,14 @@ def get_motor_temperatures( timeout: float | None timeout for waiting in seconds or None for infinite waiting """ + self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") if ( - self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") - is not None - ): - if ( - error_msg := self._wait_for_answer( - "info_motortemp", timeout=self.DEFAULT_ANSWER_TIMEOUT - ) - ) is not None: - logger.debug("Error in GetMotorTemp command: %s", error_msg) - return False - else: - return True - else: + error_msg := self._wait_for_answer("info_motortemp", timeout=timeout) + ) is not None: + logger.debug("Error in GetMotorTemp command: %s", error_msg) return False + else: + return True # Monkey patch to maintain backward compatibility