diff --git a/src/Driving_Car/Autonomous_vehicle_perpendicular_reverse_parking.py b/src/Driving_Car/Autonomous_vehicle_perpendicular_reverse_parking.py new file mode 100644 index 000000000..46ef32b5d --- /dev/null +++ b/src/Driving_Car/Autonomous_vehicle_perpendicular_reverse_parking.py @@ -0,0 +1,297 @@ +# 导入Carla仿真器核心库,用于与仿真世界、车辆、传感器交互 +import carla +# 导入时间库,用于控制循环频率和添加延时 +import time +# 导入numpy数值计算库,用于位姿、误差的矩阵/数组运算 +import numpy as np +# 导入数学库,用于三角函数、弧度转换等基础数学运算 +import math +# 导入pygame库,用于可视化摄像头画面和处理窗口事件 +import pygame + +# 初始化pygame模块(主要用于实时显示摄像头画面,也可扩展为键盘控制) +pygame.init() +# 创建pygame显示窗口,分辨率设置为800x600像素 +screen = pygame.display.set_mode((800, 600)) +# 设置pygame窗口标题,便于识别当前仿真程序 +pygame.display.set_caption("Carla Parking Control") + +class CarlaParking: + """ + Carla无人车倒车入库核心类 + 功能概述: + 1. 建立与Carla仿真器的连接,初始化仿真场景(车辆、传感器、目标车位) + 2. 实时获取车辆位姿,计算与目标车位的误差 + 3. 基于纯追踪+PID比例控制实现倒车入库的车辆控制 + 4. 可视化摄像头画面,管理仿真资源(车辆、传感器的创建与销毁) + """ + def __init__(self): + """类初始化函数:建立Carla连接,初始化核心变量""" + # 1. 连接Carla服务器(默认本地地址localhost,端口2000) + self.client = carla.Client('localhost', 2000) + # 设置服务器连接超时时间为10秒,避免程序无限等待 + self.client.set_timeout(10.0) + # 获取当前Carla仿真世界对象(所有演员、地图的容器) + self.world = self.client.get_world() + # 获取蓝图库(包含所有可生成的车辆、传感器等演员的模板) + self.blueprint_library = self.world.get_blueprint_library() + # 获取当前仿真地图对象(用于获取道路生成点、车位参考等) + self.map = self.world.get_map() + + # 2. 初始化演员对象变量(后续在_init_scene中生成) + self.vehicle = None # 无人车演员对象(核心控制目标) + self.camera = None # RGB摄像头传感器对象(可视化用) + self.lidar = None # LiDAR传感器对象(预留车位检测用) + self.spectator = self.world.get_spectator() # 仿真器旁观者视角(用于跟随车辆) + + # 3. 初始化车辆控制相关参数 + self.control = carla.VehicleControl() # Carla车辆控制指令对象(封装油门、刹车、转向、倒车等指令) + self.max_speed = 5.0 # 倒车入库的最大速度(单位:km/h),低速保证入库精度和安全性 + self.target_parking_transform = None # 目标车位的位姿(Carla.Transform类型,包含位置和朝向) + + # 4. 调用场景初始化函数,生成车辆、传感器和目标车位 + self._init_scene() + + def _init_scene(self): + """ + 私有场景初始化函数: + 1. 清理场景中现有车辆和传感器,避免冲突 + 2. 生成指定型号的无人车,设置初始位置 + 3. 手动定义目标车位的位姿(可扩展为自动检测) + 4. 生成并挂载摄像头传感器(LiDAR预留) + """ + # 第一步:清理场景中已存在的车辆和传感器(重置仿真环境) + # 筛选并销毁所有车辆演员 + for actor in self.world.get_actors().filter('*vehicle*'): + actor.destroy() + # 筛选并销毁所有传感器演员 + for actor in self.world.get_actors().filter('*sensor*'): + actor.destroy() + + # 第二步:生成无人车(特斯拉Model3) + # 从蓝图库中筛选出特斯拉Model3的车辆蓝图(返回列表,取第一个) + vehicle_bp = self.blueprint_library.filter('model3')[0] + # 设置车辆颜色为红色(RGB值:255,0,0),增强可视化辨识度 + vehicle_bp.set_attribute('color', '255,0,0') + + # 获取地图预定义的车辆生成点(道路合法位置,避免生成在障碍物/人行道) + spawn_points = self.map.get_spawn_points() + # 选择第30个生成点作为车辆初始位置(不同地图的生成点分布不同,可按需调整索引) + # 初始位置要求:离目标车位有一定距离(约5m),便于展示倒车入库过程 + start_transform = spawn_points[30] + # 在初始位置生成车辆演员,并将对象赋值给self.vehicle + self.vehicle = self.world.spawn_actor(vehicle_bp, start_transform) + # 打印车辆ID,便于调试时识别车辆对象 + print(f"车辆生成成功:{self.vehicle.id}") + + # 第三步:定义目标车位的位姿(手动设置,可替换为LiDAR/视觉自动检测) + # 车位位置:在车辆初始位置的x轴负方向(后方)5m,y轴正方向(右侧)2m,z轴不变(地面高度) + parking_location = start_transform.location + carla.Location(x=-5.0, y=2.0, z=0.0) + # 车位朝向:偏航角(yaw)比初始位置大180°,即车辆倒车后车头朝向与初始方向相反 + parking_rotation = start_transform.rotation + carla.Rotation(yaw=180.0) + # 组合位置和朝向,生成目标车位的Transform对象(Carla中位姿的标准表示) + self.target_parking_transform = carla.Transform(parking_location, parking_rotation) + + # 第四步:生成RGB摄像头传感器(用于实时可视化车辆视角) + # 从蓝图库中获取RGB摄像头的传感器蓝图 + camera_bp = self.blueprint_library.find('sensor.camera.rgb') + # 设置摄像头分辨率:宽度800像素,高度600像素(与pygame窗口匹配) + camera_bp.set_attribute('image_size_x', '800') + camera_bp.set_attribute('image_size_y', '600') + # 设置摄像头视场角(FOV)为90°,模拟人眼视角范围 + camera_bp.set_attribute('fov', '90') + # 定义摄像头安装位姿:在车辆中心前1.5m、上2.4m处(俯视前方视角) + camera_transform = carla.Transform(carla.Location(x=1.5, z=2.4)) + # 生成摄像头传感器,并挂载到车辆上(随车辆移动) + self.camera = self.world.spawn_actor(camera_bp, camera_transform, attach_to=self.vehicle) + # 注册摄像头数据回调函数:每帧图像生成后自动调用_process_camera_image处理 + self.camera.listen(lambda image: self._process_camera_image(image)) + + # 第五步:生成LiDAR传感器(预留,用于后续自动车位检测功能) + # lidar_bp = self.blueprint_library.find('sensor.lidar.ray_cast') # 获取LiDAR蓝图 + # lidar_bp.set_attribute('range', '50') # 设置LiDAR检测范围为50m + # lidar_transform = carla.Transform(carla.Location(z=2.0)) # 安装在车辆顶部2m处 + # self.lidar = self.world.spawn_actor(lidar_bp, lidar_transform, attach_to=self.vehicle) # 生成并挂载LiDAR + # self.lidar.listen(lambda point_cloud: self._process_lidar(point_cloud)) # 注册点云处理回调 + + def _process_camera_image(self, image): + """ + 摄像头图像回调处理函数: + 将Carla摄像头的原始数据转换为pygame可显示的格式,并更新窗口画面 + 参数: + image: carla.Image类型,包含摄像头原始像素数据、分辨率等信息 + """ + # 1. 将Carla图像原始字节数据转换为numpy数组(dtype=uint8,对应0-255像素值) + array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) + # 2. 调整数组形状为(图像高度, 图像宽度, 4),对应RGBA四通道(红、绿、蓝、透明度) + array = np.reshape(array, (image.height, image.width, 4)) + # 3. 去除alpha(透明度)通道,仅保留RGB三通道(pygame显示无需透明度) + array = array[:, :, :3] + # 4. 交换数组的0轴和1轴(Carla图像是(height, width),pygame是(width, height)) + array = array.swapaxes(0, 1) + # 5. 将numpy数组转换为pygame的Surface对象(可直接绘制到窗口) + surface = pygame.surfarray.make_surface(array) + # 6. 将Surface对象绘制到pygame窗口的(0,0)位置(左上角) + screen.blit(surface, (0, 0)) + # 7. 更新pygame窗口显示,使新绘制的画面生效 + pygame.display.update() + + def _process_lidar(self, point_cloud): + """ + LiDAR点云处理回调函数(占位): + 后续可扩展为:点云聚类→车位轮廓提取→目标车位位姿计算 + 参数: + point_cloud: carla.LidarMeasurement类型,包含LiDAR点云数据 + """ + pass + + def get_vehicle_pose(self): + """ + 获取车辆当前位姿(简化为2D平面) + 返回: + numpy数组 [x, y, yaw],其中: + x: 车辆在地图中的x坐标(米) + y: 车辆在地图中的y坐标(米) + yaw: 车辆偏航角(弧度,绕z轴旋转,0为东,逆时针为正) + """ + # 获取车辆当前的Transform对象(包含位置和旋转) + transform = self.vehicle.get_transform() + # 提取x、y坐标(z坐标为高度,2D平面入库无需考虑) + x = transform.location.x + y = transform.location.y + # 提取偏航角(Carla中为角度,转换为弧度便于三角函数计算) + yaw = math.radians(transform.rotation.yaw) + # 返回位姿数组 + return np.array([x, y, yaw]) + + def calculate_parking_error(self): + """ + 计算车辆当前位姿与目标车位的误差(倒车入库的核心参考量) + 返回: + pos_error: numpy数组 [x_error, y_error],位置误差(米),目标-当前 + yaw_error: 浮点数,朝向误差(弧度),归一化到[-π, π] + """ + # 获取车辆当前位姿 + current_pose = self.get_vehicle_pose() + # 构建目标车位的位姿数组(与当前位姿格式一致) + target_pose = np.array([ + self.target_parking_transform.location.x, # 车位x坐标 + self.target_parking_transform.location.y, # 车位y坐标 + math.radians(self.target_parking_transform.rotation.yaw) # 车位偏航角(转弧度) + ]) + + # 计算xy平面的位置误差(目标位置 - 当前位置) + pos_error = target_pose[:2] - current_pose[:2] + # 计算朝向误差(目标偏航角 - 当前偏航角) + yaw_error = target_pose[2] - current_pose[2] + # 归一化朝向误差到[-π, π]范围(避免误差超过360°,如350°误差等价于-10°) + yaw_error = (yaw_error + math.pi) % (2 * math.pi) - math.pi + + return pos_error, yaw_error + + def parking_control(self): + """ + 倒车入库核心控制算法(纯追踪+PID比例控制) + 控制逻辑: + 1. 优先调整车辆朝向,消除偏航误差 + 2. 速度与距离误差成正比,近距离减速,保证入库平稳 + 3. 误差小于阈值时停止车辆,判定入库完成 + 返回: + bool类型,True表示入库完成(距离误差<0.2m),False表示未完成 + """ + # 第一步:获取位姿误差 + pos_error, yaw_error = self.calculate_parking_error() + # 计算xy平面的距离误差(欧氏距离,即车辆到车位的直线距离) + distance = np.linalg.norm(pos_error) + + # 第二步:定义控制参数(可根据车辆特性调优) + k_p_pos = 0.1 # 位置比例系数(预留,当前仅用朝向控制) + k_p_yaw = 0.5 # 朝向比例系数(越大,转向响应越灵敏) + max_steer = math.radians(30) # 最大转向角(30°,模拟真实车辆的转向限位) + + # 第三步:朝向控制(优先消除偏航误差,避免倒车时偏离车位) + # 比例控制:转向角 = 比例系数 × 朝向误差 + steer = k_p_yaw * yaw_error + # 限制转向角在[-max_steer, max_steer]范围内,避免过度转向 + steer = np.clip(steer, -max_steer, max_steer) + + # 第四步:速度控制(倒车模式,负速度表示倒车) + # 速度策略:距离5m时达到最大速度,距离越近速度越慢(线性减速) + speed = -min(self.max_speed * (distance / 5.0), self.max_speed) + # 距离误差小于0.2m时,判定入库完成,停止车辆 + if distance < 0.2: + speed = 0.0 # 速度置0 + steer = 0.0 # 转向角置0 + print("倒车入库完成!") + + # 第五步:转换为Carla车辆控制指令(Carla的控制范围有固定要求) + self.control.steer = float(steer / max_steer) # 转向角归一化到[-1, 1] + # 油门控制:倒车时油门为0(依赖reverse标志实现倒车),前进时油门0.5(本场景仅倒车) + self.control.throttle = 0.0 if speed < 0 else 0.5 + self.control.brake = 0.0 # 无刹车(靠减速策略停车) + self.control.reverse = speed < 0 # 设置倒车标志(True表示倒车) + + # 第六步:将控制指令应用到车辆 + self.vehicle.apply_control(self.control) + + # 返回入库完成状态(距离误差<0.2m) + return distance < 0.2 + + def run(self): + """ + 主循环函数: + 1. 处理pygame窗口事件(如关闭窗口) + 2. 循环执行倒车入库控制逻辑 + 3. 更新旁观者视角,跟随车辆移动 + 4. 保证循环频率,清理仿真资源 + """ + try: + # 初始化入库完成标志 + finished = False + # 主循环:直到入库完成或关闭窗口 + while not finished: + # 处理pygame事件(仅处理窗口关闭事件,可扩展为键盘控制) + for event in pygame.event.get(): + # 检测到窗口关闭事件时,终止循环 + if event.type == pygame.QUIT: + finished = True + + # 执行倒车入库控制逻辑,更新完成标志 + finished = self.parking_control() or finished + + # 更新旁观者视角(便于观察车辆入库过程) + # 获取车辆当前位姿 + vehicle_transform = self.vehicle.get_transform() + # 定义旁观者位姿:在车辆后方10m、上方5m,俯角15°,与车辆同向 + spectator_transform = carla.Transform( + vehicle_transform.location + carla.Location(x=-10, z=5), + carla.Rotation(yaw=vehicle_transform.rotation.yaw, pitch=-15) + ) + # 设置旁观者视角 + self.spectator.set_transform(spectator_transform) + + # 控制循环频率为20Hz(每次循环休眠0.05秒),避免CPU占用过高 + time.sleep(0.05) + + finally: + """无论循环正常结束还是异常终止,都要清理仿真资源""" + # 销毁车辆演员 + if self.vehicle: + self.vehicle.destroy() + # 销毁摄像头传感器 + if self.camera: + self.camera.destroy() + # 销毁LiDAR传感器(若启用) + if self.lidar: + self.lidar.destroy() + # 退出pygame模块 + pygame.quit() + # 打印清理完成提示 + print("资源清理完成") + +# 程序入口:当脚本直接运行时执行 +if __name__ == '__main__': + # 实例化倒车入库类 + parking = CarlaParking() + # 运行主循环 + parking.run() \ No newline at end of file diff --git a/src/Driving_Car/Autonomous_vehicle_simulation_obstacle_avoidance.py b/src/Driving_Car/Autonomous_vehicle_simulation_obstacle_avoidance.py index cab284fd0..422ce6106 100644 --- a/src/Driving_Car/Autonomous_vehicle_simulation_obstacle_avoidance.py +++ b/src/Driving_Car/Autonomous_vehicle_simulation_obstacle_avoidance.py @@ -142,7 +142,7 @@ def detect_obstacle(): if len(front_points) == 0: return False - # 计算每个点的水平角度(相对于车辆前进方向,单位:度) + # 计算每个点的水平角(相对于车辆前进方向,单位:度) # 车辆前进方向为x轴正方向,y轴正方向为左,负方向为右 angles = np.degrees(np.arctan2(front_points[:, 1], front_points[:, 0])) # 过滤角度在[-阈值, +阈值]范围内的点(前方左右各OBSTACLE_ANGLE_THRESHOLD度) diff --git a/src/Driving_Car/README.md b/src/Driving_Car/README.md index aea3eb655..dc77a0dfe 100644 --- a/src/Driving_Car/README.md +++ b/src/Driving_Car/README.md @@ -15,7 +15,7 @@ **车辆控制**: PID控制器实现精确的转向和速度控制 -**传感器模拟**: 模拟激光雷达、摄像头和超声波传感器数据 +**传感器模拟**: 模拟激光雷达摄像头和超声波传感器数据 **3D可视化**: 基于PyGame的实时场景渲染 diff --git a/src/Driving_Car/Simulation_obstacle_avoidance_system.py b/src/Driving_Car/Simulation_obstacle_avoidance_system.py new file mode 100644 index 000000000..91ff6f7a1 --- /dev/null +++ b/src/Driving_Car/Simulation_obstacle_avoidance_system.py @@ -0,0 +1,334 @@ +import carla +import random +import time +import numpy as np +from collections import deque +from dataclasses import dataclass, field +from typing import Dict, List, Callable, Optional + +# -------------------------- 配置参数 -------------------------- +# Carla服务器连接配置 +CARLA_HOST = 'localhost' # 服务器地址(本地调试用localhost) +CARLA_PORT = 2000 # 服务器端口(默认2000) +CARLA_TIMEOUT = 10.0 # 客户端超时时间(秒) + +# 车辆运动控制配置 +TARGET_SPEED = 5.0 # 目标行驶速度(米/秒) +CONTROL_FREQUENCY = 20.0 # 车辆控制频率(赫兹) +CONTROL_DELAY = 1.0 / CONTROL_FREQUENCY # 控制循环延迟(秒) + +# 避障核心阈值 +OBSTACLE_DISTANCE_THRESHOLD = 8.0 # 障碍物距离阈值(米) +OBSTACLE_ANGLE_THRESHOLD = 30.0 # 障碍物检测角度范围(度),左右各30度 +BRAKE_INTENSITY = 1.0 # 制动强度(0.0-1.0,1.0为全力制动) +STEER_ANGLE = 0.2 # 避障转向角度(-1.0左~1.0右) + +# 激光雷达(LiDAR)配置字典 +LIDAR_CONFIG = { + 'range': 50.0, # 最大探测距离(米) + 'channels': 32, # 激光雷达通道数 + 'points_per_second': 100000, # 每秒生成点云数量 + 'rotation_frequency': 10, # 旋转频率(赫兹) + 'upper_fov': 10.0, # 上视场角(度) + 'lower_fov': -30.0, # 下视场角(度) + 'location': (0.8, 0.0, 1.8) # 安装位置(相对车辆:前0.8m,中0m,高1.8m) +} + + +# -------------------------- 全局注册与状态管理 -------------------------- +@dataclass +class 传感器注册表: + """传感器集中式注册表(生命周期管理)""" + sensors: Dict[str, carla.Actor] = field(default_factory=dict) # 传感器实例字典 + callbacks: Dict[str, Callable] = field(default_factory=dict) # 回调函数字典 + + def 注册传感器(self, 传感器名称: str, 传感器实例: carla.Actor, 回调函数: Callable): + """ + 注册传感器并绑定数据回调函数 + 参数: + 传感器名称: 传感器唯一标识(如"lidar_front") + 传感器实例: Carla已生成的传感器Actor + 回调函数: 数据处理回调函数 + """ + self.sensors[传感器名称] = 传感器实例 + self.sensors[传感器名称].listen(回调函数) + print(f"✅ 传感器注册成功:{传感器名称}(ID:{传感器实例.id})") + + def 注销所有传感器(self): + """注销并销毁所有已注册的传感器(防止Carla内存泄漏)""" + for 传感器名称, 传感器 in self.sensors.items(): + if 传感器.is_alive: + 传感器.stop() # 停止数据监听 + 传感器.destroy() # 销毁传感器Actor + print(f"❌ 传感器已注销:{传感器名称}") + self.sensors.clear() + self.callbacks.clear() + + +@dataclass +class 车辆状态注册表: + """车辆状态跟踪注册表""" + 车辆实例: Optional[carla.Vehicle] = None # 车辆Actor实例 + 当前速度: float = 0.0 # 当前车速(米/秒) + 检测到障碍物: bool = False # 障碍物检测状态 + 控制指令历史: List[carla.VehicleControl] = field(default_factory=list) # 控制指令历史 + + def 更新车速(self): + """从Carla API更新当前车速(米/秒)""" + if self.车辆实例: + 速度矢量 = self.车辆实例.get_velocity() + # 计算三维速度(x/y/z轴合成) + self.当前速度 = np.sqrt(速度矢量.x ** 2 + 速度矢量.y ** 2 + 速度矢量.z ** 2) + + def 记录控制指令(self, 控制指令: carla.VehicleControl): + """记录控制指令(用于调试)""" + self.控制指令历史.append(控制指令) + # 仅保留最近100条记录(节省内存) + if len(self.控制指令历史) > 100: + self.控制指令历史.pop(0) + + +# 初始化全局注册表 +传感器注册表实例 = 传感器注册表() +车辆状态实例 = 车辆状态注册表() +激光雷达数据缓冲区 = deque(maxlen=1) # 线程安全的激光雷达数据缓冲区(仅保留最新1帧) + + +# -------------------------- 核心功能模块 -------------------------- +def 激光雷达数据回调(数据: carla.LidarMeasurement): + """ + 激光雷达数据处理回调函数 + 将原始点云数据转换为numpy数组并存储到线程安全缓冲区 + 数组格式:每个点[x, y, z, 强度值] + """ + try: + # 原始二进制数据转numpy数组(float32类型,每行4个值) + 点云数组 = np.frombuffer(data.raw_data, dtype=np.float32).reshape(-1, 4) + 激光雷达数据缓冲区.append(点云数组) + except Exception as e: + print(f"⚠️ 激光雷达回调异常:{str(e)}") + + +def 障碍物检测算法() -> bool: + """ + 车辆前方障碍物检测核心算法 + 检测逻辑: + 1. 过滤有效点云(车辆前方:x>0) + 2. 计算每个点相对车辆航向的水平角度 + 3. 校验是否有点云落在距离/角度阈值内 + 4. 过滤地面点(避免误判) + 返回: + bool: 检测到障碍物返回True,否则False + """ + # 无点云数据直接返回无障碍物 + if not 激光雷达数据缓冲区: + return False + + # 获取最新一帧点云数据 + 点云数组 = 激光雷达数据缓冲区[0] + + # 步骤1:过滤前方距离阈值内的点云 + 前方点云 = 点云数组[ + (点云数组[:, 0] > 0) & # x>0表示车辆前方(车辆坐标系x轴向前) + (点云数组[:, 0] < OBSTACLE_DISTANCE_THRESHOLD) # 距离小于阈值 + ] + # 无前方点云则返回无障碍物 + if len(前方点云) == 0: + return False + + # 步骤2:计算每个点相对车辆航向的水平角度(度) + # 车辆航向为x轴正方向;y轴正方向=左,y轴负方向=右 + 水平角度 = np.degrees(np.arctan2(前方点云[:, 1], 前方点云[:, 0])) + + # 步骤3:过滤角度阈值内的点云(左右各OBSTACLE_ANGLE_THRESHOLD度) + 有效角度掩码 = np.abs(水平角度) <= OBSTACLE_ANGLE_THRESHOLD + 障碍物候选点云 = 前方点云[有效角度掩码] + + # 步骤4:过滤地面点(简单高度过滤,地面点z通常<-1.0,可根据车辆高度调整) + 非地面点云 = 障碍物候选点云[障碍物候选点云[:, 2] > -1.0] + + # 至少10个有效点才判定为障碍物(避免单点噪声误判) + 检测到障碍物 = len(非地面点云) > 10 + return 检测到障碍物 + + +def 注册自动驾驶车辆(世界: carla.World, 蓝图库: carla.BlueprintLibrary) -> bool: + """ + 注册并生成标准化配置的自动驾驶车辆 + 参数: + 世界: Carla世界实例 + 蓝图库: Carla蓝图库实例 + 返回: + bool: 车辆注册成功返回True,失败返回False + """ + try: + # 选择特斯拉Model3蓝图(可替换为其他车型,如'vehicle.bmw.grandtourer') + 车辆蓝图 = 蓝图库.find('vehicle.tesla.model3') + 车辆蓝图.set_attribute('color', '255,0,0') # 设置车辆颜色为红色(RGB) + 车辆蓝图.set_attribute('role_name', 'autonomous') # 设置角色名为自动驾驶车辆 + + # 获取地图生成点 + 生成点列表 = 世界.get_map().get_spawn_points() + if not 生成点列表: + # 无默认生成点时使用自定义坐标(x=100, y=100, z=2) + 生成点 = carla.Transform(carla.Location(x=100, y=100, z=2)) + print(f"⚠️ 无默认生成点,使用自定义坐标:{生成点}") + else: + 生成点 = random.choice(生成点列表) + + # 生成车辆Actor + 车辆实例 = 世界.spawn_actor(车辆蓝图, 生成点) + 车辆状态实例.车辆实例 = 车辆实例 + + # 初始化车辆状态 + 车辆实例.set_autopilot(False) # 关闭Carla默认自动驾驶 + 车辆状态实例.更新车速() # 初始化车速 + + print(f"✅ 自动驾驶车辆注册成功:ID={车辆实例.id},生成点={生成点.location}") + return True + + except Exception as e: + print(f"❌ 车辆注册失败:{str(e)}") + return False + + +def 注册激光雷达传感器(世界: carla.World, 蓝图库: carla.BlueprintLibrary): + """ + 注册激光雷达传感器(绑定到已注册的车辆) + 参数: + 世界: Carla世界实例 + 蓝图库: Carla蓝图库实例 + """ + if not 车辆状态实例.车辆实例: + raise RuntimeError("无法注册激光雷达 - 未找到已注册的车辆") + + # 创建激光雷达蓝图 + 激光雷达蓝图 = 蓝图库.find('sensor.lidar.ray_cast') + + # 应用激光雷达配置参数 + 激光雷达蓝图.set_attribute('range', str(LIDAR_CONFIG['range'])) + 激光雷达蓝图.set_attribute('channels', str(LIDAR_CONFIG['channels'])) + 激光雷达蓝图.set_attribute('points_per_second', str(LIDAR_CONFIG['points_per_second'])) + 激光雷达蓝图.set_attribute('rotation_frequency', str(LIDAR_CONFIG['rotation_frequency'])) + 激光雷达蓝图.set_attribute('upper_fov', str(LIDAR_CONFIG['upper_fov'])) + 激光雷达蓝图.set_attribute('lower_fov', str(LIDAR_CONFIG['lower_fov'])) + + # 设置激光雷达安装位置 + 安装位置 = carla.Transform( + carla.Location( + x=LIDAR_CONFIG['location'][0], + y=LIDAR_CONFIG['location'][1], + z=LIDAR_CONFIG['location'][2] + ) + ) + + # 生成激光雷达Actor并绑定到车辆 + 激光雷达实例 = 世界.spawn_actor(激光雷达蓝图, 安装位置, attach_to=车辆状态实例.车辆实例) + + # 注册到传感器注册表 + 传感器注册表实例.注册传感器( + 传感器名称="front_lidar", + 传感器实例=激光雷达实例, + 回调函数=激光雷达数据回调 + ) + + +def 车辆避障控制循环(): + """ + 主控制循环:感知-决策-控制 + 按CONTROL_FREQUENCY频率执行避障逻辑 + """ + print("🚗 开始避障控制循环...") + 车辆控制指令 = carla.VehicleControl() # 初始化控制指令 + + while True: + try: + # 1. 更新车辆状态 + 车辆状态实例.更新车速() + + # 2. 检测前方障碍物 + 车辆状态实例.检测到障碍物 = 障碍物检测算法() + + # 3. 避障决策逻辑 + if 车辆状态实例.检测到障碍物: + # 检测到障碍物:紧急制动 + 小幅度转向 + 车辆控制指令.brake = BRAKE_INTENSITY # 全力制动 + 车辆控制指令.throttle = 0.0 # 关闭油门 + 车辆控制指令.steer = STEER_ANGLE # 向右转向避障(左转为负) + print(f"⚠️ 检测到障碍物!当前车速:{车辆状态实例.当前速度:.2f}m/s,执行制动转向") + else: + # 无障碍物:保持匀速直行 + if 车辆状态实例.当前速度 < TARGET_SPEED: + 车辆控制指令.throttle = 0.5 # 油门开度50% + else: + 车辆控制指令.throttle = 0.0 # 关闭油门(滑行) + 车辆控制指令.brake = 0.0 # 松开刹车 + 车辆控制指令.steer = 0.0 # 保持直行 + + # 4. 应用控制指令并记录 + 车辆状态实例.车辆实例.apply_control(车辆控制指令) + 车辆状态实例.记录控制指令(车辆控制指令) + + # 5. 控制频率延迟 + time.sleep(CONTROL_DELAY) + + except KeyboardInterrupt: + print("\n🛑 控制循环被用户中断") + break + except Exception as e: + print(f"❌ 控制循环异常:{str(e)}") + break + + +# -------------------------- 主程序入口 -------------------------- +def main(): + 客户端 = None + 世界 = None + + try: + # 1. 连接Carla服务器 + 客户端 = carla.Client(CARLA_HOST, CARLA_PORT) + 客户端.set_timeout(CARLA_TIMEOUT) + 世界 = 客户端.get_world() + 蓝图库 = 世界.get_blueprint_library() + print(f"✅ 成功连接Carla服务器:{CARLA_HOST}:{CARLA_PORT}") + + # 2. 加载地图并设置天气 + 世界 = 客户端.load_world('Town03') # 加载Town03地图(可替换为Town01/Town02) + 天气参数 = carla.WeatherParameters( + sun_altitude_angle=70.0, # 太阳高度角(70度=晴天) + fog_density=0.0 # 雾浓度(0=无雾) + ) + 世界.set_weather(天气参数) + print("✅ 地图加载完成:Town03,天气:晴天无雾") + + # 3. 注册自动驾驶车辆 + if not 注册自动驾驶车辆(世界, 蓝图库): + raise RuntimeError("车辆注册失败,程序终止") + + # 4. 注册激光雷达传感器 + 注册激光雷达传感器(世界, 蓝图库) + + # 5. 启动避障控制循环 + 车辆避障控制循环() + + except KeyboardInterrupt: + print("\n🛑 程序被用户手动中断") + except Exception as e: + print(f"❌ 程序执行异常:{str(e)}") + finally: + # 6. 资源清理(关键:防止Carla残留Actor) + print("\n🧹 开始清理资源...") + # 注销所有传感器 + 传感器注册表实例.注销所有传感器() + # 销毁车辆实例 + if 车辆状态实例.车辆实例 and 车辆状态实例.车辆实例.is_alive: + 车辆状态实例.车辆实例.destroy() + print(f"✅ 自动驾驶车辆已销毁(ID:{车辆状态实例.车辆实例.id})") + # 重置车辆状态 + 车辆状态实例.车辆实例 = None + print("✅ 所有资源清理完成") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/Driving_Car/Unmanned_vehicle_lane_detection_simulation.py b/src/Driving_Car/Unmanned_vehicle_lane_detection_simulation.py new file mode 100644 index 000000000..26e414786 --- /dev/null +++ b/src/Driving_Car/Unmanned_vehicle_lane_detection_simulation.py @@ -0,0 +1,175 @@ +import cv2 +import numpy as np +import matplotlib.pyplot as plt + +def grayscale(img): + """图像灰度化""" + return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + +def gaussian_blur(img, kernel_size): + """高斯模糊去噪""" + return cv2.GaussianBlur(img, (kernel_size, kernel_size), 0) + +def canny_edge_detection(img, low_threshold, high_threshold): + """Canny边缘检测""" + return cv2.Canny(img, low_threshold, high_threshold) + +def region_of_interest(img, vertices): + """提取感兴趣区域(只保留道路区域,过滤背景)""" + # 创建掩码 + mask = np.zeros_like(img) + # 填充掩码的感兴趣区域 + cv2.fillPoly(mask, vertices, 255) + # 与原图像进行按位与操作 + masked_img = cv2.bitwise_and(img, mask) + return masked_img + +def draw_lines(img, lines, color=(0, 255, 0), thickness=5): + """绘制检测到的车道线(拟合左右车道线后绘制)""" + left_x = [] + left_y = [] + right_x = [] + right_y = [] + + if lines is None: + return img + + for line in lines: + for x1, y1, x2, y2 in line: + # 计算斜率 + slope = (y2 - y1) / (x2 - x1) if (x2 - x1) != 0 else 0 + # 过滤斜率过小的直线(非车道线) + if abs(slope) < 0.5: + continue + # 区分左车道(斜率为负)和右车道(斜率为正) + if slope < 0: + left_x.extend([x1, x2]) + left_y.extend([y1, y2]) + else: + right_x.extend([x1, x2]) + right_y.extend([y1, y2]) + + # 拟合左车道线 + if left_x and left_y: + left_fit = np.polyfit(left_y, left_x, 1) + left_func = np.poly1d(left_fit) + # 定义车道线的上下边界(y坐标) + y_min = int(img.shape[0] * 0.6) # 车道线上边界 + y_max = img.shape[0] # 车道线下边界 + # 计算对应x坐标 + x_left_min = int(left_func(y_min)) + x_left_max = int(left_func(y_max)) + cv2.line(img, (x_left_min, y_min), (x_left_max, y_max), color, thickness) + + # 拟合右车道线 + if right_x and right_y: + right_fit = np.polyfit(right_y, right_x, 1) + right_func = np.poly1d(right_fit) + y_min = int(img.shape[0] * 0.6) + y_max = img.shape[0] + x_right_min = int(right_func(y_min)) + x_right_max = int(right_func(y_max)) + cv2.line(img, (x_right_min, y_min), (x_right_max, y_max), color, thickness) + + return img + +def hough_lines(img, rho, theta, threshold, min_line_len, max_line_gap): + """霍夫变换检测直线""" + lines = cv2.HoughLinesP(img, rho, theta, threshold, np.array([]), + minLineLength=min_line_len, maxLineGap=max_line_gap) + line_img = np.zeros((img.shape[0], img.shape[1], 3), dtype=np.uint8) + draw_lines(line_img, lines) + return line_img + +def weighted_img(img, initial_img, α=0.8, β=1., γ=0.): + """将检测到的车道线与原图像融合""" + return cv2.addWeighted(initial_img, α, img, β, γ) + +def lane_detection_pipeline(image): + """车道检测完整流水线""" + # 1. 预处理:灰度化 + 高斯模糊 + Canny边缘检测 + gray = grayscale(image) + blur_gray = gaussian_blur(gray, kernel_size=5) + edges = canny_edge_detection(blur_gray, low_threshold=50, high_threshold=150) + + # 2. 定义感兴趣区域(多边形顶点,根据图像尺寸调整) + imshape = image.shape + vertices = np.array([[(0, imshape[0]), + (imshape[1] / 2 - 20, imshape[0] / 2 + 60), + (imshape[1] / 2 + 20, imshape[0] / 2 + 60), + (imshape[1], imshape[0])]], dtype=np.int32) + masked_edges = region_of_interest(edges, vertices) + + # 3. 霍夫变换检测直线 + rho = 1 # 霍夫空间的rho步长 + theta = np.pi / 180 # 霍夫空间的theta步长 + threshold = 15 # 检测直线的阈值 + min_line_len = 40 # 直线的最小长度 + max_line_gap = 20 # 直线之间的最大间隙 + line_img = hough_lines(masked_edges, rho, theta, threshold, min_line_len, max_line_gap) + + # 4. 融合车道线与原图像 + result = weighted_img(line_img, image) + + return result, edges, masked_edges, line_img + +# -------------------------- 主程序 -------------------------- +if __name__ == "__main__": + # 读取测试图像(可替换为自己的道路图像,建议使用车载摄像头视角的道路图) + # 若没有测试图,可使用OpenCV生成模拟道路图像 + # 生成模拟道路图像 + def create_simulation_road_image(width=800, height=600): + """生成模拟的道路图像(包含左右车道线)""" + img = np.ones((height, width, 3), dtype=np.uint8) * 255 # 白色背景 + # 绘制道路(灰色) + cv2.rectangle(img, (100, 0), (700, height), (128, 128, 128), -1) + # 绘制左车道线(白色) + cv2.line(img, (250, height), (350, height//2), (255, 255, 255), 5) + # 绘制右车道线(白色) + cv2.line(img, (550, height), (450, height//2), (255, 255, 255), 5) + return img + + # 生成模拟道路图像 + road_img = create_simulation_road_image(width=800, height=600) + + # 执行车道检测 + result, edges, masked_edges, line_img = lane_detection_pipeline(road_img) + + # 显示结果 + plt.figure(figsize=(16, 12)) + plt.subplot(2, 2, 1) + plt.imshow(cv2.cvtColor(road_img, cv2.COLOR_BGR2RGB)) + plt.title('Original Road Image') + plt.axis('off') + + plt.subplot(2, 2, 2) + plt.imshow(edges, cmap='gray') + plt.title('Canny Edges') + plt.axis('off') + + plt.subplot(2, 2, 3) + plt.imshow(masked_edges, cmap='gray') + plt.title('Masked Edges (ROI)') + plt.axis('off') + + plt.subplot(2, 2, 4) + plt.imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) + plt.title('Lane Detection Result') + plt.axis('off') + + plt.tight_layout() + plt.show() + + # (可选)处理视频流(模拟车载摄像头实时检测) + # 若需要处理视频,可替换为视频路径或摄像头编号(0为默认摄像头) + # cap = cv2.VideoCapture('test_video.mp4') + # while cap.isOpened(): + # ret, frame = cap.read() + # if not ret: + # break + # result, _, _, _ = lane_detection_pipeline(frame) + # cv2.imshow('Lane Detection', result) + # if cv2.waitKey(1) & 0xFF == ord('q'): + # break + # cap.release() + # cv2.destroyAllWindows() \ No newline at end of file