diff --git a/Config/ArmConfig.py b/Config/ArmConfig.py index 1971333..c66ec9a 100644 --- a/Config/ArmConfig.py +++ b/Config/ArmConfig.py @@ -2,6 +2,7 @@ import json from typing import Dict from typing import Any +from Config.ArmPhysicalParameters import ArmPhysicalParameters class ArmConfig: @@ -31,6 +32,8 @@ class ArmConfig: "remote_hal_port": "8000", } + armPhysicalParameters = ArmPhysicalParameters() # Default physical parameters for the arm + def load_config(self, config_file_path: str = 'config.json') -> Dict[str, Any]: config = dict(self.CONFIG_DEFAULT_DICT) # Create a copy of the default config json_object = {} diff --git a/Config/ArmPhysicalParameters.py b/Config/ArmPhysicalParameters.py new file mode 100644 index 0000000..c007dea --- /dev/null +++ b/Config/ArmPhysicalParameters.py @@ -0,0 +1,48 @@ +class ArmPhysicalParameters: + """ + Holds all physical and calibration parameters for the robotic arm. + Modify these values to match your specific hardware setup. + """ + + # --- Denavit-Hartenberg (DH) Parameters --- + # These define the geometry of the arm for kinematics calculations. + a1: float = 13.1 # [cm] Height from base to shoulder joint + a2: float = 3.25 # [cm] Horizontal offset from shoulder to elbow + a3: float = 11.4 # [cm] Length of upper arm (shoulder to elbow) + a4: float = 3.25 # [cm] Horizontal offset from elbow to wrist + a5: float = 5.8 # [cm] Length of forearm (elbow to wrist) + a6: float = 11.11 # [cm] Length from wrist to end-effector (gripper/tool) + + # --- Joint Limits --- + # Set the minimum and maximum allowed angles for each joint (in degrees). + base_min: float = 0 # Minimum base rotation + base_max: float = 270 # Maximum base rotation + joint1_min: float = 0 # Minimum angle for joint 1 + joint1_max: float = 90 # Maximum angle for joint 1 + joint2_min: float = 0 # Minimum angle for joint 2 + joint2_max: float = 75 # Maximum angle for joint 2 + + # --- Camera Calibration Parameters --- + # These are used for vision-based distance and position estimation. + sensor_size: float = 0.006 # [m] Physical width of camera sensor + focal_length: float = 0.0063 # [m] Focal length of camera lens + sensor_res: int = 1257 # [pixels] Horizontal resolution of camera sensor + fov: float = 60 # [degrees] Field of view of the camera + + # --- Offsets for Kinematic Calculations --- + kinematic_offset_y: float = 52.454 / 1000 # [m] Y offset for end-effector calibration + kinematic_offset_z: float = 33.704 / 1000 # [m] Z offset for end-effector calibration + + + # --- Notes for Modification --- + # - Always measure your arm's physical dimensions carefully and update the DH parameters. + # - If you change the camera or lens, update the camera calibration parameters. + # - Tune control parameters (K, lambda_, error_tolerance_coord) for smooth and accurate movement. + # - Adjust joint limits to match your hardware's safe operating range. + # - Offsets should be measured/calibrated for your specific end-effector/tool. + # - If you change image resolution or processing, update the vision parameters accordingly. 
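+
+    # --- Example: describing a different arm (illustrative sketch) ---
+    # All of the values above are plain class attributes, so a variant arm can
+    # be described by subclassing and overriding only what differs. The class
+    # name and numbers below are hypothetical:
+    #
+    #   class LongReachArmParameters(ArmPhysicalParameters):
+    #       a3 = 15.0   # [cm] longer upper arm
+    #       fov = 78    # [degrees] wide-angle camera lens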
+ + # Example usage: + # from Config.ArmPhysicalParameters import ArmPhysicalParameters + # params = ArmPhysicalParameters() + # print(params.a1) \ No newline at end of file diff --git a/Config/ArmRuntime.py b/Config/ArmRuntime.py index 598e870..9dc52f5 100644 --- a/Config/ArmRuntime.py +++ b/Config/ArmRuntime.py @@ -1,9 +1,10 @@ from typing import Dict from typing import Any +from Config.ArmConfig import ArmConfig from HALs.HAL_base import HAL_base from Vision.VisualObjectIdentifier import VisualObjectIdentifier -from Controllers.Controller import Controller +from Controllers.Base.Controller import Controller from Modules.server.ServerBase import ServerBase from Modules.App.AppBase import AppBase from Modules.speech_to_text.STTBase import STTBase @@ -95,7 +96,7 @@ def console_input_handeler(input_str: str) -> None: # controler stuff from Controllers.FollowLargestObjectControler import FollowLargestObjectControler - self.selected_controller: Controller = FollowLargestObjectControler(self.selected_HAL, self.selected_object_identifier, "none") + self.selected_controller: Controller = FollowLargestObjectControler(self.selected_HAL, self.selected_object_identifier, arm_params=ArmConfig.armPhysicalParameters, target_label="none") # App stuff if config["use_app"]: @@ -103,7 +104,7 @@ def console_input_handeler(input_str: str) -> None: from Modules.App.App import App from Controllers.FollowClaw import FollowClawController self.selected_object_identifier: ColorObjectIdentifier = ColorObjectIdentifier() - self.selected_controller = FollowClawController(self.selected_HAL, self.selected_object_identifier) + self.selected_controller = FollowClawController(self.selected_HAL, self.selected_object_identifier, arm_params=ArmConfig.armPhysicalParameters, target_label="none") self.selected_app = App(self.selected_controller, self.selected_HAL, self.selected_object_identifier) # Server setup diff --git a/Controllers/Controller.py b/Controllers/Base/Controller.py similarity index 100% rename from Controllers/Controller.py rename to Controllers/Base/Controller.py diff --git a/Controllers/Base/MoveToPointInSpaceController.py b/Controllers/Base/MoveToPointInSpaceController.py new file mode 100644 index 0000000..df14079 --- /dev/null +++ b/Controllers/Base/MoveToPointInSpaceController.py @@ -0,0 +1,432 @@ +import math +import logging +import numpy as np +import sympy as sym +import time + +from Config.ArmPhysicalParameters import ArmPhysicalParameters +from HALs.HAL_base import HAL_base +from Controllers.Base.Controller import Controller + +# --- Constants for Magic Numbers --- +SMOOTHING_FACTOR = 0.1 +ANGLE_OFFSET_90 = 90 +ANGLE_OFFSET_180 = 180 +FRAME_SLEEP_SECONDS = 0.03 +HARD_CODED_ANGLE_RAD = math.radians(22.5) +A6_EPSILON = 0.0001 +M_TO_CM = 100 +PIXEL_TO_METER = 0.01 +resize_fx = 0.25 +resize_fy = 0.25 + +# Utility functions - - - - - - - - - - +def set_arm_target_position( + x: float, + y: float, + z: float, + hal: 'HAL_base', + arm_params: ArmPhysicalParameters, + smoothing_factor: float = 0.1 +) -> bool: + """ + Calculates and commands the arm to reach the given (x, y, z) position. + + Args: + x (float): Target x coordinate. + y (float): Target y coordinate. + z (float): Target z coordinate. + hal: Hardware abstraction layer for controlling the arm. + arm_params (ArmPhysicalParameters): Arm physical parameters (required). + smoothing_factor (float): Smoothing factor for joint movement. + + Returns: + bool: True if the position is reachable and command sent, False if fallback was used. 
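+
+    Example:
+        Illustrative call, assuming an existing HAL instance named ``hal``;
+        the coordinate values are made up and use the same units as the DH
+        parameters in ArmPhysicalParameters:
+
+            params = ArmPhysicalParameters()
+            reached = set_arm_target_position(20.0, 0.0, 25.0, hal, params)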
+ + Raises: + ValueError: If arm_params is not provided. + Exception: For unexpected errors. + """ + + try: + angles = _calculate_kinematics(x, y, z, arm_params) + if angles is None: + logging.warning("Target position unreachable, using fallback kinematics.") + _fallback_kinematics(x, y, z, hal, arm_params, smoothing_factor) + return False + else: + _command_hardware(angles, x, y, z, hal, arm_params, smoothing_factor) + return True + except Exception as err: + logging.exception(f"Exception in set_arm_target_position: {err}") + raise + +def _calculate_kinematics( + x: float, + y: float, + z: float, + arm_params: 'ArmPhysicalParameters' +) -> 'tuple[float, float, float] | None': + """ + Calculates joint angles using the arm's kinematics. + + Returns: + tuple or None: (theta1_deg, theta2_deg, theta3_deg) if reachable, else None. + """ + robot_arm = Three_Degree_Arm( + arm_params.a1, arm_params.a2, arm_params.a3, + arm_params.a4, arm_params.a5, arm_params.a6 + ) + return robot_arm.calculate_angles(sym.Matrix([x, y, z, 1])) + +def _fallback_kinematics( + x: float, + y: float, + z: float, + hal: 'HAL_base', + arm_params: ArmPhysicalParameters, + smoothing_factor: float +) -> None: + """ + Fallback for unreachable positions: computes simple base and shoulder angles. + """ + a1 = arm_params.a1 + ANGLE_OFFSET_90 = 90 + ANGLE_OFFSET_180 = 180 + + theta = sym.atan2(y, x) if (x != 0 or y != 0) else 0 + opp = z - a1 + dist = sym.sqrt(x ** 2 + y ** 2) + t0deg = float(sym.deg(theta).evalf()) + ANGLE_OFFSET_180 + t0deg = (t0deg + 360) % 360 + theta1 = sym.atan(opp / dist) if dist != 0 else ANGLE_OFFSET_90 + t1deg = ANGLE_OFFSET_90 - (float(sym.deg(theta1).evalf())) + t0n = hal.get_joint(0) + t1n = hal.get_joint(1) + t0n = t0n + (t0deg - t0n) * smoothing_factor + t1n = t1n + (t1deg - t1n) * smoothing_factor + hal.set_joint(0, t0n) + hal.set_joint(1, t1n) + hal.set_joint(2, 0) + logging.info(f"Fallback kinematics used: base={t0n:.2f}, shoulder={t1n:.2f}, elbow=0") + +def _command_hardware( + angles: tuple[float, float, float], + x: float, + y: float, + z: float, + hal: 'HAL_base', + arm_params: ArmPhysicalParameters, + smoothing_factor: float +) -> None: + """ + Sends calculated joint angles to the hardware, with optional smoothing. 
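+
+    The smoothing is exponential: each joint is commanded to
+    ``current + smoothing_factor * (target - current)``, so every call closes a
+    fixed fraction of the remaining error instead of jumping straight to the target.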
+ """ + ANGLE_OFFSET_90 = 90 + ANGLE_OFFSET_180 = 180 + + theta = sym.atan2(y, x) if (x != 0 or y != 0) else 0 + t0deg = float(sym.deg(theta).evalf()) + ANGLE_OFFSET_180 + t0deg = (t0deg + 360) % 360 + angle_base = t0deg + angle_base = (angle_base + 360) % 360 + angle_1 = -(angles[1] - ANGLE_OFFSET_90) + angle_2 = -(angles[2] - ANGLE_OFFSET_90) + + # Always use smoothing for consistency + base_val = hal.get_joint(0) + smoothing_factor * (angle_base - hal.get_joint(0)) + shoulder_val = hal.get_joint(1) + smoothing_factor * (angle_1 - hal.get_joint(1)) + elbow_val = hal.get_joint(2) + smoothing_factor * (angle_2 - hal.get_joint(2)) + hal.set_joint(0, base_val) + hal.set_joint(1, shoulder_val) + hal.set_joint(2, elbow_val) + logging.info(f"Smooth move: base={base_val:.2f}, shoulder={shoulder_val:.2f}, elbow={elbow_val:.2f}") + +class Three_Degree_Arm: + def __init__(self, a1: float, a2: float, a3: float, a4: float, a5: float, a6: float) -> None: + self.a1 = a1 + self.a2 = a2 + self.a3 = a3 + self.a4 = a4 + self.a5 = a5 + self.a6 = a6 + + def DH_matrix(self, theta: float, alpha: float, r: float, d: float) -> sym.Matrix: + return sym.Matrix([ + [sym.cos(theta), -sym.sin(theta) * sym.cos(alpha), sym.sin(theta) * sym.sin(alpha), r * sym.cos(theta)], + [sym.sin(theta), sym.cos(theta) * sym.cos(alpha), -sym.cos(theta) * sym.sin(alpha), r * sym.sin(theta)], + [0, sym.sin(alpha), sym.cos(alpha), d], + [0, 0, 0, 1] + ]) + + def calculate_angles(self, EE: sym.Matrix) -> 'tuple[float, float, float] | None': + x, y, z = EE[0], EE[1], EE[2] + + # Condition checks + cond1 = (np.abs(x) - self.a3) ** 2 + (z - self.a1) ** 2 < (self.a5 + self.a6) ** 2 and np.abs( + x) > self.a3 and 0 < z < self.a1 + cond2 = np.abs(x) ** 2 + (z - self.a1) ** 2 < (self.a3 + self.a5 + self.a6) ** 2 and np.abs(x) ** 2 + ( + z - self.a1) ** 2 > self.a3 ** 2 and np.abs(x) > 0 and z > self.a1 + + isInRegion = cond1 or cond2 + + if isInRegion: + print('The point is inside the region.') + print(EE) + theta1 = sym.atan2(y, x) + + H0_1 = self.DH_matrix(theta1, sym.pi / 2, 0, self.a1) + EE_local_H0_1 = H0_1.inv() * EE + + EE_x_H0_1 = EE_local_H0_1[0] + # not used in this context + # EE_y_H0_1 = EE_local_H0_1[2] + EE_z_H0_1 = EE_local_H0_1[1] + + l1 = self.a3 + l2 = self.a5 + self.a6 + R = sym.sqrt(EE_x_H0_1 ** 2 + EE_z_H0_1 ** 2) + theta = sym.atan(EE_z_H0_1 / EE_x_H0_1) + + beta = sym.acos((R ** 2 - l1 ** 2 - l2 ** 2) / (-2 * l1 * l2)) + if not beta.is_real: + return None + alpha = sym.asin((l2 * sym.sin(beta)) / R) + print(alpha) + theta2 = theta + alpha + theta3 = beta - sym.pi / 2 + + theta4 = 0 + + print(theta2.evalf()) + print(theta3.evalf()) + + H1_2 = self.DH_matrix(theta2, 0, self.a3, -self.a2) + H2_3 = self.DH_matrix(theta3, sym.pi / 2, 0, self.a4) + H3_4 = self.DH_matrix(theta4, 0, 0, self.a5 + self.a6) + + H0_2 = H0_1 * H1_2 + H0_3 = H0_2 * H2_3 + H0_4 = H0_3 * H3_4 + + EE_position = H0_4[:3, 3] + print('EE Position (CM):') + # print(f'x: {float(EE_position[0].evalf()):.2f}') + # print(f'y: {float(EE_position[1].evalf()):.2f}') + # print(f'z: {float(EE_position[2].evalf()):.2f}') + + theta1_deg = sym.deg(theta1.evalf()) + theta2_deg = sym.deg(theta2.evalf()) + theta3_deg = sym.deg(theta3.evalf()) + + print('\nTheta Angles:') + print(f'Base: {theta1_deg.evalf():.2f} degrees') + print(f'1: {theta2_deg.evalf():.2f} degrees') + print(f'2: {theta3_deg.evalf():.2f} degrees') + return theta1_deg, theta2_deg, theta3_deg + else: + print('This point is not feasible') + return None + +# End Utility functions - - - - - - - - - - + +class 
MoveToPointInSpaceController(Controller):
+
+    def __init__(self, selected_HAL: HAL_base, arm_params: ArmPhysicalParameters) -> None:
+        Controller.__init__(self, selected_HAL)
+        self.selected_HAL: HAL_base = selected_HAL
+        self.arm_params: ArmPhysicalParameters = arm_params
+
+    def start(self) -> None:
+        # Initialize current position (could also read from hardware)
+        joint_positions = self.get_joint_positions()[-1]  # End effector position
+        self.current_x = joint_positions[0]
+        self.current_y = joint_positions[1]
+        self.current_z = joint_positions[2]
+        self.target_cord_x = 0
+        self.target_cord_y = 0
+        self.target_cord_z = 0
+
+        self.target_diameter_pixels = 0
+
+        self.keep_running = True
+
+    def stop(self) -> None:
+        self.keep_running = False
+
+    def set_target_position_in_space(self, x: float, y: float, z: float, target_diameter_pixels: int) -> None:
+        """
+        Set the target position in space for the arm to move to.
+        :param x: Target coordinate in the x direction.
+        :param y: Target coordinate in the y direction.
+        :param z: Target coordinate in the z direction.
+        :param target_diameter_pixels: Diameter of the target in pixels.
+        """
+        self.target_cord_x = x
+        self.target_cord_y = y
+        self.target_cord_z = z
+
+        self.target_diameter_pixels = target_diameter_pixels
+
+        set_arm_target_position(
+            x / M_TO_CM,  # Convert cm to m
+            y / M_TO_CM,  # Convert cm to m
+            z / M_TO_CM,  # Convert cm to m
+            self.selected_HAL,
+            self.arm_params,
+            smoothing_factor=SMOOTHING_FACTOR
+        )
+
+    def step_towards_target(self, step_size: float = 1.0) -> None:
+        # Move a small step towards the target
+        dx = self.target_cord_x - self.current_x
+        dy = self.target_cord_y - self.current_y
+        dz = self.target_cord_z - self.current_z
+
+        distance = math.sqrt(dx**2 + dy**2 + dz**2)
+        if distance < step_size:
+            # Close enough, set to target
+            self.current_x = self.target_cord_x
+            self.current_y = self.target_cord_y
+            self.current_z = self.target_cord_z
+        else:
+            # Step towards target
+            self.current_x += dx / distance * step_size
+            self.current_y += dy / distance * step_size
+            self.current_z += dz / distance * step_size
+
+        set_arm_target_position(
+            self.current_x / M_TO_CM,
+            self.current_y / M_TO_CM,
+            self.current_z / M_TO_CM,
+            self.selected_HAL,
+            self.arm_params,
+            smoothing_factor=SMOOTHING_FACTOR
+        )
+
+    def run_to_target(self, step_size: float = 1.0, interval: float = 0.03) -> None:
+        self.keep_running = True
+        while self.keep_running:
+            self.step_towards_target(step_size)
+            # Stop if at target
+            if (self.current_x == self.target_cord_x and
+                    self.current_y == self.target_cord_y and
+                    self.current_z == self.target_cord_z):
+                break
+            time.sleep(interval)
+
+    def test_move(self, step_size: float = 2.0, interval: float = 0.05) -> None:
+        """
+        Move the arm to the furthest extents in each axis in a loop.
+        Prints 'Success' when each point is reached.
+        Uses ArmPhysicalParameters to calculate the workspace.
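+
+        Example (illustrative; assumes a HAL instance named ``hal``):
+            controller = MoveToPointInSpaceController(hal, ArmPhysicalParameters())
+            controller.start()
+            controller.test_move(step_size=2.0, interval=0.05)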
+ """ + # Calculate reach in the X-Y plane (ignoring offsets for simplicity) + max_reach = self.arm_params.a3 + self.arm_params.a5 + self.arm_params.a6 + min_reach = self.arm_params.a2 # horizontal offset from shoulder to elbow + + # Z extents + z_min = self.arm_params.a1 # base height + z_max = self.arm_params.a1 + self.arm_params.a3 + self.arm_params.a5 + self.arm_params.a6 + + # Four extreme points (in centimeters) + points = [ + (max_reach, 0, z_min), # Far right, lowest + (0, max_reach, z_min), # Far forward, lowest + (max_reach, 0, z_max), # Far right, highest + (0, max_reach, z_max), # Far forward, highest + ] + + for idx, (x, y, z) in enumerate(points): + print(f"Moving to point {idx+1}: ({x:.2f}, {y:.2f}, {z:.2f})") + self.set_target_position_in_space(x, y, z, target_diameter_pixels=0) + joint_positions = self.get_joint_positions()[-1] + self.current_x = joint_positions[0] + self.current_y = joint_positions[1] + self.current_z = joint_positions[2] + while (abs(self.current_x - x) > 1 or + abs(self.current_y - y) > 1 or + abs(self.current_z - z) > 1): + self.step_towards_target(step_size) + time.sleep(interval) + print(f"Success: Reached point {idx+1} ({x:.2f}, {y:.2f}, {z:.2f})") + + def get_joint_angles(self) -> list[float]: + """Return a list of current joint angles in degrees.""" + return [self.selected_HAL.get_joint(i) for i in range(self.selected_HAL.joint_count())] + + def get_joint_positions(self) -> list[tuple[float, float, float]]: + """ + Return a list of (x, y, z) positions for each joint, including the end effector. + Uses DH parameters from ArmPhysicalParameters. + """ + # Get current joint angles in radians + thetas = [math.radians(self.selected_HAL.get_joint(i)) for i in range(self.selected_HAL.joint_count())] + # Get DH parameters + a1 = self.arm_params.a1 + a2 = self.arm_params.a2 + a3 = self.arm_params.a3 + a4 = self.arm_params.a4 + a5 = self.arm_params.a5 + a6 = self.arm_params.a6 + + # Build DH table: (theta, alpha, r, d) + dh_params = [ + (thetas[0], math.pi/2, 0, a1), + (thetas[1], 0, a3, -a2), + (thetas[2], math.pi/2, 0, a4), + (0, 0, 0, a5 + a6) + ] + + # Start with identity matrix + T = np.eye(4) + positions = [(0, 0, 0)] # Base at origin + + for theta, alpha, r, d in dh_params: + T = np.dot(T, self._dh_matrix(theta, alpha, r, d)) + pos = T[:3, 3] + positions.append(tuple(float(x) for x in pos)) + + return positions # Each is (x, y, z), last is end effector + + def get_end_effector_position(self) -> tuple[float, float, float]: + """Return the (x, y, z) position of the end effector.""" + return self.get_joint_positions()[-1] + + def get_end_effector_rotation(self) -> tuple[float, float, float]: + """ + Return the current forward direction (z-axis) of the end effector as a 3-tuple. 
+ """ + # Get current joint angles in radians + thetas = [math.radians(self.selected_HAL.get_joint(i)) for i in range(self.selected_HAL.joint_count())] + a1 = self.arm_params.a1 + a2 = self.arm_params.a2 + a3 = self.arm_params.a3 + a4 = self.arm_params.a4 + a5 = self.arm_params.a5 + a6 = self.arm_params.a6 + + dh_params = [ + (thetas[0], math.pi/2, 0, a1), + (thetas[1], 0, a3, -a2), + (thetas[2], math.pi/2, 0, a4), + (0, 0, 0, a5 + a6) + ] + + T = np.eye(4) + for theta, alpha, r, d in dh_params: + T = np.dot(T, self._dh_matrix(theta, alpha, r, d)) + # The forward direction is the z-axis of the end effector frame (third column of rotation part) + forward = T[:3, 2] + return tuple(float(x) for x in forward) + + @staticmethod + def _dh_matrix(theta: float, alpha: float, r: float, d: float) -> np.ndarray: + """Return the DH transformation matrix.""" + return np.array([ + [np.cos(theta), -np.sin(theta)*np.cos(alpha), np.sin(theta)*np.sin(alpha), r*np.cos(theta)], + [np.sin(theta), np.cos(theta)*np.cos(alpha), -np.cos(theta)*np.sin(alpha), r*np.sin(theta)], + [0, np.sin(alpha), np.cos(alpha), d], + [0, 0, 0, 1] + ]) + diff --git a/Controllers/Base/MoveToPointOnScreenController.py b/Controllers/Base/MoveToPointOnScreenController.py new file mode 100644 index 0000000..a4246cf --- /dev/null +++ b/Controllers/Base/MoveToPointOnScreenController.py @@ -0,0 +1,90 @@ +import threading +import math +import asyncio + +from HALs.HAL_base import HAL_base +from Controllers.Base.Controller import Controller + +class MoveToPointOnScreenController(Controller): + + def __init__(self, selected_HAL: HAL_base): + Controller.__init__(selected_HAL) + self.selected_HAL = selected_HAL + + self.target_positional_tolerance = 5 # acceptable distance from the target screen position (was named error_tolerance_coord) + self.target_diameter_pixels = 0 # diameter of the target in screenspace pixels (was pixel_dia) + self.target_actual_diameter_pixels = 50 # actual diameter of the target in screenspace pixels (used to navigate closer) (was dia_target) + self.smoothness = 3 # how smooth the movements should be, higher is smoother and slower (was called lambda_) + self.error_movment_intensity = 1 # how much the servos should move when the error is detected (was called K) + + self.verbose_logging = False # if True, will print out the theta values to the console + + self.keep_running = True + self.paused = False + + def start(self) -> None: + self.horizontal_distance_from_center = 0 + self.vertical_distance_from_center = 0 + + def stop(self) -> None: + pass + + def set_target_position_on_screen(self, x: int, y: int, target_diameter_pixels): + """ + Set the target position on the screen for the arm to move to. + :param x: Horizontal distance from the center of the frame. + :param y: Vertical distance from the center of the frame. + :param target_diameter_pixels: Diameter of the target in pixels (used to decide when the arm is 'close enough'). 
+ """ + self.horizontal_distance_from_center = x + self.vertical_distance_from_center = y + self.target_diameter_pixels = target_diameter_pixels + + def calculate_base_theta(self): + if abs(self.horizontal_distance_from_center) > self.target_positional_tolerance: + self.theta_base = self.selected_HAL.get_joint(0) - self.error_movment_intensity * self.horizontal_distance_from_center * math.exp(-self.smoothness) + + #move without slow decient + # self.theta_base = self.theta_base - self.error_movment_intensity * (1 - math.exp(-self.smoothness * self.horizontal_distance_from_center)) + return self.theta_base + + def calculate_servo_1_theta(self): + if abs(self.target_diameter_pixels) < self.target_actual_diameter_pixels: + self.servo_1 = self.servo_1 + 3 * self.target_diameter_pixels * math.exp(-self.smoothness) + if abs(self.target_diameter_pixels) > self.target_actual_diameter_pixels: + self.servo_1 = self.servo_1 - 3 * self.target_diameter_pixels * math.exp(-self.smoothness) + return self.servo_1 + + def calculate_servo_2_theta(self): + if abs(self.vertical_distance_from_center) > self.target_positional_tolerance: + self.servo_2 = self.selected_HAL.get_joint(2) - self.error_movment_intensity * self.vertical_distance_from_center * math.exp(-self.smoothness) + return self.servo_2 + + def apply_movment(self) -> bool: + theta_base = self.calculate_base_theta() + servo_1 = self.calculate_servo_1_theta() + servo_2 = self.calculate_servo_2_theta() + + if(theta_base < 0): + theta_base = 0 + if(servo_2 < 0): + servo_2 = 0 + + if self.verbose_logging: + print('theta_base: ' + str(theta_base)) + #print('servo_1: ' + str(servo_1)) + print('servo_2: ' + str(servo_2)) + print('------------------------------------------------------') + + set_0_sucess = self.selected_HAL.set_joint(0, theta_base) + #set_1_sucess = self.selected_HAL.set_joint(1, servo_1) + set_2_sucess = self.selected_HAL.set_joint(2, servo_2) + + return (set_0_sucess and set_2_sucess) # and set_1_sucess + + async def calculate_theta(self): + while self.keep_running: + # if there is an object found + if not self.paused: + self.apply_movment() + await asyncio.sleep(0.03) #run detection every 1/30 seconds \ No newline at end of file diff --git a/Controllers/Base/VisionControllerBase.py b/Controllers/Base/VisionControllerBase.py new file mode 100644 index 0000000..ae45870 --- /dev/null +++ b/Controllers/Base/VisionControllerBase.py @@ -0,0 +1,87 @@ +from typing import List +import threading +from threading import Lock + +from Vision.VisionObject import VisionObject +from Vision.VisualObjectIdentifier import VisualObjectIdentifier + +class VisionBaseController(): + + def __init__(self, vision: VisualObjectIdentifier, target_label: str = None): + self.vision: VisualObjectIdentifier = vision + + self.target_label = target_label + if self.target_label is not None: + self.target_label = self.target_label.lower() + + self.last_frame_objects: List[VisionObject] = [] + self.last_frame_objects_lock: Lock = threading.Lock() + + def set_last_frame_objects(self, objects: List[VisionObject]) -> None: + """Sets the last frame objects to the given list of objects.""" + with self.last_frame_objects_lock: + self.last_frame_objects = objects + + def set_target_label(self, label: str) -> bool: + """This controller will only target objects with the specified label.""" + if self.is_label_in_universe(label): + self.target_label = label.lower() + return True + else: + print(f"Label \"{label}\" is not in the universe of {type(self).__name__}.") + print(f"Please 
select a lable that is in universe: {self.vision.get_all_potential_labels()}") + return False + + def get_target_label(self) -> str: + """Returns the label of the object that the controller is currently targeting.""" + return self.target_label + + def is_label_in_universe(self, label: str) -> bool: + """Returns True if the label is something this controler can see, else false.""" + all_labels = self.vision.get_all_potential_labels() + return label.lower() in (l.lower() for l in all_labels) + + + def get_visible_object_labels(self) -> list[str]: + """Returns a list of identifiers of objects that are visible to the arm""" + with self.last_frame_objects_lock: + if self.last_frame_objects is None: + return [] + return [obj.label for obj in self.last_frame_objects] + + def get_visible_object_labels_detailed(self) -> list[str]: + """Returns a list of objects that are visible to the arm, including metadata""" + with self.last_frame_objects_lock: + if self.last_frame_objects is None: + return [] + # return a series of string that represent the object, starting withe object's label, then radius, then metadata + return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects] + #return [f"{obj.label}_object" for obj in self.last_frame_objects] + + def get_all_posible_labels(self) -> list[str]: + """Returns a list of all possible labels that this controller can see, even if they are not currently visible.""" + return self.vision.get_all_potential_labels() + + def select_largest_target_object(self, detected_objects: List[VisionObject]) -> VisionObject: + # print("found: " + str(len(detected_objects)) + " objects") + + # # print the labels of all visible objects on one line + # for obj in detected_objects: + # print(obj.label, end=", ") + # print() + + if detected_objects: + found_target = None + for obj in detected_objects: + if obj.label.lower() == self.target_label: + if found_target is not None: + if obj.radius > found_target.radius: + found_target = obj + else: + found_target = obj + + # print("largest object: " + str(found_target.label)) + return found_target + else: + return None + \ No newline at end of file diff --git a/Controllers/Base/__init__.py b/Controllers/Base/__init__.py new file mode 100644 index 0000000..22f047b --- /dev/null +++ b/Controllers/Base/__init__.py @@ -0,0 +1 @@ +# This file marks the Controllers.Base directory as a Python package. diff --git a/Controllers/FollowLargestObjectControler.py b/Controllers/FollowLargestObjectControler.py index fb79e47..a255e43 100644 --- a/Controllers/FollowLargestObjectControler.py +++ b/Controllers/FollowLargestObjectControler.py @@ -1,207 +1,86 @@ -import threading -import math +""" +FollowLargestObjectControler.py + +Summary: + This script implements a controller for a robotic arm that uses computer vision to detect, track, and follow the largest object in the camera's view. + It integrates with a hardware abstraction layer (HAL), vision modules, and arm kinematics to compute and command joint movements based on the detected object's position. + The controller supports asynchronous operation, real-time visualization, and applies joint limits from the arm's physical parameters. 
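+
+Design note:
+    The class composes two bases via multiple inheritance:
+    MoveToPointOnScreenController supplies the screen-space servo control loop,
+    and VisionBaseController supplies target-label selection and thread-safe
+    access to the objects detected in the most recent frame.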
+ +Author: + UCSC Slugbotics Club, Arm Team +""" + from typing import List -import cv2 +from typing import Tuple import asyncio -from threading import Lock +import threading +import cv2 from HALs.HAL_base import HAL_base from Vision.VisionObject import VisionObject from Vision.VisualObjectIdentifier import VisualObjectIdentifier from Modules.Base.ImageProducer import ImageProducer -from Controllers.Controller import Controller +from Controllers.Base.Controller import Controller + +from Controllers.Base.VisionControllerBase import VisionBaseController +from Controllers.Base.MoveToPointOnScreenController import MoveToPointOnScreenController +from Config.ArmPhysicalParameters import ArmPhysicalParameters + +class FollowLargestObjectControler(MoveToPointOnScreenController, VisionBaseController): + def __init__( + self, + selected_HAL: HAL_base, + vision: VisualObjectIdentifier, + target_label: str = None, + arm_params: ArmPhysicalParameters = None + ): + + VisionBaseController.__init__(self, vision, target_label) + MoveToPointOnScreenController.__init__(self, selected_HAL) -class FollowLargestObjectControler(Controller): - def __init__(self, selected_HAL: HAL_base, vision: VisualObjectIdentifier, target_label: str = None): self.selected_HAL: HAL_base = selected_HAL self.vision: VisualObjectIdentifier = vision self.imageGetter: ImageProducer = selected_HAL - - self._task = None # To keep track of the running task - self.keep_running = False - self.thread = None - self.verbose_logging = False - self.target_label = target_label - if self.target_label is not None: - self.target_label = self.target_label.lower() - - self.last_frame_objects: List[VisionObject] = [] - self.last_frame_objects_lock: Lock = threading.Lock() - - self.K = 1 - self.lambda_ = 3 # change (3 is smooth in sim) increase number to go slower - self.error_tolerance_coord = 5 # change for error precision - self.dia_target = 50 # change for error precision - self.error_x_distance = 0 - self.error_y_distance = 0 - self.pixel_dia = 0 - self.max_pixel_dia = 18 ### CHANGES ACCORDING TO CAMERA - self.contours = False - self.object_found = False - self.theta_base = 0 - self.servo_1 = 90 - self.servo_2 = 0 - self.movement_speed = 1 # 1/10 seconds - self.servo1_constraint = 45 - selected_HAL.set_joint_min(0, 0) # set_base_min_degree(0) - selected_HAL.set_joint_max(0, 270) # set_base_max_degree(270) - selected_HAL.set_joint_max(2, 75) # set_joint_2_max(75) - - if self.target_label is None: - all_labels = self.vision.get_all_potential_labels() - if len(all_labels) > 0: - self.target_label = all_labels[0].lower() - - def set_target_label(self, label: str) -> bool: - """This controller will only target objects with the specified label.""" - if self.is_label_in_universe(label): - self.target_label = label.lower() - return True - else: - print(f"Label \"{label}\" is not in the universe of {type(self).__name__}.") - print(f"Please select a lable that is in universe: {self.vision.get_all_potential_labels()}") - return False - - def get_target_label(self) -> str: - """Returns the label of the object that the controller is currently targeting.""" - return self.target_label - - def is_label_in_universe(self, label: str) -> bool: - """Returns True if the label is something this controler can see, else false.""" - all_labels = self.vision.get_all_potential_labels() - return label.lower() in (l.lower() for l in all_labels) - - def get_visible_object_labels(self) -> list[str]: - """Returns a list of identifiers of objects that are visible to the arm""" - 
with self.last_frame_objects_lock: - if self.last_frame_objects is None: - return [] - return [obj.label for obj in self.last_frame_objects] - - def get_visible_object_labels_detailed(self) -> list[str]: - """Returns a list of objects that are visible to the arm, including metadata""" - with self.last_frame_objects_lock: - if self.last_frame_objects is None: - return [] - # return a series of string that represent the object, starting withe object's label, then radius, then metadata - return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects] - #return [f"{obj.label}_object" for obj in self.last_frame_objects] - - def get_all_posible_labels(self) -> list[str]: - """Returns a list of all possible labels that this controller can see, even if they are not currently visible.""" - return self.vision.get_all_potential_labels() - - def get_error(self, frame_center, center): - return center - frame_center - - def calculate_base_theta(self): - if abs(self.error_x_distance) > self.error_tolerance_coord: - self.theta_base = self.selected_HAL.get_joint(0) - self.K * self.error_x_distance * math.exp(-self.lambda_) - - #move without slow decient - # self.theta_base = self.theta_base - self.K * (1 - math.exp(-self.lambda_ * self.error_x_distance)) - - def calculate_servo_1_theta(self): - if abs(self.pixel_dia) < self.dia_target: - self.servo_1 = self.servo_1 + 3 * self.pixel_dia * math.exp(-self.lambda_) - if abs(self.pixel_dia) > self.dia_target: - self.servo_1 = self.servo_1 - 3 * self.pixel_dia * math.exp(-self.lambda_) - - #constrain servo - if self.servo_1 < -self.servo1_constraint: - self.servo_1 = -self.servo1_constraint - if self.servo_1 > self.servo1_constraint: - self.servo_1 = self.servo1_constraint - - def calculate_servo_2_theta(self): - if abs(self.error_y_distance) > self.error_tolerance_coord: - self.servo_2 = self.selected_HAL.get_joint(2) - self.K * self.error_y_distance * math.exp(-self.lambda_) - - async def calculate_theta(self): - while self.keep_running: - # if there is an object found - if self.object_found: - self.calculate_base_theta() - self.calculate_servo_1_theta() - self.calculate_servo_2_theta() - - # if self.theta_base > 359: - # self.theta_base = 359 - # if self.theta_base < 0: - # self.theta_base = 0 - - # if self.servo_1 < -90: - # self.servo_1 = -90 - # if self.servo_1 > 90: - # self.servo_1 = 90 - - # if self.servo_2 < 0: - # self.servo_2 = 0 - # if self.servo_2 > 180: - # self.servo_2 = 180 - - + self.arm_params: ArmPhysicalParameters = arm_params # Store ArmPhysicalParameters - if(self.theta_base < 0): - self.theta_base = 0 - if(self.servo_2 < 0): - self.servo_2 = 0 + # Apply joint limits from ArmPhysicalParameters if provided + if self.arm_params is not None: + self.selected_HAL.set_joint_min(0, self.arm_params.base_min) + self.selected_HAL.set_joint_max(0, self.arm_params.base_max) + self.selected_HAL.set_joint_min(1, self.arm_params.joint1_min) + self.selected_HAL.set_joint_max(1, self.arm_params.joint1_max) + self.selected_HAL.set_joint_min(2, self.arm_params.joint2_min) + self.selected_HAL.set_joint_max(2, self.arm_params.joint2_max) - if self.verbose_logging: - print('theta_base: ' + str(self.theta_base)) - #print('servo_1: ' + str(self.servo_1)) - print('servo_2: ' + str(self.servo_2)) - print('------------------------------------------------------') + self.keep_running = False - self.selected_HAL.set_joint(0, self.theta_base) - #self.selected_HAL.set_joint(1, self.servo_1) - self.selected_HAL.set_joint(2, self.servo_2) - - 
await asyncio.sleep(0.03) #run detection every 1/30 seconds
-
-    def select_largest_target_object(self, detected_objects: List[VisionObject]) -> VisionObject:
-        # print("found: " + str(len(detected_objects)) + " objects")
-
-        # # print the labels of all visible objects on one line
-        # for obj in detected_objects:
-        #     print(obj.label, end=", ")
-        # print()
-
-        if detected_objects:
-            found_target = None
-            for obj in detected_objects:
-                if obj.label.lower() == self.target_label:
-                    if found_target is not None:
-                        if obj.radius > found_target.radius:
-                            found_target = obj
-                    else:
-                        found_target = obj
-
-            # print("largest object: " + str(found_target.label))
-            return found_target
-        else:
-            return None
-
-    async def move_towards_object(self, obj: VisionObject, draw_frame: cv2.typing.MatLike = None) -> None:
+    def get_error(self, frame_center, center):
+        return center - frame_center
+
+    def get_object_center_in_screen_space(self, obj: VisionObject) -> Tuple[int, int]:
         # calculate the distance from the center of the frame
         frame_center_x = obj.frame_width // 2
         frame_center_y = obj.frame_height // 2
-        self.error_x_distance = self.get_error(frame_center_x, obj.get_center_x())
-        self.error_y_distance = self.get_error(frame_center_y, obj.get_center_y())
-        self.pixel_dia = obj.radius
-        if self.verbose_logging:
-            print(f"Error X: {self.error_x_distance}, Error Y: {self.error_y_distance}, Radius: {self.pixel_dia}")
+        distance_from_center_x = self.get_error(frame_center_x, obj.get_center_x())
+        distance_from_center_y = self.get_error(frame_center_y, obj.get_center_y())
 
-        #draw the bounding circle and the center of it
-        if draw_frame is not None:
-            center = (obj.get_center_x(), obj.get_center_y())
-            cv2.circle(draw_frame, center, obj.radius, (0, 255, 0), 2)
-            cv2.circle(draw_frame, center, 7, (255, 255, 255), -1)
+        return (distance_from_center_x, distance_from_center_y)
 
-    async def update_frame_loop(self):
-        while self.keep_running:
-            await self.update_frame()
+    def draw_target_visualization(self, frame, target_object: VisionObject):
+        #draw the bounding circle and the center of it
+        if frame is not None:
+            center = (target_object.get_center_x(), target_object.get_center_y())
+            cv2.circle(frame, center, target_object.radius, (0, 255, 0), 2)
+            cv2.circle(frame, center, 7, (255, 255, 255), -1)
+
+        flipped_source_frame_rgb = cv2.flip(target_object.source_frame_rgb, 0)
+        bgr_image = cv2.cvtColor(flipped_source_frame_rgb, cv2.COLOR_RGB2BGR)
+        cv2.imshow('Frame', bgr_image)
+        if target_object.mask is not None:
+            # flipped_mask = cv2.flip(target_object.mask, 0)
+            # cv2.imshow('Mask', flipped_mask)
+            cv2.imshow('Mask', target_object.mask)
 
     async def update_frame(self) -> bool:
         frame = self.imageGetter.capture_image()
@@ -212,32 +91,41 @@ async def update_frame(self) -> bool:
 
         self.object_found = (target_object is not None)
         if(self.object_found):
-            await self.move_towards_object(target_object, frame)
-
-            flipped_source_frame_rgb = cv2.flip(target_object.source_frame_rgb, 0)
-            bgr_image = cv2.cvtColor(flipped_source_frame_rgb, cv2.COLOR_RGB2BGR)
-            cv2.imshow('Frame', bgr_image)
-            if target_object.mask is not None:
-                # flipped_mask = cv2.flip(target_object.mask, 0)
-                # cv2.imshow('Mask', flipped_mask)
-                cv2.imshow('Mask', target_object.mask)
+            distance_from_center_x, distance_from_center_y = self.get_object_center_in_screen_space(target_object)
+            self.set_target_position_on_screen(distance_from_center_x, distance_from_center_y, target_object.radius)
+
+            if self.verbose_logging:
+                print(f"Error X: {distance_from_center_x}, Error Y: {distance_from_center_y}, Radius: {target_object.radius}")
+
+            self.draw_target_visualization(frame, target_object)
+
         # check if the user pressed 'q' to exit
         if cv2.waitKey(1) & 0xFF == ord('q'):
-            await asyncio.sleep(0.03) #run detection every 1/30 seconds
+            await asyncio.sleep(0.03) # wait and exit
             return False
 
-        with self.last_frame_objects_lock:
-            self.last_frame_objects = detected_objects
-
-        await asyncio.sleep(0.03) #run detection every 1/30 seconds
-
+        self.set_last_frame_objects(detected_objects)
+
+        return True
-
+
+#region Threading stuff
+
+    async def update_vision_loop(self):
+        while self.keep_running:
+            self.keep_running = await self.update_frame()
+            await asyncio.sleep(0.03) #run detection every 1/30 seconds
+
+    async def update_movment_loop(self):
+        while self.keep_running:
+            if self.object_found:
+                self.apply_movment()
+            await asyncio.sleep(0.03) #run movement every 1/30 seconds
+
     async def start_async(self):
         await asyncio.gather(
-            self.update_frame_loop(),
-            self.calculate_theta(),
+            self.update_vision_loop(),
+            self.update_movment_loop(),
         )
-
+
     def thread_main(self):
         asyncio.run(self.start_async())
 
@@ -247,4 +135,6 @@ def start(self):
         self.thread.start()
 
     def stop(self):
-        self.keep_running = False
\ No newline at end of file
+        self.keep_running = False
+
+#endregion Threading stuff
\ No newline at end of file
diff --git a/Controllers/FollowObject3DControler.py b/Controllers/FollowObject3DControler.py
new file mode 100644
index 0000000..a272649
--- /dev/null
+++ b/Controllers/FollowObject3DControler.py
@@ -0,0 +1,612 @@
+"""
+FollowObject3DControler.py
+
+Summary:
+    This script implements a controller for a robotic arm that visually tracks and follows a 3D object using computer vision.
+    It integrates with a hardware abstraction layer (HAL), vision modules, and arm kinematics to compute and command joint movements
+    based on the detected position of a target object in camera frames. The controller supports asynchronous operation,
+    object selection by label, and provides utility methods for interacting with detected objects and arm state.
+
+Author:
+    UCSC Slugbotics Club, Arm Team
+"""
+
+import threading
+from threading import Lock
+import math
+from typing import List
+import cv2
+import asyncio
+import numpy as np
+import sympy as sym
+#import RPi.GPIO as GPIO
+import time
+import logging
+
+from HALs.HAL_base import HAL_base
+from Vision.VisionObject import VisionObject
+from Vision.VisualObjectIdentifier import VisualObjectIdentifier
+from Modules.Base.ImageProducer import ImageProducer
+from Controllers.Base.Controller import Controller
+from Config.ArmPhysicalParameters import ArmPhysicalParameters
+
+# --- Constants for Magic Numbers ---
+SMOOTHING_FACTOR = 0.1
+ANGLE_OFFSET_90 = 90
+ANGLE_OFFSET_180 = 180
+FRAME_SLEEP_SECONDS = 0.03
+HARD_CODED_ANGLE_RAD = math.radians(22.5)
+A6_EPSILON = 0.0001
+M_TO_CM = 100
+PIXEL_TO_METER = 0.01
+resize_fx = 0.25
+resize_fy = 0.25
+
+def set_arm_target_position(
+    x: float,
+    y: float,
+    z: float,
+    hal,
+    arm_params: ArmPhysicalParameters,
+    smoothing_factor: float = 0.1
+) -> bool:
+    """
+    Calculates and commands the arm to reach the given (x, y, z) position.
+
+    Args:
+        x (float): Target x coordinate.
+        y (float): Target y coordinate.
+        z (float): Target z coordinate.
+        hal: Hardware abstraction layer for controlling the arm.
+        arm_params (ArmPhysicalParameters): Arm physical parameters (required).
+ smoothing_factor (float): Smoothing factor for joint movement. + + Returns: + bool: True if the position is reachable and command sent, False if fallback was used. + + Raises: + ValueError: If arm_params is not provided. + Exception: For unexpected errors. + """ + + try: + angles = _calculate_kinematics(x, y, z, arm_params) + if angles is None: + logging.warning("Target position unreachable, using fallback kinematics.") + _fallback_kinematics(x, y, z, hal, arm_params, smoothing_factor) + return False + else: + _command_hardware(angles, x, y, z, hal, arm_params, smoothing_factor) + return True + except Exception as err: + logging.exception(f"Exception in set_arm_target_position: {err}") + raise + +def _calculate_kinematics( + x: float, + y: float, + z: float, + arm_params: 'ArmPhysicalParameters' +): + """ + Calculates joint angles using the arm's kinematics. + + Returns: + tuple or None: (theta1_deg, theta2_deg, theta3_deg) if reachable, else None. + """ + robot_arm = Three_Degree_Arm( + arm_params.a1, arm_params.a2, arm_params.a3, + arm_params.a4, arm_params.a5, arm_params.a6 + ) + return robot_arm.calculate_angles(sym.Matrix([x, y, z, 1])) + +def _fallback_kinematics( + x: float, + y: float, + z: float, + hal, + arm_params: ArmPhysicalParameters, + smoothing_factor: float +) -> None: + """ + Fallback for unreachable positions: computes simple base and shoulder angles. + """ + a1 = arm_params.a1 + ANGLE_OFFSET_90 = 90 + ANGLE_OFFSET_180 = 180 + + theta = sym.atan2(y, x) if (x != 0 or y != 0) else 0 + opp = z - a1 + dist = sym.sqrt(x ** 2 + y ** 2) + t0deg = float(sym.deg(theta).evalf()) + ANGLE_OFFSET_180 + t0deg = (t0deg + 360) % 360 + theta1 = sym.atan(opp / dist) if dist != 0 else ANGLE_OFFSET_90 + t1deg = ANGLE_OFFSET_90 - (float(sym.deg(theta1).evalf())) + t0n = hal.get_joint(0) + t1n = hal.get_joint(1) + t0n = t0n + (t0deg - t0n) * smoothing_factor + t1n = t1n + (t1deg - t1n) * smoothing_factor + hal.set_joint(0, t0n) + hal.set_joint(1, t1n) + hal.set_joint(2, 0) + logging.info(f"Fallback kinematics used: base={t0n:.2f}, shoulder={t1n:.2f}, elbow=0") + +def _command_hardware( + angles, + x: float, + y: float, + z: float, + hal, + arm_params: ArmPhysicalParameters, + smoothing_factor: float +) -> None: + """ + Sends calculated joint angles to the hardware, with optional smoothing. 
+ """ + ANGLE_OFFSET_90 = 90 + ANGLE_OFFSET_180 = 180 + + theta = sym.atan2(y, x) if (x != 0 or y != 0) else 0 + t0deg = float(sym.deg(theta).evalf()) + ANGLE_OFFSET_180 + t0deg = (t0deg + 360) % 360 + angle_base = t0deg + angle_base = (angle_base + 360) % 360 + angle_1 = -(angles[1] - ANGLE_OFFSET_90) + angle_2 = -(angles[2] - ANGLE_OFFSET_90) + + # Always use smoothing for consistency + base_val = hal.get_joint(0) + smoothing_factor * (angle_base - hal.get_joint(0)) + shoulder_val = hal.get_joint(1) + smoothing_factor * (angle_1 - hal.get_joint(1)) + elbow_val = hal.get_joint(2) + smoothing_factor * (angle_2 - hal.get_joint(2)) + hal.set_joint(0, base_val) + hal.set_joint(1, shoulder_val) + hal.set_joint(2, elbow_val) + logging.info(f"Smooth move: base={base_val:.2f}, shoulder={shoulder_val:.2f}, elbow={elbow_val:.2f}") + +class Three_Degree_Arm: + def __init__(self, a1, a2, a3, a4, a5, a6) -> None: + self.a1 = a1 + self.a2 = a2 + self.a3 = a3 + self.a4 = a4 + self.a5 = a5 + self.a6 = a6 + + def DH_matrix(self, theta, alpha, r, d): + return sym.Matrix([ + [sym.cos(theta), -sym.sin(theta) * sym.cos(alpha), sym.sin(theta) * sym.sin(alpha), r * sym.cos(theta)], + [sym.sin(theta), sym.cos(theta) * sym.cos(alpha), -sym.cos(theta) * sym.sin(alpha), r * sym.sin(theta)], + [0, sym.sin(alpha), sym.cos(alpha), d], + [0, 0, 0, 1] + ]) + + def calculate_angles(self, EE) -> bool: + x, y, z = EE[0], EE[1], EE[2] + + # Condition checks + cond1 = (np.abs(x) - self.a3) ** 2 + (z - self.a1) ** 2 < (self.a5 + self.a6) ** 2 and np.abs( + x) > self.a3 and 0 < z < self.a1 + cond2 = np.abs(x) ** 2 + (z - self.a1) ** 2 < (self.a3 + self.a5 + self.a6) ** 2 and np.abs(x) ** 2 + ( + z - self.a1) ** 2 > self.a3 ** 2 and np.abs(x) > 0 and z > self.a1 + + isInRegion = cond1 or cond2 + + if isInRegion: + print('The point is inside the region.') + print(EE) + theta1 = sym.atan2(y, x) + + H0_1 = self.DH_matrix(theta1, sym.pi / 2, 0, self.a1) + EE_local_H0_1 = H0_1.inv() * EE + + EE_x_H0_1 = EE_local_H0_1[0] + # not used in this context + # EE_y_H0_1 = EE_local_H0_1[2] + EE_z_H0_1 = EE_local_H0_1[1] + + l1 = self.a3 + l2 = self.a5 + self.a6 + R = sym.sqrt(EE_x_H0_1 ** 2 + EE_z_H0_1 ** 2) + theta = sym.atan(EE_z_H0_1 / EE_x_H0_1) + + beta = sym.acos((R ** 2 - l1 ** 2 - l2 ** 2) / (-2 * l1 * l2)) + if not beta.is_real: + return None + alpha = sym.asin((l2 * sym.sin(beta)) / R) + print(alpha) + theta2 = theta + alpha + theta3 = beta - sym.pi / 2 + + theta4 = 0 + + print(theta2.evalf()) + print(theta3.evalf()) + + H1_2 = self.DH_matrix(theta2, 0, self.a3, -self.a2) + H2_3 = self.DH_matrix(theta3, sym.pi / 2, 0, self.a4) + H3_4 = self.DH_matrix(theta4, 0, 0, self.a5 + self.a6) + + H0_2 = H0_1 * H1_2 + H0_3 = H0_2 * H2_3 + H0_4 = H0_3 * H3_4 + + EE_position = H0_4[:3, 3] + print('EE Position (CM):') + # print(f'x: {float(EE_position[0].evalf()):.2f}') + # print(f'y: {float(EE_position[1].evalf()):.2f}') + # print(f'z: {float(EE_position[2].evalf()):.2f}') + + theta1_deg = sym.deg(theta1.evalf()) + theta2_deg = sym.deg(theta2.evalf()) + theta3_deg = sym.deg(theta3.evalf()) + + print('\nTheta Angles:') + print(f'Base: {theta1_deg.evalf():.2f} degrees') + print(f'1: {theta2_deg.evalf():.2f} degrees') + print(f'2: {theta3_deg.evalf():.2f} degrees') + return theta1_deg, theta2_deg, theta3_deg + else: + print('This point is not feasible') + return isInRegion + +class FollowObject3DControler(Controller): + def __init__(self, selected_HAL: HAL_base, vision: VisualObjectIdentifier, arm_params: ArmPhysicalParameters, target_label: str = 
None): + self.selected_HAL: HAL_base = selected_HAL + self.vision: VisualObjectIdentifier = vision + self.imageGetter: ImageProducer = selected_HAL + self.arm_params: ArmPhysicalParameters = arm_params # Save ArmPhysicalParameters + + self._task = None # To keep track of the running task + self.keep_running = False + self.thread = None + self.verbose_logging = False + self.target_label = target_label + + self.last_frame_objects: List[VisionObject] = [] + self.last_frame_objects_lock: Lock = threading.Lock() + + # Use arm_params for all relevant parameters + self.K = 1 + self.lambda_ = 3 # change (3 is smooth in sim) increase number to go slower + self.error_tolerance_coord = 5 # change for error precision + self.dia_target = 50 # change for error precision + self.error_x_distance = 0 + self.error_y_distance = 0 + self.pixel_dia = 0 + self.max_pixel_dia = 18 ### CHANGES ACCORDING TO CAMERA + self.contours = False + self.object_found = False + self.theta_base = 0 + self.servo_1 = 90 + self.servo_2 = 0 + self.movement_speed = 1 # 1/10 seconds + self.servo1_constraint = 45 + self.init = False + self.frame=False + self.mask=False + self.paused = False + self.use_controller = False + + # Camera calibration from arm_params + self.sensor_size = arm_params.sensor_size + self.focal_length = arm_params.focal_length + self.sensor_res = arm_params.sensor_res + self.fov = math.radians(arm_params.fov) + + self.contours = False + selected_HAL.set_joint_min(0, arm_params.base_min) + selected_HAL.set_joint_max(0, arm_params.base_max) + selected_HAL.set_joint_min(1, arm_params.joint1_min) + selected_HAL.set_joint_max(1, arm_params.joint1_max) + selected_HAL.set_joint_min(2, arm_params.joint2_min) + selected_HAL.set_joint_max(2, arm_params.joint2_max) + + def get_error(self, frame_center, center): + return center - frame_center + + def convert(self, T): + return np.array([ + T[0][3], + T[1][3], + T[2][3], + ]) + + def merge(self, FK, BP): + FK = FK * PIXEL_TO_METER + print(FK) + print(BP) + return np.array([ + FK[0] + BP[0], + FK[1] - BP[1], + FK[2] + BP[2], + ]) + + def DH(self, theta, alpha, r, d): + return np.array([ + [np.cos(theta), -np.sin(theta) * round(np.cos(alpha)), np.sin(theta) * round(np.sin(alpha)), + r * np.cos(theta)], + [np.sin(theta), np.cos(theta) * round(np.cos(alpha)), -np.cos(theta) * round(np.sin(alpha)), + r * np.sin(theta)], + [0, round(np.sin(alpha)), round(np.cos(alpha)), d], + [0, 0, 0, 1] + ]) + def calculate_theta(self): + # Stage 1: Check if an object is found + if self.object_found: + # Stage 2: Camera and pixel calculations + # Calculate camera intrinsics and estimate distance to the object using its pixel diameter + focal_len = self.sensor_size / (2 * math.tan(self.fov / 2)) + focal_len_pixels = focal_len * (self.sensor_res / self.sensor_size) + distance_to_object = focal_len_pixels * .02 / (2 * self.pixel_dia) + x_norm = self.error_x_distance / focal_len_pixels + y_norm = self.error_y_distance / focal_len_pixels + x_off = -math.tan(x_norm) * distance_to_object + y_off = math.tan(y_norm) * distance_to_object + print (f"x_off: {x_off}, y_off: {y_off}, depth: {math.pow(distance_to_object, 2) - math.pow(x_off, 2) - math.pow(y_off, 2)}") + # Calculate the depth (z) using the Pythagorean theorem + depth = math.sqrt(math.pow(distance_to_object, 2) - math.pow(x_off, 2) - math.pow(y_off, 2)) + + # Stage 3: Compute rotation matrices for current arm joint angles + # These matrices represent the orientation of the arm's end effector + b = -np.deg2rad(self.selected_HAL.get_joint(1)) - 
np.deg2rad(self.selected_HAL.get_joint(2)) # around y + a = -HARD_CODED_ANGLE_RAD # around x + c = np.deg2rad(self.selected_HAL.get_joint(0)) # around z + print(f"Around Y = {b}, Around Z = {c}") + R_x = np.array([ + [1, 0, 0], + [0, np.cos(a), -np.sin(a)], + [0, np.sin(a), np.cos(a)] + ]) + R_y = np.array([ + [np.cos(b), 0, -np.sin(b)], + [0, 1, 0], + [np.sin(b), 0, np.cos(b)] + ]) + R_z = np.array([ + [np.cos(c), np.sin(c), 0], + [-np.sin(c), np.cos(c), 0], + [0, 0, 1] + ]) + + # Stage 4: Transform the offset to the arm's base frame + # Convert the camera offset into the arm's coordinate system + offset_matrix = np.array([ + x_off, + y_off, + depth + ]) + # First order matrix and transformation (apply R_x) + l1rot = R_x + l1inv = np.linalg.inv(l1rot) + l1sol = np.matmul(l1inv, offset_matrix) + l1tr = np.array([ + l1sol[0], + l1sol[1] + self.arm_params.kinematic_offset_y, + l1sol[2] + self.arm_params.kinematic_offset_z + ]) + # Second order matrix (apply R_y and R_z) + matrix = np.matmul(R_y, R_z) + inv_rot = np.linalg.inv(matrix) + solution = np.matmul(inv_rot, l1tr) + # Negate y to match arm's coordinate system + print(f"x:{solution[0]}, y: {solution[1]},z: {solution[2]}") + solution[1] = -solution[1] + + # Stage 5: Forward kinematics for current arm position + # Compute the current end effector position using DH parameters + a1 = self.arm_params.a1 + a2 = self.arm_params.a2 + a3 = self.arm_params.a3 + a4 = self.arm_params.a4 + a5 = self.arm_params.a5 + a6 = A6_EPSILON # keep as before for calculation stability + theta1 = round(np.deg2rad(self.selected_HAL.get_joint(0)), 5) if np.abs(np.deg2rad(self.selected_HAL.get_joint(0))) > .001 else 0 + theta2 = round(np.deg2rad(self.selected_HAL.get_joint(1)), 5) + math.radians(ANGLE_OFFSET_90) if np.abs(np.deg2rad(self.selected_HAL.get_joint(1))) > .001 else math.radians(ANGLE_OFFSET_90) + theta3 = round(np.deg2rad(self.selected_HAL.get_joint(2)), 5) + math.radians(ANGLE_OFFSET_90) if np.abs(np.deg2rad(self.selected_HAL.get_joint(2))) > .001 else math.radians(ANGLE_OFFSET_90) + theta4 = 0 + H0_1 = self.DH(theta1, np.pi / 2, 0, a1) + H1_2 = self.DH(theta2, 0, a3, -a2) + H2_3 = self.DH(theta3, np.pi / 2, 0, a4) + H3_4 = self.DH(theta4, 0, 0, a5 + a6) + H0_2 = np.matmul(H0_1, H1_2) + H0_3 = np.matmul(H0_2, H2_3) + H0_4 = np.matmul(H0_3, H3_4) + print(f"t1: {theta1}, t2: {theta2}, t3: {theta3}") + print(f"FK: {self.convert(H0_4)}") + + # Stage 6: Compute the target position for the arm + # Merge the current end effector position and the computed offset, convert to centimeters + ballPosition = self.merge(self.convert(H0_4), solution) * M_TO_CM + print(f"Final Ball Position (cm) {ballPosition}") + + # Stage 7: Command the arm to move to the computed position + set_arm_target_position(ballPosition[0], ballPosition[1], ballPosition[2], self.selected_HAL, self.arm_params) + + # Stage 8: Sleep briefly to control update rate (should be awaited if in async context) + asyncio.sleep(FRAME_SLEEP_SECONDS) #run detection every 1/30 seconds + + def select_largest_target_object(self, detected_objects: List[VisionObject]) -> VisionObject: + # print("found: " + str(len(detected_objects)) + " objects") + + # # print the labels of all visible objects on one line + # for obj in detected_objects: + # print(obj.label, end=", ") + # print() + + if detected_objects: + found_target = None + for obj in detected_objects: + if obj.label == self.target_label: + if found_target is not None: + if obj.radius > found_target.radius: + found_target = obj + else: + found_target = obj 
+ + # print("largest object: " + str(found_target.label)) + return found_target + else: + return None + + async def move_towards_object(self, obj: VisionObject, draw_frame: cv2.typing.MatLike = None) -> None: + #calculate the distance from the centmessage.params[1]er of the frame + frame_center_x = obj.frame_width // 2 + frame_center_y = obj.frame_height // 2 + self.error_x_distance = self.get_error(frame_center_x, obj.x) + self.error_y_distance = self.get_error(frame_center_y, obj.y) + self.pixel_dia = obj.radius + if self.verbose_logging: + print(f"Error X: {self.error_x_distance}, Error Y: {self.error_y_distance}, Radius: {self.pixel_dia}") + + #draw the bounding circle and the center of it + if draw_frame is not None: + center = (obj.x, obj.y) + cv2.circle(draw_frame, center, obj.radius, (0, 255, 0), 2) + cv2.circle(draw_frame, center, 7, (255, 255, 255), -1) + + async def update_frame_loop(self): + while self.keep_running: + if self.paused: + await self.update_frame() + if self.use_controller: + await self.update_controller() + + def get_frame_mask(self): + return cv2.resize(self.imageGetter.capture_image(), (0,0), fx=resize_fx, fy=resize_fy) , self.mask + + def calculate_sam_theta(self): + if self.object_found: + self.calculate_base_theta() + self.calculate_servo_1_theta() + self.calculate_servo_2_theta() + if(self.theta_base < 0): + self.theta_base = 0 + if(self.servo_2 < 0): + self.servo_2 = 0 + + print('theta_base: ' + str(self.theta_base)) + # print('servo_1: ' + str(self.servo_1)) + print('servo_2: ' + str(self.servo_2)) + print('------------------------------------------------------') + + self.selected_HAL.set_joint(0, self.theta_base) + # set_joint(1, self.servo_1) + self.selected_HAL.set_joint(2, self.servo_2) + + + async def update_frame(self) -> bool: + print("x") + frame = cv2.resize(self.imageGetter.capture_image(), (0,0), fx=resize_fx, fy=resize_fy) + # pframe = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + # # For Sam Code ON physical**** + # pframe = cv2.flip(pframe, -1) + + detected_objects: List[VisionObject] = self.vision.process_frame(frame) + + largest_object: VisionObject = self.select_largest_target_object(detected_objects) + + self.object_found = (largest_object is not None) + if(self.object_found): + await self.move_towards_object(largest_object, frame) + + self.frame = cv2.flip(cv2.cvtColor(largest_object.source_frame_rgb, cv2.COLOR_RGB2BGR),0) + amask = cv2.cvtColor(largest_object.source_frame_rgb, cv2.COLOR_RGB2BGR) + self.calculate_sam_theta() + #self.calculate_theta() + center = (largest_object.x, largest_object.y) + cv2.circle(amask, center, largest_object.radius, (0, 255, 0), 2) + cv2.circle(amask, center, 7, (255, 255, 255), -1) + self.mask = cv2.flip(amask,-1) + print('Object Found!') + else: + self.mask = cv2.resize(self.imageGetter.capture_image(), (0,0), fx=resize_fx, fy=resize_fy) + if cv2.waitKey(1) & 0xFF == ord('q'): + await asyncio.sleep(0.03) #run detection every 1/30 seconds + return False + + with self.last_frame_objects_lock: + self.last_frame_objects = detected_objects + + await asyncio.sleep(0.03) #run detection every 1/30 seconds + def calculate_base_theta(self): + if abs(self.error_x_distance) > self.error_tolerance_coord: + self.theta_base = self.selected_HAL.get_joint(0) - self.K * self.error_x_distance * math.exp(-self.lambda_) + + #move without slow decient + # self.theta_base = self.theta_base - self.K * (1 - math.exp(-self.lambda_ * self.error_x_distance)) + + def calculate_servo_1_theta(self): + if abs(self.pixel_dia) < 
+    def calculate_servo_1_theta(self):
+        if abs(self.pixel_dia) < self.dia_target:
+            self.servo_1 = self.servo_1 + 3 * self.pixel_dia * math.exp(-self.lambda_)
+        if abs(self.pixel_dia) > self.dia_target:
+            self.servo_1 = self.servo_1 - 3 * self.pixel_dia * math.exp(-self.lambda_)
+
+        # constrain servo
+        if self.servo_1 < -self.servo1_constraint:
+            self.servo_1 = -self.servo1_constraint
+        if self.servo_1 > self.servo1_constraint:
+            self.servo_1 = self.servo1_constraint
+
+    def calculate_servo_2_theta(self):
+        if abs(self.error_y_distance) > self.error_tolerance_coord:
+            self.servo_2 = self.selected_HAL.get_joint(2) - self.K * self.error_y_distance * math.exp(-self.lambda_)
+
+    async def start_async(self):
+        await asyncio.gather(
+            self.update_frame_loop()
+        )
+
+    def thread_main(self):
+        asyncio.run(self.start_async())
+
+    def start(self):
+        self.keep_running = True
+        self.paused = True
+        self.thread = threading.Thread(target=self.thread_main)
+        self.thread.start()
+
+    def pause_state(self, state):
+        self.paused = state
+
+    def stop(self):
+        self.keep_running = False
+
+    #region Controller Base Interface
+    def set_target_label(self, label: str) -> bool:
+        """This controller will only target objects with the specified label."""
+        if self.is_label_in_universe(label):
+            self.target_label = label
+            return True
+        else:
+            print(f"Label {label} is not in the universe of {type(self).__name__}.")
+            print(f"Please select a label that is in the universe: {self.vision.get_all_potential_labels()}")
+            return False
+
+    def get_target_label(self) -> str:
+        """Returns the label of the object that the controller is currently targeting."""
+        return self.target_label
+
+    def is_label_in_universe(self, label: str) -> bool:
+        """Returns True if the label is something this controller can see, else False."""
+        all_labels = self.vision.get_all_potential_labels()
+        return (label in all_labels)
+
+    def get_visible_object_labels(self) -> list[str]:
+        """Returns a list of identifiers of objects that are visible to the arm."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            return [obj.label for obj in self.last_frame_objects]
+
+    def get_visible_object_labels_detailed(self) -> list[str]:
+        """Returns a list of objects that are visible to the arm, including metadata."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            # return one string per object: the object's label, then radius, then metadata
+            return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects]
+            # return [f"{obj.label}_object" for obj in self.last_frame_objects]
+
+    def get_all_posible_labels(self) -> list[str]:
+        """Returns a list of all possible labels that this controller can see, even if they are not currently visible."""
+        return self.vision.get_all_potential_labels()
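+
+    # Example usage (sketch): switching what this controller tracks at runtime.
+    #     if controller.is_label_in_universe("ball"):
+    #         controller.set_target_label("ball")
+    #     print(controller.get_visible_object_labels())
+    # The label universe comes from the vision module's get_all_potential_labels(),
+    # so it depends on the selected VisualObjectIdentifier.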
+    #endregion Controller Base Interface
\ No newline at end of file
diff --git a/Controllers/README.md b/Controllers/README.md
new file mode 100644
index 0000000..8f2f3fd
--- /dev/null
+++ b/Controllers/README.md
@@ -0,0 +1,52 @@
+# ARM Controllers
+
+This directory contains controller modules for the UCSC Slugbotics robotic arm project. Each controller implements a specific control strategy or interface for the robotic arm, integrating with hardware abstraction layers (HAL), vision modules, and kinematics.
+
+## File Overview
+
+### Controllers
+
+- **FollowObject3DControler.py**
+  Implements a controller for visually tracking and following a 3D object using computer vision. Integrates with the HAL, vision modules, and arm kinematics to compute and command joint movements based on the detected position of a target object in camera frames. Supports asynchronous operation and object selection by label.
+
+- **FollowLargestObjectControler.py**
+  Implements a controller that detects, tracks, and follows the largest object in the camera's view using computer vision. Integrates with the HAL, vision modules, and arm kinematics. Supports asynchronous operation and real-time visualization.
+
+### Controllers/Base
+
+- **Controller.py**
+  Defines the base interface for all controllers, specifying required methods for starting, stopping, and interacting with object labels.
+
+- **MoveToPointInSpaceController.py**
+  Provides a controller for moving the robotic arm to a specified point in 3D space using inverse kinematics and arm parameters. Includes methods for stepping toward a target and testing workspace limits.
+
+- **MoveToPointOnScreenController.py**
+  Provides a controller for moving the arm to a specific point on the camera screen, using screen-space coordinates and error-based servo adjustments.
+
+- **VisionControllerBase.py**
+  Supplies a base class for vision-based controllers, handling label selection, object tracking, and thread-safe access to detected objects.
+
+- **__init__.py**
+  Marks the `Controllers.Base` directory as a Python package.
+
+### Controllers/legacy
+
+- **FollowClaw_old.py**
+  Legacy controller for following an object with the arm's claw using computer vision and direct kinematics. Contains older kinematic and control logic.
+
+- **FollowLargestObjectControler_old.py**
+  Legacy controller for following the largest detected object. Contains earlier implementations of object tracking and servo control.
+
+---
+
+## TODO
+
+- [ ] Test and refine **MoveToPointInSpaceController** for accuracy and robustness.
+- [ ] Test and refine **FollowObject3DControler** for reliable object tracking and following.
+- [ ] Add more detailed usage examples for each controller (a first sketch follows below).
+- [ ] Improve documentation and code comments for clarity.
+- [ ] Integrate error handling and logging improvements across all controllers.
+
+---
+
+For more information about each controller, refer to the docstrings and comments within each file.
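+
+## Example Usage
+
+A minimal sketch of wiring a controller to a HAL and vision module, following the constructor shape used in `ArmRuntime`. The HAL factory shown here is a stand-in; substitute whatever constructs your `HAL_base` implementation:
+
+```python
+from Config.ArmConfig import ArmConfig
+from Vision.ColorObjectIdentifier import ColorObjectIdentifier
+from Controllers.FollowLargestObjectControler import FollowLargestObjectControler
+
+hal = make_hal()  # hypothetical factory; construct your HAL_base implementation here
+vision = ColorObjectIdentifier()
+controller = FollowLargestObjectControler(
+    hal, vision,
+    arm_params=ArmConfig.armPhysicalParameters,
+    target_label="none",
+)
+controller.start()   # spawns the control loop in a background thread
+# ... later ...
+controller.stop()
+```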
diff --git a/Controllers/FollowClaw.py b/Controllers/legacy/FollowClaw_old.py
similarity index 91%
rename from Controllers/FollowClaw.py
rename to Controllers/legacy/FollowClaw_old.py
index eb786f5..1c282c1 100644
--- a/Controllers/FollowClaw.py
+++ b/Controllers/legacy/FollowClaw_old.py
@@ -13,18 +13,48 @@ from Vision.VisionObject import VisionObject
 from Vision.VisualObjectIdentifier import VisualObjectIdentifier
 from Modules.Base.ImageProducer import ImageProducer
-from Controllers.Controller import Controller
+from Controllers.Base.Controller import Controller
+
+# Define the DH parameters for the arm
+# a1: The vertical distance from the base of the robotic arm to the first joint (shoulder joint).
+# This represents the height of the base. Modify this value if the base height of the arm changes.
+a1 = 13.1
+
+# a2: The horizontal offset between the first joint (shoulder) and the second joint (elbow).
+# This represents the length of the link connecting these joints. Adjust this if the arm's shoulder-to-elbow link changes.
+a2 = 3.25
+
+# a3: The length of the upper arm, i.e. the distance between the shoulder joint and the elbow joint.
+# Modify this value if the upper arm's length changes.
+a3 = 11.4
+
+# a4: The horizontal offset between the elbow joint and the wrist joint.
+# This represents the length of the link connecting these joints. Adjust this if the elbow-to-wrist link changes.
+a4 = 3.25
+
+# a5: The length of the forearm, i.e. the distance between the elbow joint and the wrist joint.
+# Modify this value if the forearm's length changes.
+a5 = 5.8
+
+# a6: The distance from the wrist joint to the end-effector (e.g., gripper or tool).
+# This represents the final segment of the robotic arm. Adjust this if the wrist-to-end-effector length changes.
+a6 = 11.11
+
+# How to modify these parameters:
+#
+# Measure the physical dimensions:
+# If the robotic arm's design changes (e.g., different link lengths or offsets), measure the new dimensions and update the corresponding parameter.
+#
+# Ensure consistency:
+# When modifying these parameters, make sure the values match the actual physical dimensions of the robotic arm. Incorrect values will lead to inaccurate kinematics calculations.
+#
+# Test after modification:
+# After updating the parameters, test the arm's forward and inverse kinematics to verify that the changes are correctly reflected in the arm's movements (see the sanity check below).
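+
+# Quick sanity check (illustrative arithmetic, not an exact workspace model):
+# with the values above, the fully extended reach from the shoulder is roughly
+#     a3 + a5 + a6 = 11.4 + 5.8 + 11.11 = 28.31 cm
+# so targets much farther than ~28 cm from the shoulder axis should fail inverse
+# kinematics; the a2/a4 offsets shift this bound slightly.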
 
 def coordinate_input(x, y, z,hal,vision=False):
     global mtr
     global sim
     try:
-        a1 = 13.1
-        a2 = 3.25
-        a3 = 11.4
-        a4 = 3.25
-        a5 = 5.8
-        a6 = 11.11
         robot_arm1 = Three_Degree_Arm(a1, a2, a3, a4, a5, a6)
         # calculate angles
         angles = robot_arm1.calculate_angles(sym.Matrix([x, y, z, 1]))
@@ -228,44 +258,7 @@ def __init__(self, selected_HAL: HAL_base, vision: VisualObjectIdentifier, targe
         selected_HAL.set_joint_max(0, 270) # set_base_max_degree(270)
         selected_HAL.set_joint_max(2, 75) # set_joint_2_max(75)
-    def set_target_label(self, label: str) -> bool:
-        """This controller will only target objects with the specified label."""
-        if self.is_label_in_universe(label):
-            self.target_label = label
-            return True
-        else:
-            print(f"Label {label} is not in the universe of {self.__name__}.")
-            print(f"Please select a lable that is in universe: {self.vision.get_all_potential_labels()}")
-            return False
-
-    def get_target_label(self) -> str:
-        """Returns the label of the object that the controller is currently targeting."""
-        return self.target_label
-
-    def is_label_in_universe(self, label: str) -> bool:
-        """Returns True if the label is something this controler can see, else false."""
-        all_labels = self.vision.get_all_potential_labels()
-        return (label in all_labels)
-    def get_visible_object_labels(self) -> list[str]:
-        """Returns a list of identifiers of objects that are visible to the arm"""
-        with self.last_frame_objects_lock:
-            if self.last_frame_objects is None:
-                return []
-            return [obj.label for obj in self.last_frame_objects]
-
-    def get_visible_object_labels_detailed(self) -> list[str]:
-        """Returns a list of objects that are visible to the arm, including metadata"""
-        with self.last_frame_objects_lock:
-            if self.last_frame_objects is None:
-                return []
-            # return a series of string that represent the object, starting withe object's label, then radius, then metadata
-            return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects]
-            #return [f"{obj.label}_object" for obj in self.last_frame_objects]
-
-    def get_all_posible_labels(self) -> list[str]:
-        """Returns a list of all possible labels that this controller can see, even if they are not currently visible."""
-        return self.vision.get_all_potential_labels()
 
     def get_error(self, frame_center, center):
         return center - frame_center
@@ -558,4 +551,45 @@ def pause_state(self,state):
         self.paused = state
 
     def stop(self):
-        self.keep_running = False
\ No newline at end of file
+        self.keep_running = False
+
+    #region Controller Base Interface
+    def set_target_label(self, label: str) -> bool:
+        """This controller will only target objects with the specified label."""
+        if self.is_label_in_universe(label):
+            self.target_label = label
+            return True
+        else:
+            print(f"Label {label} is not in the universe of {type(self).__name__}.")
+            print(f"Please select a label that is in the universe: {self.vision.get_all_potential_labels()}")
+            return False
+
+    def get_target_label(self) -> str:
+        """Returns the label of the object that the controller is currently targeting."""
+        return self.target_label
+
+    def is_label_in_universe(self, label: str) -> bool:
+        """Returns True if the label is something this controller can see, else False."""
+        all_labels = self.vision.get_all_potential_labels()
+        return (label in all_labels)
+
+    def get_visible_object_labels(self) -> list[str]:
+        """Returns a list of identifiers of objects that are visible to the arm."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            return [obj.label for obj in self.last_frame_objects]
+
+    def get_visible_object_labels_detailed(self) -> list[str]:
+        """Returns a list of objects that are visible to the arm, including metadata."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            # return one string per object: the object's label, then radius, then metadata
+            return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects]
+            # return [f"{obj.label}_object" for obj in self.last_frame_objects]
+
+    def get_all_posible_labels(self) -> list[str]:
+        """Returns a list of all possible labels that this controller can see, even if they are not currently visible."""
+        return self.vision.get_all_potential_labels()
+    #endregion Controller Base Interface
\ No newline at end of file
diff --git a/Controllers/legacy/FollowLargestObjectControler_old.py b/Controllers/legacy/FollowLargestObjectControler_old.py
new file mode 100644
index 0000000..301b164
--- /dev/null
+++ b/Controllers/legacy/FollowLargestObjectControler_old.py
@@ -0,0 +1,255 @@
+import threading
+import math
+from typing import List
+import cv2
+import asyncio
+from threading import Lock
+
+from HALs.HAL_base import HAL_base
+from Vision.VisionObject import VisionObject
+from Vision.VisualObjectIdentifier import VisualObjectIdentifier
+from Modules.Base.ImageProducer import ImageProducer
+from Controllers.Base.Controller import Controller
+
+class FollowLargestObjectControler(Controller):
+    def __init__(self, selected_HAL: HAL_base, vision: VisualObjectIdentifier, target_label: str = None):
+        self.selected_HAL: HAL_base = selected_HAL
+        self.vision: VisualObjectIdentifier = vision
+        self.imageGetter: ImageProducer = selected_HAL
+
+        self._task = None  # To keep track of the running task
+        self.keep_running = False
+        self.thread = None
+        self.verbose_logging = False
+        self.target_label = target_label
+        if self.target_label is not None:
+            self.target_label = self.target_label.lower()
+
+        self.last_frame_objects: List[VisionObject] = []
+        self.last_frame_objects_lock: Lock = threading.Lock()
+
+        self.K = 1
+        self.lambda_ = 3  # tuning: 3 is smooth in sim; increase to move more slowly
+        self.error_tolerance_coord = 5  # change for error precision
+        self.dia_target = 50  # change for error precision
+        self.error_x_distance = 0
+        self.error_y_distance = 0
+        self.pixel_dia = 0
+        self.max_pixel_dia = 18  ### CHANGES ACCORDING TO CAMERA
+        self.contours = False
+        self.object_found = False
+        self.theta_base = 0
+        self.servo_1 = 90
+        self.servo_2 = 0
+        self.movement_speed = 1  # 1/10 seconds
+        self.servo1_constraint = 45
+        selected_HAL.set_joint_min(0, 0)    # set_base_min_degree(0)
+        selected_HAL.set_joint_max(0, 270)  # set_base_max_degree(270)
+        selected_HAL.set_joint_max(2, 75)   # set_joint_2_max(75)
+
+        if self.target_label is None:
+            all_labels = self.vision.get_all_potential_labels()
+            if len(all_labels) > 0:
+                self.target_label = all_labels[0].lower()
+
+    def get_error(self, frame_center, center):
+        return center - frame_center
+
+    def calculate_base_theta(self):
+        if abs(self.error_x_distance) > self.error_tolerance_coord:
+            self.theta_base = self.selected_HAL.get_joint(0) - self.K * self.error_x_distance * math.exp(-self.lambda_)
+
+        # move without slow descent:
+        # self.theta_base = self.theta_base - self.K * (1 - math.exp(-self.lambda_ * self.error_x_distance))
+
+    def calculate_servo_1_theta(self):
+        if abs(self.pixel_dia) < self.dia_target:
+            self.servo_1 = self.servo_1 + 3 * self.pixel_dia * math.exp(-self.lambda_)
+        if abs(self.pixel_dia) > self.dia_target:
+            self.servo_1 = self.servo_1 - 3 * self.pixel_dia * math.exp(-self.lambda_)
+
+        # constrain servo
+        if self.servo_1 < -self.servo1_constraint:
+            self.servo_1 = -self.servo1_constraint
+        if self.servo_1 > self.servo1_constraint:
+            self.servo_1 = self.servo1_constraint
+
+    def calculate_servo_2_theta(self):
+        if abs(self.error_y_distance) > self.error_tolerance_coord:
+            self.servo_2 = self.selected_HAL.get_joint(2) - self.K * self.error_y_distance * math.exp(-self.lambda_)
+
+    async def calculate_theta(self):
+        while self.keep_running:
+            # if there is an object found
+            if self.object_found:
+                self.calculate_base_theta()
+                self.calculate_servo_1_theta()
+                self.calculate_servo_2_theta()
+
+                # if self.theta_base > 359:
+                #     self.theta_base = 359
+                # if self.theta_base < 0:
+                #     self.theta_base = 0
+
+                # if self.servo_1 < -90:
+                #     self.servo_1 = -90
+                # if self.servo_1 > 90:
+                #     self.servo_1 = 90
+
+                # if self.servo_2 < 0:
+                #     self.servo_2 = 0
+                # if self.servo_2 > 180:
+                #     self.servo_2 = 180
+
+                if self.theta_base < 0:
+                    self.theta_base = 0
+                if self.servo_2 < 0:
+                    self.servo_2 = 0
+
+                if self.verbose_logging:
+                    print('theta_base: ' + str(self.theta_base))
+                    # print('servo_1: ' + str(self.servo_1))
+                    print('servo_2: ' + str(self.servo_2))
+                    print('------------------------------------------------------')
+
+                self.selected_HAL.set_joint(0, self.theta_base)
+                # self.selected_HAL.set_joint(1, self.servo_1)
+                self.selected_HAL.set_joint(2, self.servo_2)
+
+            await asyncio.sleep(0.03)  # run detection at ~30 Hz
+
+    def select_largest_target_object(self, detected_objects: List[VisionObject]) -> VisionObject:
+        # print("found: " + str(len(detected_objects)) + " objects")
+
+        # # print the labels of all visible objects on one line
+        # for obj in detected_objects:
+        #     print(obj.label, end=", ")
+        # print()
+
+        if detected_objects:
+            found_target = None
+            for obj in detected_objects:
+                if obj.label.lower() == self.target_label:
+                    if found_target is not None:
+                        if obj.radius > found_target.radius:
+                            found_target = obj
+                    else:
+                        found_target = obj
+
+            # print("largest object: " + str(found_target.label))
+            return found_target
+        else:
+            return None
+
+    async def move_towards_object(self, obj: VisionObject, draw_frame: cv2.typing.MatLike = None) -> None:
+        # calculate the distance from the center of the frame
+        frame_center_x = obj.frame_width // 2
+        frame_center_y = obj.frame_height // 2
+        self.error_x_distance = self.get_error(frame_center_x, obj.get_center_x())
+        self.error_y_distance = self.get_error(frame_center_y, obj.get_center_y())
+        self.pixel_dia = obj.radius
+        if self.verbose_logging:
+            print(f"Error X: {self.error_x_distance}, Error Y: {self.error_y_distance}, Radius: {self.pixel_dia}")
+
+        # draw the bounding circle and its center
+        if draw_frame is not None:
+            center = (obj.get_center_x(), obj.get_center_y())
+            cv2.circle(draw_frame, center, obj.radius, (0, 255, 0), 2)
+            cv2.circle(draw_frame, center, 7, (255, 255, 255), -1)
+
+    async def update_frame_loop(self):
+        while self.keep_running:
+            await self.update_frame()
+
+    async def update_frame(self) -> bool:
+        frame = self.imageGetter.capture_image()
+
+        detected_objects: List[VisionObject] = self.vision.process_frame(frame)
+
+        target_object: VisionObject = self.select_largest_target_object(detected_objects)
+
+        self.object_found = (target_object is not None)
+        if self.object_found:
+            await self.move_towards_object(target_object, frame)
+
+            flipped_source_frame_rgb = cv2.flip(target_object.source_frame_rgb, 0)
+            bgr_image = cv2.cvtColor(flipped_source_frame_rgb, cv2.COLOR_RGB2BGR)
+            cv2.imshow('Frame', bgr_image)
+            if target_object.mask is not None:
+                # flipped_mask = cv2.flip(target_object.mask, 0)
+                # cv2.imshow('Mask', flipped_mask)
+                cv2.imshow('Mask', target_object.mask)
+        if cv2.waitKey(1) & 0xFF == ord('q'):
+            await asyncio.sleep(0.03)  # run detection at ~30 Hz
+            return False
+
+        with self.last_frame_objects_lock:
+            self.last_frame_objects = detected_objects
+
+        await asyncio.sleep(0.03)  # run detection at ~30 Hz
+
+        return True
+
+    async def start_async(self):
+        await asyncio.gather(
+            self.update_frame_loop(),
+            self.calculate_theta(),
+        )
+
+    def thread_main(self):
+        asyncio.run(self.start_async())
+
+    def start(self):
+        self.keep_running = True
+        self.thread = threading.Thread(target=self.thread_main)
+        self.thread.start()
+
+    def stop(self):
+        self.keep_running = False
+
+    #region Controller Base Interface
+
+    def set_target_label(self, label: str) -> bool:
+        """This controller will only target objects with the specified label."""
+        if self.is_label_in_universe(label):
+            self.target_label = label.lower()
+            return True
+        else:
+            print(f"Label \"{label}\" is not in the universe of {type(self).__name__}.")
+            print(f"Please select a label that is in the universe: {self.vision.get_all_potential_labels()}")
+            return False
+
+    def get_target_label(self) -> str:
+        """Returns the label of the object that the controller is currently targeting."""
+        return self.target_label
+
+    def is_label_in_universe(self, label: str) -> bool:
+        """Returns True if the label is something this controller can see, else False."""
+        all_labels = self.vision.get_all_potential_labels()
+        return label.lower() in (l.lower() for l in all_labels)
+
+    def get_visible_object_labels(self) -> list[str]:
+        """Returns a list of identifiers of objects that are visible to the arm."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            return [obj.label for obj in self.last_frame_objects]
+
+    def get_visible_object_labels_detailed(self) -> list[str]:
+        """Returns a list of objects that are visible to the arm, including metadata."""
+        with self.last_frame_objects_lock:
+            if self.last_frame_objects is None:
+                return []
+            # return one string per object: the object's label, then radius, then metadata
+            return [f"{obj.label} radius: {obj.radius} {obj.metadata}" for obj in self.last_frame_objects]
+            # return [f"{obj.label}_object" for obj in self.last_frame_objects]
+
+    def get_all_posible_labels(self) -> list[str]:
+        """Returns a list of all possible labels that this controller can see, even if they are not currently visible."""
+        return self.vision.get_all_potential_labels()
+    #endregion Controller Base Interface
\ No newline at end of file
diff --git a/HALs/sim_HAL.py b/HALs/sim_HAL.py
index 53dd672..41d9e8d 100644
--- a/HALs/sim_HAL.py
+++ b/HALs/sim_HAL.py
@@ -138,17 +138,18 @@ def get_joint(self, joint_index) -> float:
         return self.sim.getJointPosition(self.mtr[int(joint_index)]) * R_TO_D
 
     def get_arm_cam_img_rgb(self) -> cv2.typing.MatLike:
+        """Capture an image from the arm camera.
+        Returns:
+            cv2.typing.MatLike: The captured image in RGB format.
+        """
         with self.lock:
             image, resolution = self.sim.getVisionSensorImg(self.sensorHandle)
             image_data = unpack_uint8_table(image)
             image_array_rgb = np.array(image_data, dtype=np.uint8)
             image_array_rgb = np.reshape(image_array_rgb, (resolution[1], resolution[0], 3))
-            image_array_rgb_vertically_flipped = cv2.flip(image_array_rgb, 0) # flip image vertically
-            # Convert the image from RGB to HSV color space
-            # hsv_image = cv2.cvtColor(image_array_rgb, cv2.COLOR_RGB2HSV)
-            return image_array_rgb_vertically_flipped
+            # resolution is [width, height], so the reshaped array is (height, width, 3)
+            return image_array_rgb
 
     def calculate_focal_length(self, sim, sensor_handle):
         """
diff --git a/Modules/App/App.py b/Modules/App/App.py
index 5f27512..aa6884f 100644
--- a/Modules/App/App.py
+++ b/Modules/App/App.py
@@ -11,9 +11,9 @@ from kivy.properties import ObjectProperty
 from kivy.uix.tabbedpanel import TabbedPanel
 
-from Controllers import FollowClaw
-from Controllers.Controller import Controller
-from Controllers.FollowClaw import coordinate_input
+from Controllers import FollowObject3DControler
+from Controllers.Base.Controller import Controller
+from Controllers.FollowObject3DControler import coordinate_input
 from HALs.HAL_base import HAL_base
 from Modules.App.AppBase import AppBase
 from Vision.ColorObjectIdentifier import ColorObjectIdentifier
@@ -63,7 +63,7 @@ class Test(TabbedPanel):
     moveEvent = 0
 
-    def __init__(self, hal: HAL_base, controller: FollowClaw, vision: ColorObjectIdentifier, **kwargs, ):
+    def __init__(self, hal: HAL_base, controller: FollowObject3DControler, vision: ColorObjectIdentifier, **kwargs, ):
 
         self.hal = hal
diff --git a/Modules/Commands/DefaultCommands.py b/Modules/Commands/DefaultCommands.py
index e0bf9a7..70529d6 100644
--- a/Modules/Commands/DefaultCommands.py
+++ b/Modules/Commands/DefaultCommands.py
@@ -1,4 +1,4 @@
-from Controllers.Controller import Controller
+from Controllers.Base.Controller import Controller
 from typing import Callable
 import Modules.Commands.Commands as Commands
diff --git a/Modules/Language/LanguageTools.py b/Modules/Language/LanguageTools.py
index 90d2fd0..66a6190 100644
--- a/Modules/Language/LanguageTools.py
+++ b/Modules/Language/LanguageTools.py
@@ -1,4 +1,4 @@
-from Controllers.Controller import Controller
+from Controllers.Base.Controller import Controller
 from Modules.Language.LanguageInterpreterBase import LanguageInterpreterBase, tool
 import requests
 from typing import Callable
diff --git a/Modules/server/http_server.py b/Modules/server/http_server.py
index 9afca09..db4d1cf 100644
--- a/Modules/server/http_server.py
+++ b/Modules/server/http_server.py
@@ -9,7 +9,7 @@
 import base64
 import numpy as np
 
-from Controllers.Controller import Controller
+from Controllers.Base.Controller import Controller
 from HALs.HAL_base import HAL_base
 from Modules.server.ServerBase import ServerBase
 from Vision.ColorObjectIdentifier import ColorObjectIdentifier
diff --git a/main.py b/main.py
index 0690905..5b17e37 100644
--- a/main.py
+++ b/main.py
@@ -4,6 +4,8 @@
 import subprocess
 import sys
 import os
+import logging
+from datetime import datetime
 
 # Add the project root to the Python path
 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
@@ -12,6 +14,16 @@ from Config.ArmConfig import ArmConfig
 
 if __name__ == "__main__":
+    # ----------------- LOGGING SETUP -----------------
+    # Note: the Logs/ directory must already exist; logging.FileHandler creates
+    # the file but not missing parent directories.
+    log_filename = f"Logs/arm_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s %(levelname)s %(message)s',
+        handlers=[
+            logging.FileHandler(log_filename, encoding='utf-8'),
+            logging.StreamHandler()
+        ]
+    )
     # ----------------- SETUP -----------------
     print("Arm startup")
diff --git a/scripts/extract_DH_parameters_copila_sim.py b/scripts/extract_DH_parameters_copila_sim.py
new file mode 100644
index 0000000..4aac697
--- /dev/null
+++ b/scripts/extract_DH_parameters_copila_sim.py
@@ -0,0 +1,80 @@
+"""
+Script to calculate and print Denavit-Hartenberg (DH) parameters
+from a CoppeliaSim scene using the ZeroMQ Remote API.
+
+This script assumes the arm's joint hierarchy matches the provided screenshot.
+"""
+
+from coppeliasim_zmqremoteapi_client import RemoteAPIClient
+import numpy as np
+
+def get_transform(sim, child, parent):
+    """Get the transformation matrix from parent to child."""
+    pos = sim.getObjectPosition(child, parent)
+    quat = sim.getObjectQuaternion(child, parent)
+    # Convert quaternion [x, y, z, w] to a rotation matrix
+    x, y, z, w = quat
+    rot = np.array([
+        [1 - 2*(y**2 + z**2), 2*(x*y - z*w),       2*(x*z + y*w)],
+        [2*(x*y + z*w),       1 - 2*(x**2 + z**2), 2*(y*z - x*w)],
+        [2*(x*z - y*w),       2*(y*z + x*w),       1 - 2*(x**2 + y**2)]
+    ])
+    T = np.eye(4)
+    T[:3, :3] = rot
+    T[:3, 3] = pos
+    return T
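+
+# Sanity check (illustrative) for the quaternion conversion above: the identity
+# quaternion [x, y, z, w] = [0, 0, 0, 1] gives rot = np.eye(3), and a 90-degree
+# rotation about Z, [0, 0, sin(pi/4), cos(pi/4)], gives
+#     [[0, -1, 0],
+#      [1,  0, 0],
+#      [0,  0, 1]]
+# a quick way to confirm the [x, y, z, w] component order returned by
+# sim.getObjectQuaternion.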
+
+def dh_from_transform(T):
+    """
+    Extract DH parameters (a, alpha, d, theta) from a homogeneous transform.
+    This is a simplified version and assumes standard DH conventions.
+    """
+    a = np.linalg.norm(T[0:3, 3])
+    alpha = np.arctan2(T[2, 1], T[2, 2])
+    d = T[2, 3]
+    theta = np.arctan2(T[1, 0], T[0, 0])
+    return a, np.degrees(alpha), d, np.degrees(theta)
+
+def main():
+    client = RemoteAPIClient()
+    sim = client.require('sim')
+
+    # List of joint paths in order (from base to end-effector)
+    joint_paths = [
+        '/Base/BaseRevolute',
+        '/Base/BaseRevolute/LowerSmallerLimb/Limb1Revolute',
+        '/Base/BaseRevolute/LowerSmallerLimb/Limb1Revolute/BigLimb/Limb2Revolute',
+        '/Base/BaseRevolute/LowerSmallerLimb/Limb1Revolute/BigLimb/Limb2Revolute/UpperSmallLimb/Limb4Revolute'
+    ]
+
+    # Get handles for each joint
+    joint_handles = [sim.getObjectHandle(path) for path in joint_paths]
+
+    print("DH Parameters (a [m], alpha [deg], d [m], theta [deg]):")
+    dh_params = []
+    for i in range(len(joint_handles) - 1):
+        parent = joint_handles[i]
+        child = joint_handles[i + 1]
+        T = get_transform(sim, child, parent)
+        a, alpha, d, theta = dh_from_transform(T)
+        dh_params.append((a, alpha, d, theta))
+        print(f"Joint {i+1}: a={a:.4f} m, alpha={alpha:.2f}°, d={d:.4f} m, theta={theta:.2f}°")
+
+    # Map the extracted DH a values to a1-a6 (pad with 0 if there are not enough joints)
+    a_values = [a for a, _, _, _ in dh_params]
+    while len(a_values) < 6:
+        a_values.append(0.0)
+
+    print("\n# --- ArmPhysicalParameters (extracted from simulation) ---")
+    print("class ArmPhysicalParameters:")
+    print("    \"\"\"Extracted from CoppeliaSim scene.\"\"\"")
+    print(f"    a1: float = {a_values[0]*100:.2f}  # [cm] Height from base to shoulder joint")
+    print(f"    a2: float = {a_values[1]*100:.2f}  # [cm] Horizontal offset from shoulder to elbow")
+    print(f"    a3: float = {a_values[2]*100:.2f}  # [cm] Length of upper arm (shoulder to elbow)")
+    print(f"    a4: float = {a_values[3]*100:.2f}  # [cm] Horizontal offset from elbow to wrist")
+    print(f"    a5: float = {a_values[4]*100:.2f}  # [cm] Length of forearm (elbow to wrist)")
+    print(f"    a6: float = {a_values[5]*100:.2f}  # [cm] Length from wrist to end-effector (gripper/tool)")
+    print("    # ...other parameters as needed...")
+
+# Usage: start CoppeliaSim with the arm scene loaded, then run
+#   python scripts/extract_DH_parameters_copila_sim.py
+if __name__ == "__main__":
+    main()
\ No newline at end of file