diff --git a/socs/agents/hwp_supervisor/agent.py b/socs/agents/hwp_supervisor/agent.py index b12df5ec8..cb85e0ca1 100644 --- a/socs/agents/hwp_supervisor/agent.py +++ b/socs/agents/hwp_supervisor/agent.py @@ -363,6 +363,8 @@ class HWPState: gripper: Optional[GripperState] = None is_spinning: Optional[bool] = None + direction: Optional[str] = None + forward_is_cw: Optional[bool] = True supervisor_control_state: Optional[Dict[str, Any]] = None @@ -382,6 +384,7 @@ def from_args(cls, args: argparse.Namespace) -> "HWPState": temp_thresh=args.ybco_temp_thresh, ups_minutes_remaining_thresh=args.ups_minutes_remaining_thresh, pid_max_time_since_update=args.pid_max_time_since_update, + forward_is_cw=args.forward_dir == 'cw', ) if args.gripper_iboot_id is not None: @@ -556,6 +559,32 @@ def update_ups_state(self, test_mode=False): self.ups_last_connection_attempt = data['ups_connection']['last_attempt'] self.ups_connected = data['ups_connection']['connected'] + def update_spin_state(self): + """ + Updates hwp spin state values from the pid_state and control_state. + """ + if self.pid_last_updated is None or self.pid_current_freq is None or \ + self.pid_direction is None: + self.is_spinning = None + self.direction = None + elif time.time() - self.pid_last_updated > self.pid_max_time_since_update: + self.is_spinning = None + self.direction = None + elif self.pid_current_freq > self.pid_freq_tolerance: + self.is_spinning = True + else: + self.is_spinning = False + self.direction = None + if self.is_spinning and self.supervisor_control_state is not None: + control_state = self.supervisor_control_state['state_type'] + # If hwp is braking, pid_direction is different from actual direction. + # Therefore, we keep previous value and do not update direction. + if control_state not in ('Brake', 'WaitForBrake'): + if self.pid_direction == 0: + self.direction = 'cw' if self.forward_is_cw else 'ccw' + elif self.pid_direction == 1: + self.direction = 'ccw' if self.forward_is_cw else 'cw' + @property def pmx_action(self): """ @@ -594,6 +623,7 @@ def update(self, test_mode=False) -> Dict[str, Any]: 'pid': self.update_pid_state(test_mode=test_mode), 'ups': self.update_ups_state(), } + self.update_spin_state() self.last_updated = now if self.driver_iboot is not None: @@ -605,15 +635,6 @@ def update(self, test_mode=False) -> Dict[str, Any]: if self.gripper is not None: self.gripper.update() - if self.pid_last_updated is None or self.pid_current_freq is None: - self.is_spinning = None - elif now - self.pid_last_updated > self.pid_max_time_since_update: - self.is_spinning = None - elif self.pid_current_freq > self.pid_freq_tolerance: - self.is_spinning = True - else: - self.is_spinning = False - return ops @property @@ -1696,9 +1717,9 @@ def pid_to_freq(self, session, params): } """ if params['target_freq'] >= 0: - d = '0' if self.forward_is_cw else '1' - else: d = '1' if self.forward_is_cw else '0' + else: + d = '0' if self.forward_is_cw else '1' state = ControlState.PIDToFreq( target_freq=np.abs(params['target_freq']), @@ -2059,9 +2080,11 @@ def make_parser(parser=None): '--acu-block-motion-timeout', type=float, default=60., help="Time to wait for ACU motion to be blocked before aborting gripper command." ) - - pgroup.add_argument('--forward-dir', choices=['cw', 'ccw'], default="cw", - help="Whether the PID 'forward' direction is cw or ccw") + pgroup.add_argument( + '--forward-dir', choices=['cw', 'ccw'], default='cw', + help="Whether the PID 'forward' direction is cw or ccw. " + "cw or ccw is defined when hwp is viewed from the sky side." + ) return parser