From f43ddc66654984ad12a14592f3cfb7863df84212 Mon Sep 17 00:00:00 2001 From: 8mansi Date: Sat, 6 Dec 2025 22:10:07 +0530 Subject: [PATCH] PPO Model for Driver Assistance --- DriverAssistanceSim.py | 672 +++++++++++++++++++++++++---------------- PPO.py | 93 +++++- 2 files changed, 488 insertions(+), 277 deletions(-) diff --git a/DriverAssistanceSim.py b/DriverAssistanceSim.py index 9727af7..bdc13a3 100644 --- a/DriverAssistanceSim.py +++ b/DriverAssistanceSim.py @@ -1,6 +1,6 @@ import pybullet as p import pybullet_data -import time +import torch import numpy as np import os import math @@ -11,18 +11,16 @@ class DriverAssistanceSim: def __init__(self): self.timeStep = 0.1 self.num_steps = 500 - self.max_range = 5 + self.max_range = 3 self.z_offset = 0.05 self.shelf_scale = 0.5 self.num_cars = 10 - self.step_size = 0.1 # movement speed + self.step_size = 0.1 self.lane_width = 2.0 self.offset = 0.1 self.lane_width_total = 2 * self.lane_width + self.offset - self.ang_vel = 5 + self.ang_vel = 6 self.initialize_environment() - # self.initialize_road() - # self.initialize_robot() def reset(self): p.resetSimulation() @@ -33,7 +31,6 @@ def reset(self): # Ground self.plane_id = p.loadURDF("plane.urdf") - # Recreate your road using existing function self.initialize_road() # Reset robot to original position @@ -46,14 +43,23 @@ def reset(self): self.danger_points = [] def initialize_environment(self): - p.connect(p.GUI) + p.connect(p.DIRECT) p.setGravity(0, 0, -9.8) p.setTimeStep(self.timeStep) p.setRealTimeSimulation(0) p.setAdditionalSearchPath(pybullet_data.getDataPath()) + # Suppress warnings + p.setPhysicsEngineParameter(enableFileCaching=0) + p.configureDebugVisualizer(p.COV_ENABLE_GUI, 1) + p.setPhysicsEngineParameter(enableConeFriction=0) + + def initialize_road(self): self.road_id = p.loadURDF("plane.urdf") + self.left_boundary = self.lane_width - (self.offset * 6) + self.right_boundary = -self.lane_width + (self.offset * 6) + p.changeVisualShape(self.road_id, -1, 
rgbaColor=[0.5, 0.5, 0.5, 1]) # Draw lane lines p.addUserDebugLine([0, self.lane_width - self.offset, 0.01], [200, self.lane_width - self.offset, 0.01], lineColorRGB=[1, 1, 1], lineWidth=4) @@ -73,35 +79,35 @@ def initialize_robot(self): robot_path = os.path.join(pybullet_data.getDataPath(), "husky/husky.urdf") self.robot_id = p.loadURDF(robot_path, self.robot_start_pos, self.robot_start_orientation, globalScaling=1.0) self.wheel_joints = [2, 3, 4, 5] + + def disconnect_environmnent(self): + p.disconnect() def set_robot_wheel_velocities(self, linear_vel, angular_vel): - L = 0.55 # Husky wheel base (distance between left/right wheels) - - # Convert linear+angular to left/right wheel velocities - v_left = linear_vel - (angular_vel * L / 2) - v_right = linear_vel + (angular_vel * L / 2) - + WHEEL_DISTANCE = 0.55 # metres (left-right wheel separation) + MAX_FORCE = 200 + + # Convert cmd_vel → wheel velocities + left_vel = linear_vel - (angular_vel * WHEEL_DISTANCE / 2) + right_vel = linear_vel + (angular_vel * WHEEL_DISTANCE / 2) + for i, joint in enumerate(self.wheel_joints): - if i % 2 == 0: # left wheels - p.setJointMotorControl2( - bodyIndex=self.robot_id, - jointIndex=joint, - controlMode=p.VELOCITY_CONTROL, - targetVelocity=v_left, - force=100 - ) - else: # right wheels - p.setJointMotorControl2( - bodyIndex=self.robot_id, - jointIndex=joint, - controlMode=p.VELOCITY_CONTROL, - targetVelocity=v_right, - force=100 - ) - + if i % 2 == 0: # left wheels = 0,2 + vel = left_vel + else: # right wheels = 1,3 + vel = right_vel + + p.setJointMotorControl2( + bodyIndex=self.robot_id, + jointIndex=joint, + controlMode=p.VELOCITY_CONTROL, + targetVelocity=vel, + force=MAX_FORCE + ) + def initialize_car(self, new_car=False): min_dist_between_cars = 1.0 # minimum distance between cars - robot_safe_dist = 3.0 # minimum distance from robot start + robot_safe_dist = 5.0 # minimum distance from robot start placed = False while not placed: @@ -129,9 +135,6 @@ def 
initialize_car(self, new_car=False): self.cars.append({'id': car_id, 'vel': velocity, 'x_pos': x, 'y_pos': y}) placed = True - def get_robot_position(self, robot_id): - return p.getBasePositionAndOrientation(robot_id) - def clear_debug_lines(self): # clear old debug lines prev_ids = globals().get('debug_line_ids', []) @@ -142,11 +145,8 @@ def clear_debug_lines(self): pass return [] - def get_debug_lines(self): - return globals().get('debug_line_ids', []) - def get_angles(self, r_pos, z_offset, max_range ): - angles = np.linspace(-math.pi/3, math.pi/3, 30, endpoint=False) + angles = np.linspace(-math.pi/5, math.pi/5, 30, endpoint=False) from_points = [] to_points = [] for a in angles: @@ -161,7 +161,6 @@ def get_angles(self, r_pos, z_offset, max_range ): def get_closest_hit(self, hits_cube): # return the closest hit - # dist, hit_pos, ang_w = hits_cube if len(hits_cube) > 0: hits_cube.sort(key=lambda x: x[0]) # sort by distance closest_hit = hits_cube[0][1] @@ -183,141 +182,196 @@ def move_cars(self): pos, orn = p.getBasePositionAndOrientation(cid) new_pos = [pos[0] - car['vel'], pos[1], pos[2]] # move along -x p.resetBasePositionAndOrientation(cid, new_pos, orn) + + def get_lane_camera_image(self, robot_id, z_offset=0.2, forward_offset=0.3): + base_pos, base_orn = p.getBasePositionAndOrientation(robot_id) + local_camera_pos = [0.7, 0.0, 0.5] + pitch_angle = -math.pi / 2 + local_camera_orn = p.getQuaternionFromEuler([0, pitch_angle, 0]) + cam_pos, cam_orn = p.multiplyTransforms( + base_pos, base_orn, + local_camera_pos, local_camera_orn + ) - def get_camera_image(self, robot_id, direction, yaw_offset_deg=90): - """ - Capture a camera image looking at an offset angle from robot's forward. 
- yaw_offset_deg: - 0 -> forward - 90 -> left - -90 -> right - """ - pos, orn = p.getBasePositionAndOrientation(robot_id) - # Convert robot quaternion to Euler - euler = p.getEulerFromQuaternion(orn) - - # Apply yaw offset - yaw = euler[2] + math.radians(yaw_offset_deg) - pitch = -15 * math.pi/180 # slightly down - - if direction == "left": - cam_offset = [0.5, 0.3, 0.3] # Adjusted for better view - elif direction == "right": - cam_offset = [0.5, -0.3, 0.3] # Adjusted for better view - else: - cam_offset = [0.5, 0.0, 0.3] # Forward view + cam_yaw = p.getEulerFromQuaternion(cam_orn)[2] + forward_x = math.cos(cam_yaw) + forward_y = math.sin(cam_yaw) - cam_pos = [pos[0] + cam_offset[0], - pos[1] + cam_offset[1], - pos[2] + cam_offset[2]] - - # Compute camera target from yaw/pitch - cam_target = [ - cam_pos[0] + math.cos(yaw) * math.cos(pitch), - cam_pos[1] + math.sin(yaw) * math.cos(pitch), - cam_pos[2] + math.sin(pitch) + target = [ + cam_pos[0], + cam_pos[1], + base_pos[2] # Ground level ] - up = [0, 0, 1] # world-up - view = p.computeViewMatrix(cam_pos, cam_target, up) - proj = p.computeProjectionMatrixFOV(fov=60, aspect=1.0, nearVal=0.012, farVal=3.0) - + # Up direction in world + up_vec = [forward_x, forward_y, 0] + + view = p.computeViewMatrix( + cameraEyePosition=cam_pos, + cameraTargetPosition=target, + cameraUpVector=up_vec + ) + proj = p.computeProjectionMatrixFOV( + fov=30, + aspect=1.0, + nearVal=0.01, + farVal=1.0 + ) + width, height, rgb, depth, seg = p.getCameraImage( - width=128, height=128, - viewMatrix=view, projectionMatrix=proj + width=64, height=64, + viewMatrix=view, + projectionMatrix=proj ) - - return rgb, depth, seg - def detect_lane_free_from_camera(self, rgb_img): - # Reshape image (128x128) - img = np.reshape(rgb_img, (128, 128, 4))[:, :, :3] # drop alpha - gray = img.mean(axis=2) # simple grayscale + rgb_array = np.array(rgb, dtype=np.uint8).reshape((height, width, 4)) # RGBA + gray = np.dot(rgb_array[:, :, :3], [0.2989, 0.5870, 0.1140]) + 
gray = gray.astype(np.uint8) - # Divide image into 3 zones: left, center, right - left_zone = gray[:, :40] - center_zone = gray[:, 40:88] - right_zone = gray[:, 88:] + return gray, depth, seg - # Simple threshold: dark = obstacle - left_blocked = np.mean(left_zone) < 80 - right_blocked = np.mean(right_zone) < 80 + def check_lane_crossing(self, robot_id, threshold= 100,forward_offset=0.3, z_offset=0.05, lateral_gap=0.1): + rgb, _, _ = self.get_lane_camera_image(robot_id) + lane_visible = (np.mean(rgb) > threshold) + + pos, _ = p.getBasePositionAndOrientation(robot_id) + robot_y = pos[1] + + left_lane = False + right_lane = False + + if lane_visible: + if robot_y > 0: + print("Left lane crossed") + left_lane = True + elif robot_y < 0: + print("Right lane crossed") + right_lane = True + return left_lane, right_lane - return {"left": not left_blocked, "right": not right_blocked} - def show_camera_image(self, rgb): img = np.reshape(rgb, (128, 128, 4))[:, :, :3] plt.imshow(img) plt.axis('off') plt.show() - def four_parallel_robot_beams(self, robot_pos, max_range=None): - """ - 4 robot-aligned beams: - - 2 forward inside robot width - - 2 angled beams (±20°) - Returns meters: B1_left, B2_right, B3_ang_left, B4_ang_right - """ + def four_parallel_robot_beams(self, robot_pos, max_range=None, rays_per_beam=5, beam_width=0.2): + # Emit multiple parallel rays for each of the 4 main beams (center-left, center-right, angled-left, angled-right) if max_range is None: - max_range = self.max_range /1.5 + max_range = self.max_range / 1.5 - BEAM_Z = 0.05 - GAP = 0.34 # inside Husky width (0.55 m) - ANG = math.radians(15) # 15° for angled beams - - base = [robot_pos[0], robot_pos[1], BEAM_Z] + BEAM_Z = self.z_offset + ANG = math.radians(15) # angled beams + LATERAL_GAP = beam_width / 2 # spread across parallel rays + FRONT_OFFSET = 0.3 _, orn = p.getBasePositionAndOrientation(self.robot_id) yaw = p.getEulerFromQuaternion(orn)[2] - fx = math.cos(yaw); fy = math.sin(yaw) - px = 
-math.sin(yaw); py = math.cos(yaw) - - # ---- 2 inside-width beam start points ---- - left_start = [base[0] + px * (-GAP/2), base[1] + py * (-GAP/2), BEAM_Z] - right_start = [base[0] + px * (+GAP/2), base[1] + py * (+GAP/2), BEAM_Z] + # Heading vectors + fx = math.cos(yaw) + fy = math.sin(yaw) + fx_L = math.cos(yaw + ANG) + fy_L = math.sin(yaw + ANG) + fx_R = math.cos(yaw - ANG) + fy_R = math.sin(yaw - ANG) + + def generate_rays(base_pos, forward_vec): + rays_start = [] + rays_end = [] + for i in range(rays_per_beam): + offset = -LATERAL_GAP + i * (beam_width / max(rays_per_beam-1,1)) + lat_vec = [-forward_vec[1], forward_vec[0]] + start = [ + base_pos[0] + offset * lat_vec[0], + base_pos[1] + offset * lat_vec[1], + base_pos[2] + ] + end = [ + start[0] + forward_vec[0] * max_range, + start[1] + forward_vec[1] * max_range, + start[2] + ] + rays_start.append(start) + rays_end.append(end) + return rays_start, rays_end + + # Define main beams + px = -math.sin(yaw) + py = math.cos(yaw) + front_pos = [ + robot_pos[0] + FRONT_OFFSET * fx, + robot_pos[1] + FRONT_OFFSET * fy, + self.z_offset + ] - # ---- 2 angled directions ---- - fx_L = math.cos(yaw + ANG); fy_L = math.sin(yaw + ANG) - fx_R = math.cos(yaw - ANG); fy_R = math.sin(yaw - ANG) + left_base = [robot_pos[0] + px * (0.17), robot_pos[1] + py * (0.17), BEAM_Z] # center-left + right_base = [robot_pos[0] - px * (0.17), robot_pos[1] - py * (0.17), BEAM_Z] # center-right + # center_base = [robot_pos[0], robot_pos[1], BEAM_Z] + center_base = front_pos - starts = [ - left_start, right_start, base, base - ] - ends = [ - [left_start[0] + fx * max_range, left_start[1] + fy * max_range, BEAM_Z], # B1 center-left - [right_start[0] + fx * max_range, right_start[1] + fy * max_range, BEAM_Z], # B2 center-right - [base[0] + fx_L * max_range, base[1] + fy_L * max_range, BEAM_Z], # B3 angled-left - [base[0] + fx_R * max_range, base[1] + fy_R * max_range, BEAM_Z] # B4 angled-right + beams = [ + (left_base, [fx, fy]), # straight-left 
+ (right_base, [fx, fy]), # straight-right + (center_base, [fx_L, fy_L]), # angled-left + (center_base, [fx_R, fy_R]) # angled-right ] - results = p.rayTestBatch(starts, ends) + # Generate all rays + all_starts = [] + all_ends = [] + beam_ranges = [] + for b_start, b_vec in beams: + starts, ends = generate_rays(b_start, b_vec) + all_starts.extend(starts) + all_ends.extend(ends) + beam_ranges.append(len(starts)) + + results = p.rayTestBatch(all_starts, all_ends) car_ids = [c['id'] for c in self.cars] distances = [] debug_ids = self.clear_debug_lines() - - for i, res in enumerate(results): - hit = res[0] - frac = res[2] - dist = frac * max_range if hit in car_ids else max_range - distances.append(dist) - - endp = [ - starts[i][0] + (ends[i][0] - starts[i][0]) * frac, - starts[i][1] + (ends[i][1] - starts[i][1]) * frac, - BEAM_Z - ] - color = [1,0,1] if dist < max_range else [0,1,1] - lid = p.addUserDebugLine(starts[i], endp, color, 2 if i < 2 else 1, 0) - debug_ids.append(lid) + idx = 0 + for n_rays in beam_ranges: + dists = [] + for _ in range(n_rays): + res = results[idx] + frac = res[2] + hit = res[0] + dist = frac * max_range if hit in car_ids else max_range + dists.append(dist) + endp = [ + all_starts[idx][0] + (all_ends[idx][0] - all_starts[idx][0]) * frac, + all_starts[idx][1] + (all_ends[idx][1] - all_starts[idx][1]) * frac, + all_starts[idx][2] + ] + + if hit in car_ids: + # print("hit car:", hit) + color = [1,0,1] + lid = p.addUserDebugLine(all_starts[idx], endp, color, 1, 0) + debug_ids.append(lid) + + idx += 1 + # average distance across parallel rays + distances.append(sum(dists)/len(dists)) globals()['debug_line_ids'] = debug_ids return distances[0], distances[1], distances[2], distances[3] - - def beam_sensor(self, robot_pos, z_offset=0.1, max_range=5): - r_pos = [robot_pos[0], robot_pos[1], z_offset] + # r_pos = [robot_pos[0], robot_pos[1], z_offset] + _, orn = p.getBasePositionAndOrientation(self.robot_id) + yaw = p.getEulerFromQuaternion(orn)[2] + 
fx = math.cos(yaw) + fy = math.sin(yaw) + forward_offset = 0.35 + r_pos = [ + robot_pos[0] + fx * forward_offset, + robot_pos[1] + fy * forward_offset, + robot_pos[2] + z_offset + ] + new_debug_ids = self.clear_debug_lines() angles, from_points, to_points = self.get_angles(r_pos, z_offset, max_range) @@ -333,7 +387,7 @@ def beam_sensor(self, robot_pos, z_offset=0.1, max_range=5): if hit_object_uid in car_ids: color = [1,0,0] ang_w = angles[i] - # print("ang_w", ang_w) + measured = hit_fraction * max_range ep = [r_pos[0] + measured * math.cos(ang_w), r_pos[1] + measured * math.sin(ang_w), @@ -342,10 +396,9 @@ def beam_sensor(self, robot_pos, z_offset=0.1, max_range=5): lid = p.addUserDebugLine(from_points[i], ep, lineColorRGB=color, lineWidth=2.0, lifeTime=0) new_debug_ids.append(lid) - # find euclidean distance dist = self.calculate_euclidean_dist(hit_position, r_pos, z_offset) - # print("hittttttt uid = ", hit_object_uid, " at fraction ", hit_fraction, " at pos ", hit_position, " dist ", dist) + # print("hit uid = ", hit_object_uid, " at fraction ", hit_fraction, " at pos ", hit_position, " dist ", dist) hits_cube.append((dist, hit_position, ang_w)) else: @@ -356,23 +409,18 @@ def beam_sensor(self, robot_pos, z_offset=0.1, max_range=5): globals()['debug_line_ids'] = new_debug_ids return self.get_closest_hit(hits_cube) - def is_path_clear(self, rgb_img, threshold=80): - # Convert flat list to (H, W, 3) array - img = np.reshape(rgb_img, (128, 128, 4))[:, :, :3] # drop alpha - gray = img.mean(axis=2) # grayscale - # print("value is ",np.mean(gray) ) - return np.mean(gray) > threshold # True = path is clear + def norm_beam(self, d): + d = np.clip(d, 0, self.max_range) + return (self.max_range - d) / self.max_range # 0 = safe, 1 = close def get_state(self, robot_pos): b1_left, b2_right, b3_angL, b4_angR = self.four_parallel_robot_beams(robot_pos) - # normalize 0..1 - b1 = b1_left / self.max_range - b2 = b2_right / self.max_range - b3 = b3_angL / self.max_range - b4 = 
b4_angR / self.max_range - + b1 = self.norm_beam(b1_left) + b2 = self.norm_beam(b2_right) + b3 = self.norm_beam(b3_angL) + b4 = self.norm_beam(b4_angR) # Beam-based obstacle detection hit_x, hit_y, angle = self.beam_sensor(robot_pos, z_offset=self.z_offset, max_range=self.max_range) @@ -380,113 +428,164 @@ def get_state(self, robot_pos): beam_dist = self.max_range else: beam_dist = np.linalg.norm(np.array([hit_x, hit_y]) - np.array(robot_pos[:2])) - beam_dist = beam_dist / self.max_range # normalize 0–1 + beam_dist = np.clip(beam_dist, 0, self.max_range) + beam_dist = (self.max_range - beam_dist) / self.max_range # Camera clarity scores - rgb_left, _, _ = self.get_camera_image(self.robot_id, "left", yaw_offset_deg=90) - rgb_right, _, _ = self.get_camera_image(self.robot_id, "right", yaw_offset_deg=-90) - imgL = np.reshape(rgb_left, (128,128,4))[:, :, :3].mean() / 255.0 - imgR = np.reshape(rgb_right, (128,128,4))[:, :, :3].mean() / 255.0 + SIDE_THRESHOLD = 0.4 + + dist_left = abs(robot_pos[1] - self.left_boundary) + dist_right = abs(robot_pos[1] - self.right_boundary) - left_score = float(imgL) - right_score = float(imgR) + if dist_left < SIDE_THRESHOLD or dist_right < SIDE_THRESHOLD: + left_lane, right_lane = self.check_lane_crossing(self.robot_id) + else: + left_lane, right_lane = 0,0 # Lane offset pos, orn = p.getBasePositionAndOrientation(self.robot_id) - lane_offset = pos[1] / (self.lane_width - self.offset) # normalized -1 to +1 + lane_offset = np.clip(pos[1] / (self.lane_width / 2), -1.0, 1.0) # normalized -1 to +1 # Heading error (yaw) yaw = p.getEulerFromQuaternion(orn)[2] - heading_error = yaw / np.pi # normalize -1 to 1 + heading_error = np.clip(yaw / np.pi, -1.0, 1.0) - return np.array([b1,b2,b3,b4,beam_dist, left_score, right_score, lane_offset, heading_error], dtype=np.float32) + return np.array([b1,b2,b3,b4, + beam_dist, + left_lane, right_lane, + lane_offset, + heading_error + ], dtype=np.float32) def compute_reward(self, state, action, done): # 
unpack state + reward = 0 b1, b2, b3, b4 = state[0], state[1], state[2], state[3] beam_dist = state[4] # front safety 0–1 - left_score = state[5] - right_score = state[6] + left_lane = state[5] + right_lane = state[6] lane_offset = abs(state[7]) heading_error = abs(state[8]) - - reward = 0.0 - - # ----------------------------------------- - # 1) Forward progress reward - # ----------------------------------------- - reward += 3.0 * beam_dist # straight driving when front is clear - - # ----------------------------------------- - # 2) Lane keeping reward - # ----------------------------------------- - reward += 4.0 * (1.0 - lane_offset) - - # ----------------------------------------- - # 3) Obstacle avoidance using beams - # ----------------------------------------- - min_beam = min(b1, b2, b3, b4) - reward -= 6.0 * (1.0 - min_beam) # penalty only proportional and bounded - - # ----------------------------------------- - # 4) Heading stability - # ----------------------------------------- - reward += 1.5 * (1.0 - heading_error) - - # ----------------------------------------- - # 5) Action–sensor consistency (MOST IMPORTANT PART) - # ----------------------------------------- - left_clear = (b1 + b3) + left_score - right_clear = (b2 + b4) + right_score - - if action == 0: # GO STRAIGHT - reward += 2.0 if min(b1, b2) > 0.35 else -2.0 # only if space ahead - - elif action == 1: # TURN LEFT - reward += left_clear - right_clear # positive only when left safer - - elif action == 2: # TURN RIGHT - reward += right_clear - left_clear # positive only when right safer - - # ----------------------------------------- - # 6) Terminal penalty (only once) - # ----------------------------------------- + + # Obstacle avoidance using beams + max_beam = max(b1, b2, b3, b4) + reward += (1 - max_beam)*3 + + if max_beam > 0.85: + reward -= 12 + + if left_lane or right_lane: # crossing lane + reward -= 8.0 + + # To encourage straight driving + reward += 2.0 * (1.0 - lane_offset) + reward += 
0.5 * (1.0 - heading_error) + + left_clear = 1 - max(b1,b3) + right_clear = 1 - max(b2,b4) + SAFE = max_beam < 0.30 + DANGER = max_beam > 0.45 + + if action == 0: # STRAIGHT + if SAFE: + reward += 1.5 # encourage straight if safe + elif max_beam > 0.45: + reward -= 4.0 # negative reward if going straight when it's unsafe + + elif action in (1, 2): # left/right + if SAFE: + reward -= 0.8 # negative reward for unnecessary turn + else: + # reward a turn only if it moves toward a clearer side + left_clear = 1 - max(b1, b3) + right_clear = 1 - max(b2, b4) + if action == 1 and left_clear > right_clear + 0.15: + reward += 2.0 + if action == 2 and right_clear > left_clear + 0.15: + reward += 2.0 + elif action == 3: # slow + if max_beam > 0.55: + reward += 1.0 + else: + reward -= 0.5 if done: - reward -= 60 # collision OR lane exit - - # ----------------------------------------- - # 7) Safety clamp to stabilize PPO - # ----------------------------------------- - reward = float(np.clip(reward, -8.0, +8.0)) + reward -= 20 # collision or lane exit return reward - - - def spawn_cars(self, car_count): for _ in range(car_count): self.initialize_car(new_car=True) - def plot_trajectory(self, ep, steps, reward, loss): + def plot_metrics(self, episode_metrics, up): + ep = episode_metrics["episode"] + steps = episode_metrics["step"] + + fig, axs = plt.subplots(2, 2, figsize=(12,8)) + ax = axs[0,0] + axs[0,0].plot(ep, episode_metrics["actor_loss"], label="Actor Loss", color='blue') + axs[0,0].set_title("Actor Loss over Episodes") + axs[0,0].set_xlabel("Episode") + axs[0,0].set_ylabel("Loss") + axs[0,0].grid(True) + + ax2 = ax.twinx() + ax2.plot(ep, steps, linestyle='dotted', color='black', alpha=0.6, label="Steps") + ax2.legend(loc="upper right") + + ax = axs[0,1] + axs[0,1].plot(ep, episode_metrics["critic_loss"], label="Critic Loss", color='red') + axs[0,1].set_title("Critic Loss over Episodes") + axs[0,1].set_xlabel("Episode") + axs[0,1].set_ylabel("Loss") + axs[0,1].grid(True) + 
ax2 = ax.twinx() + ax2.plot(ep, steps, linestyle='dotted', color='black', alpha=0.6, label="Steps") + ax2.legend(loc="upper right") + + ax = axs[1,0] + axs[1,0].plot(ep, episode_metrics["entropy"], label="Entropy", color='green') + axs[1,0].set_title("Policy Entropy over Episodes") + axs[1,0].set_xlabel("Episode") + axs[1,0].set_ylabel("Entropy") + axs[1,0].grid(True) + ax2 = ax.twinx() + ax2.plot(ep, steps, linestyle='dotted', color='black', alpha=0.6, label="Steps") + ax2.legend(loc="upper right") + + if episode_metrics["reward"]: + ax = axs[1,1] + axs[1,1].plot(ep, episode_metrics["reward"], label="Reward", color='purple') + axs[1,1].set_title("Cumulative Reward over Episodes") + axs[1,1].set_xlabel("Episode") + axs[1,1].set_ylabel("Reward") + axs[1,1].grid(True) + ax2 = ax.twinx() + ax2.plot(ep, steps, linestyle='dotted', color='black', alpha=0.6, label="Steps") + ax2.legend(loc="upper right") + + plt.tight_layout() + filename = f"metrics_{up}.png" + plt.savefig(filename, dpi=200) + + def plot_trajectory(self, ep, steps, reward, loss, actor_loss, critic_loss, entropy): plt.figure(figsize=(10, 4)) traj = np.array([(x, y) for (x, y, a) in self.trajectory]) plt.plot(traj[:, 0], traj[:, 1], label="Full Trajectory") - colors = {0: "blue", 1: "green", 2: "red"} - labels = {0: "Straight", 1: "Left", 2: "Right"} + colors = {0: "blue", 1: "green", 2: "red", 3: "orange"} + labels = {0: "Straight", 1: "Left", 2: "Right", 3: "Slow/Stop"} - for x, y, a in self.trajectory: - plt.scatter(x, y, color=colors[a], s=8) for x, y, a in self.danger_points: plt.scatter(x, y, color=colors[a], s=60, marker='X', edgecolors='black') plt.axhline(self.lane_width, color='black', linestyle='--') plt.axhline(-self.lane_width, color='black', linestyle='--') - plt.title(f"Robot Trajectory with Actions – Episode: {ep}, Steps: {steps}, Reward: {reward:.2f}, Loss: {loss:.4f}") + plt.title(f"Robot Trajectory with Actions – Episode: {ep}, Steps: {steps}, Reward: {reward:.2f}, Loss: {loss:.4f} \n 
Actor Loss: {actor_loss.item():.4f}, Critic Loss: {critic_loss.item():.4f}, Entropy: {entropy.item():.4f}") + plt.xlabel("X Position") plt.ylabel("Y Position") plt.grid(True) - for a in [0, 1, 2]: + for a in [0, 1, 2, 3]: plt.scatter([], [], c=colors[a], label=labels[a]) plt.legend() @@ -494,69 +593,79 @@ def plot_trajectory(self, ep, steps, reward, loss): plt.savefig(filename, dpi=200) plt.close() - def run(self): - agent = PPOAgent() - EPISODES = 30 + def run(self, agent, EPISODES, is_training=True): reward_list = [] + initial_entropy = 0.5 + final_entropy = 0.01 + + episode_metrics = { + "episode": [], + "step": [], + "actor_loss": [], + "critic_loss": [], + "entropy": [], + "reward": [] + } for ep in range(EPISODES): - print("\n==== EPISODE", ep, "====") + # print("\n==== EPISODE", ep, "====") episode_reward = 0 self.reset() terminal_step = self.num_steps reset_angle = 0 + self.entropy_coef = initial_entropy * (1 - ep / EPISODES) + final_entropy * (ep / EPISODES) for step in range(self.num_steps): - if step % 100 == 0 and step > 0: - self.spawn_cars(3) # Respawn cars - p.stepSimulation() - time.sleep(self.timeStep) - self.move_cars() - pos, _ = p.getBasePositionAndOrientation(self.robot_id) robot_pos = [pos[0], pos[1], pos[2]] - # ------------------- - # STATE - # ------------------- # Step 1. Get state from environment state = self.get_state(robot_pos) - # ------------------- - # PPO ACTION - # ------------------- + # Step 2. 
Get action from PPO agent, by passing state to policy network action, logprob = agent.act(state) self.trajectory.append((robot_pos[0], robot_pos[1], action)) - beam = state[0] - if beam < 1.0: + beam = state[4] + if beam > 0.6: self.danger_points.append((robot_pos[0], robot_pos[1], action)) # Map actions to wheel speeds if action == 0: # go straight - self.set_robot_wheel_velocities(6, reset_angle) + self.set_robot_wheel_velocities(4, reset_angle) reset_angle = 0 # reset angle for straight elif action == 1: # turn left self.set_robot_wheel_velocities(3, +self.ang_vel) reset_angle -= self.ang_vel - # if reset_angle < -math.pi: - # reset_angle += 2 * math.pi elif action == 2: # turn right self.set_robot_wheel_velocities(3, -self.ang_vel) reset_angle += self.ang_vel - - p.resetDebugVisualizerCamera( - cameraDistance=5, - cameraYaw=0, - cameraPitch=-80, - cameraTargetPosition=robot_pos # camera follows robot - ) + elif action == 3: # SLOW/STOP + self.set_robot_wheel_velocities(1.5, reset_angle) + reset_angle = 0 + + p.stepSimulation() + # time.sleep(self.timeStep) + self.move_cars() + + new_pos, _ = p.getBasePositionAndOrientation(self.robot_id) + new_robot_pos = [new_pos[0], new_pos[1], new_pos[2]] + new_state = self.get_state(new_robot_pos) + progress = new_robot_pos[0] - robot_pos[0] # if robot moves forward, this is positive + progress_reward = 2.0 * progress # scale as needed # Terminal condition - done = ((state[4] < (0.5/self.max_range)) or (abs(pos[1]) > (self.lane_width + self.offset))) + left_lane, right_lane = new_state[5], new_state[6] + contacts = p.getContactPoints(bodyA=self.robot_id) + car_ids = {car['id'] for car in self.cars} + collision = any((c[1] in car_ids) or (c[2] in car_ids) for c in contacts) + + done = (collision or + (abs(new_robot_pos[1]) > (self.lane_width + self.offset)) or + (left_lane) or (right_lane)) # Step 3. 
Compute reward - reward = self.compute_reward(state, action,done) + reward = self.compute_reward(new_state, action,done) + progress_reward episode_reward += reward print("Episode = ",ep , "Step =", step, " State =", state, " Action =", action, " LogProb =", logprob, " Reward =", reward, "total_reward =", episode_reward) @@ -565,18 +674,59 @@ def run(self): agent.remember(state, action, logprob, reward, done) if done: + agent.remember(state, action, logprob, -20.0, True) terminal_step = step - print("Episode terminated early") + print("Episode", ep, " terminated early reward = ", episode_reward, " at step ", step) break + + p.resetDebugVisualizerCamera( + cameraDistance=5, + cameraYaw=0, + cameraPitch=-80, + cameraTargetPosition=new_robot_pos # camera follows robot + ) + reward_list.append(episode_reward) print("Episode Reward =", episode_reward) # Step 5. Collect the above batch data, and train PPO agent after each episode using value function network - loss = agent.train() - self.plot_trajectory(ep, terminal_step, episode_reward, loss) + def to_float(x): + if torch.is_tensor(x): + return float(x.detach().cpu()) + return float(x) + if( is_training): + loss, actor_loss, critic_loss, entropy = agent.train(force=True,entropy_coef=self.entropy_coef ) + episode_metrics["episode"].append(ep) + episode_metrics["actor_loss"].append(to_float(actor_loss)) + episode_metrics["critic_loss"].append(to_float(critic_loss)) + episode_metrics["entropy"].append(to_float(entropy)) + episode_metrics["reward"].append(to_float(episode_reward)) + episode_metrics["step"].append(terminal_step) + self.plot_trajectory(ep, terminal_step, episode_reward, loss, actor_loss, critic_loss, entropy) + else: + print(f"TEST Episode {ep+1}/{EPISODES} | Total Reward: {episode_reward:.2f} | Steps: {step}") + + if (is_training and ep % 5 == 0): + self.plot_metrics(episode_metrics, ep) + agent.save_model("./ppo_driver_model.pth") print("All Episodes Completed: reward list - ", reward_list) - p.disconnect() 
+ + def test_model(self, agent, total_episodes): + print("=================Testing trained PPO agent============================") + agent.load_model("./ppo_driver_model.pth") + self.run(agent=agent, EPISODES=total_episodes, is_training=False ) +#For training +agent = PPOAgent() sim = DriverAssistanceSim() -sim.run() \ No newline at end of file +EPISODES = 500 +sim.run(agent, EPISODES) +sim.disconnect_environmnent() + +# For testing +# sim = DriverAssistanceSim() +# agent = PPOAgent(state_dim=9, action_dim=4) +# agent.load_model("./ppo_driver_model.pth") +# sim.test_model(agent, total_episodes=1) +# sim.disconnect_environmnent() diff --git a/PPO.py b/PPO.py index 7d0aee5..895446d 100644 --- a/PPO.py +++ b/PPO.py @@ -1,15 +1,12 @@ -# PPO.py -- robust replacement import torch import torch.nn as nn import torch.optim as optim import torch.distributions as dist import numpy as np -# ----------------------- # PPO NETWORK -# ----------------------- class PPOActorCritic(nn.Module): - def __init__(self, state_dim=5, action_dim=3): # adapt state_dim to your get_state() + def __init__(self, state_dim=9, action_dim=4): super().__init__() self.shared = nn.Sequential( nn.Linear(state_dim, 128), nn.ReLU(), @@ -19,6 +16,21 @@ def __init__(self, state_dim=5, action_dim=3): # adapt state_dim to your get_s self.policy = nn.Linear(128, action_dim) self.value = nn.Linear(128, 1) + for m in self.shared.modules(): + if isinstance(m, nn.Linear): + nn.init.uniform_(m.weight, -0.01, 0.01) + nn.init.zeros_(m.bias) + + for m in self.policy.modules(): + if isinstance(m, nn.Linear): + nn.init.uniform_(m.weight, -0.01, 0.01) + nn.init.zeros_(m.bias) + + for m in self.value.modules(): + if isinstance(m, nn.Linear): + nn.init.uniform_(m.weight, -0.01, 0.01) + nn.init.zeros_(m.bias) + # Orthogonal initialization for stability for m in self.shared: if isinstance(m, nn.Linear): @@ -30,29 +42,32 @@ def __init__(self, state_dim=5, action_dim=3): # adapt state_dim to your get_s 
# NOTE(review): SOURCE is a line-mangled unified diff of PPO.py.  The two
# methods below (PPOAgent.act / PPOAgent.remember) are the only definitions in
# this span whose post-patch text is fully visible; the PPOActorCritic tail,
# PPOAgent.__init__ and train() contain omitted hunk context ("@@" gaps) and
# cannot be reconstructed here without guessing.


def act(self, state):
    """Sample an action from the safety-masked policy.

    Args:
        self: PPOAgent instance; only ``self.model`` (state -> (logits, value))
            is used by this method.
        state: sequence of 9 floats.  Indices 0 / 1 / 4 are read as left /
            right / front proximity readings — presumably normalized to
            [0, 1]; TODO confirm against DriverAssistanceSim.

    Returns:
        (action, logprob): the sampled discrete action (int in 0..3) and its
        log-probability under the *masked* distribution (float).
    """
    state_t = torch.FloatTensor(state).unsqueeze(0)  # batch dim: (1, state_dim)
    logits, _ = self.model(state_t)                  # (1, action_dim)
    logits = logits.squeeze(0)                       # (action_dim,)

    # Rule-based safety shield: drop actions the sensor readings mark as
    # unsafe.  Thresholds (1 - 0.35, 1 - 0.25, 0.4) are inherited unchanged
    # from the original code.
    safe_actions = [0, 1, 2, 3]  # all actions
    front_dist = state[4]
    left_dist = state[0]
    right_dist = state[1]

    if front_dist > 1 - 0.35 and 0 in safe_actions:
        safe_actions.remove(0)
    if left_dist > 1 - 0.25 and 1 in safe_actions:
        safe_actions.remove(1)
    if right_dist > 1 - 0.25 and 2 in safe_actions:
        safe_actions.remove(2)
    if front_dist < 0.4 and 3 in safe_actions:
        safe_actions.remove(3)

    # FIX: the original used a soft additive mask, logits + (mask + 1e-8).log(),
    # which only subtracts ~18.4 from unsafe logits; a policy confident by more
    # than that margin could still sample a masked (unsafe) action.  Use a true
    # -inf mask so unsafe actions get exactly zero probability.
    if safe_actions:
        allowed = torch.zeros_like(logits, dtype=torch.bool)
        allowed[safe_actions] = True
        masked_logits = logits.masked_fill(~allowed, float("-inf"))
    else:
        # Guard: if every action were ever masked out, fall back to the raw
        # policy instead of producing an all--inf (invalid) distribution.
        masked_logits = logits

    dist_pi = dist.Categorical(logits=masked_logits)
    action = dist_pi.sample()
    logprob = dist_pi.log_prob(action)

    return int(action.item()), float(logprob.item())


def remember(self, s, a, lp, r, d):
    """Append one transition (state, action, logprob, reward, done) to memory.

    Repeating action 3 on consecutive steps incurs a small reward-shaping
    penalty (-0.02), discouraging the agent from idling on one action.
    """
    if self.last_action is not None and a == 3 and self.last_action == 3:
        r -= 0.02
    self.memory["states"].append(np.array(s, dtype=np.float32))
    self.memory["actions"].append(int(a))
    self.memory["logprobs"].append(float(lp))
    self.memory["rewards"].append(float(r))
    self.memory["dones"].append(bool(d))
    self.last_action = a
""" + if entropy_coef is None: + entropy_coef = self.entropy_coef + mem_size = len(self.memory["states"]) if mem_size == 0: print("PPO.train(): memory empty -> nothing to do.") @@ -114,6 +156,7 @@ def train(self, force=False): G = r + self.gamma * G returns.insert(0, G) returns = torch.FloatTensor(returns) # (N,) + returns = (returns - returns.mean()) / (returns.std(unbiased=False) + 1e-8) # Basic NaN check if torch.isnan(states).any() or torch.isnan(returns).any(): @@ -140,6 +183,7 @@ def train(self, force=False): # Compute advantages values = values.squeeze(1) # (B,) advantages = ret_batch - values # (B,) + critic_loss = 0.5 * (advantages.pow(2).mean()) # Robust normalization: avoid divide-by-zero adv_mean = advantages.mean() @@ -156,7 +200,6 @@ def train(self, force=False): surr2 = torch.clamp(ratio, 1.0 - self.clip_eps, 1.0 + self.clip_eps) * advantages actor_loss = -torch.min(surr1, surr2).mean() - critic_loss = 0.5 * (advantages.pow(2).mean()) # MSE on advantage ~ (returns - value)^2 loss = actor_loss + critic_loss - self.entropy_coef * entropy @@ -171,4 +214,22 @@ def train(self, force=False): # Clear memory after update self.memory = {k: [] for k in self.memory} print("PPO.train(): Update finished. Final loss:", final_loss) - return final_loss + + + return final_loss, actor_loss, critic_loss, entropy + + def save_model(self, filepath): + """Saves the state dictionary of the underlying ActorCritic network.""" + torch.save(self.model.state_dict(), filepath) + print(f"PPOAgent: Model saved successfully to {filepath}") + + def load_model(self, filepath): + """Loads the state dictionary into the underlying ActorCritic network.""" + try: + self.model.load_state_dict(torch.load(filepath)) + print(f"PPOAgent: Model loaded successfully from {filepath}") + self.model.eval() # Important for inference + except FileNotFoundError: + print(f"PPOAgent: Error - Model file not found at {filepath}. 
Starting from scratch.") + except RuntimeError as e: + print(f"PPOAgent: Error loading model state dict. Error: {e}") \ No newline at end of file