commit ec1ee13db15c0f72115ca2fad2e9f13d14cacd68 Author: wzh <2965067137@qq.com> Date: Fri Sep 26 12:46:01 2025 +0800 添加最优路径算法实现 diff --git a/src/__pycache__/ese_msjs_algorithm.cpython-311.pyc b/src/__pycache__/ese_msjs_algorithm.cpython-311.pyc new file mode 100644 index 0000000..03d9dca Binary files /dev/null and b/src/__pycache__/ese_msjs_algorithm.cpython-311.pyc differ diff --git a/src/__pycache__/ese_msjs_algorithm.cpython-312.pyc b/src/__pycache__/ese_msjs_algorithm.cpython-312.pyc new file mode 100644 index 0000000..1a02e22 Binary files /dev/null and b/src/__pycache__/ese_msjs_algorithm.cpython-312.pyc differ diff --git a/src/__pycache__/simplified_uav_planning.cpython-311.pyc b/src/__pycache__/simplified_uav_planning.cpython-311.pyc new file mode 100644 index 0000000..687201e Binary files /dev/null and b/src/__pycache__/simplified_uav_planning.cpython-311.pyc differ diff --git a/src/__pycache__/simplified_uav_planning.cpython-312.pyc b/src/__pycache__/simplified_uav_planning.cpython-312.pyc new file mode 100644 index 0000000..2ff8bb6 Binary files /dev/null and b/src/__pycache__/simplified_uav_planning.cpython-312.pyc differ diff --git a/src/ese_msjs_algorithm.py b/src/ese_msjs_algorithm.py new file mode 100644 index 0000000..18326df --- /dev/null +++ b/src/ese_msjs_algorithm.py @@ -0,0 +1,383 @@ +import numpy as np +import random +from typing import Callable, List, Tuple, Dict, Any +from abc import ABC, abstractmethod +import copy + + +class ConstraintHandler: + """约束处理器类,实现动态α-level比较策略""" + + def __init__(self, epsilon: float = 8.0): + self.epsilon = epsilon + + def calculate_satisfaction_level(self, constraints: Dict[str, float]) -> float: + """计算约束满足度 μ(Pi)""" + if not constraints: + return 1.0 + + min_satisfaction = 1.0 + for constraint_name, violation in constraints.items(): + if violation <= 0: # 满足约束 + satisfaction = 1.0 + else: # 违反约束 + # 使用分段线性函数 + a_m = 1.0 # 可以根据具体问题调整 + satisfaction = max(0.0, 1.0 - violation / a_m) + + min_satisfaction = min(min_satisfaction, satisfaction) + + return min_satisfaction + + def dynamic_alpha(self, t: int, t_max: int) -> float: + """动态α值计算""" + if t < t_max / 2: + return 1.0 / (1.0 + np.exp(4.0 - self.epsilon * t / t_max)) + else: + return 1.0 + + def alpha_level_comparison(self, sol1: Tuple[float, float], sol2: Tuple[float, float], + alpha: float) -> bool: + """α-level比较策略 + 返回True如果sol1优于sol2 + sol1, sol2格式: (fitness, satisfaction_level) + """ + f1, mu1 = sol1 + f2, mu2 = sol2 + + if mu1 >= alpha and mu2 >= alpha: + return f1 <= f2 # 都可行,比较目标函数 + elif mu1 == mu2: + return f1 <= f2 # 满足度相同,比较目标函数 + else: + return mu1 >= mu2 # 否则比较满足度 + + +class ESE_MSJS: + """进化状态估计的多策略水母搜索算法""" + + def __init__(self, + pop_size: int = 30, + max_iter: int = 50, + dim: int = 10, + lb: float = 0.0, + ub: float = 300.0, + epsilon: float = 4.0, + p_neighborhood: int = 2, + transition_prob: float = 0.5): + """ + 初始化ESE-MSJS算法 + + Args: + pop_size: 种群大小 + max_iter: 最大迭代次数 + dim: 问题维度 + lb: 下界 + ub: 上界 + epsilon: 混沌映射参数 + p_neighborhood: 邻域半径 + transition_prob: 转换概率 + """ + self.pop_size = pop_size + self.max_iter = max_iter + self.dim = dim + self.lb = lb if isinstance(lb, (list, np.ndarray)) else [lb] * dim + self.ub = ub if isinstance(ub, (list, np.ndarray)) else [ub] * dim + self.epsilon = epsilon + self.p_neighborhood = p_neighborhood + self.transition_prob = transition_prob + + # 种群和相关属性 + self.population = None + self.fitness = None + self.constraints = None + self.satisfaction_levels = None + self.best_solution = None + self.best_fitness = float('inf') + self.best_constraints = None + + # 约束处理器 + self.constraint_handler = ConstraintHandler() + + # 历史记录 + self.fitness_history = [] + self.best_fitness_history = [] + + def logistic_chaotic_map(self, x: float) -> float: + """逻辑混沌映射""" + return self.epsilon * x * (1 - x) + + def initialize_population(self): + """使用混沌映射初始化种群""" + self.population = np.zeros((self.pop_size, self.dim)) + + # 生成混沌序列 + x0 = random.uniform(0, 1) # 避免不动点 + while x0 in [0, 0.25, 0.5, 0.75, 1.0]: + x0 = random.uniform(0, 1) + + for i in range(self.pop_size): + for j in range(self.dim): + x0 = self.logistic_chaotic_map(x0) + self.population[i, j] = self.lb[j] + x0 * (self.ub[j] - self.lb[j]) + + def calculate_evolutionary_factor(self, jellyfish_idx: int, all_distances: list[np.floating],d_max: np.floating,d_min: np.floating) -> float: + """计算进化因子""" + if d_max == d_min: + return 0.1 + eF = (all_distances[jellyfish_idx] - d_min) / (d_max - d_min) + return max(0.0, min(1.0, eF)) + + def determine_evolutionary_state(self, eF: float) -> str: + """确定进化状态""" + if 2 / 3 < eF <= 1: + return "exploration" + elif 1 / 3 < eF <= 2 / 3: + return "transition" + else: + return "exploitation" + + def get_neighborhood_indices(self, idx: int) -> List[int]: + """获取环形拓扑邻域索引""" + neighborhood = [] + for i in range(-self.p_neighborhood, self.p_neighborhood + 1): + neighbor_idx = (idx + i) % self.pop_size + neighborhood.append(neighbor_idx) + return neighborhood + + def neighborhood_elite_learning(self, idx: int, rank_mapping: np.ndarray) -> np.ndarray: + """基于邻域拓扑的精英示例学习策略""" + neighborhood_indices = self.get_neighborhood_indices(idx) + + # 使用全局排序信息直接找到邻域最优个体(性能优化) + # TODO:完善约束与目标函数后进一步检查这部分 + best_neighbor_idx = min(neighborhood_indices, key=lambda x: rank_mapping[x]) + + # 计算邻域均值 + neighborhood_mean = np.mean([self.population[i] for i in neighborhood_indices], axis=0) + + # 加权均值计算 + eta = round(random.random()) # 0或1 + chosen_idx = random.choice([i for i in neighborhood_indices if i != idx]) + weighted_mean = (neighborhood_mean + + (eta * self.population[idx] + (1 - eta) * self.population[chosen_idx]) ) / 2 + + # 更新位置 - 进一步增大探索步长 + r = random.uniform(0.3, 2.5) # 进一步增大步长范围 + tau = 0.1 # 进一步减小分布系数,大幅增加移动幅度 + new_position = (self.population[idx] + + r * (self.population[best_neighbor_idx] - tau * r * weighted_mean)) + + return new_position + + def adaptive_scaling_factor_strategy(self, idx: int, fitness_ranks: np.ndarray, + rank_mapping: np.ndarray, best_idx: int) -> np.ndarray: + """最佳信息引导的自适应缩放因子策略""" + # 找到引导个体 + guide_idx = (idx + random.randint(1, self.pop_size - 1)) % self.pop_size + + # 使用预计算的排名信息 + # TODO:完善约束与目标函数后进一步检查这部分 + rank_b = rank_mapping[guide_idx] # O(1) + rank_l = rank_mapping[idx] # O(1) + + # 计算缩放因子 - 进一步增大移动幅度 + if rank_b < rank_l: # guide_idx排名更好 + kappa = 1.2 * (1 + (self.pop_size - rank_b) / self.pop_size) # 进一步增大系数 + else: + kappa = -np.random.normal(1.2, 0.5) # 进一步增大随机移动幅度 + + # 更新位置 - 进一步增加探索步长 + step_size = random.uniform(0.5, 2.5) # 进一步增加步长变化范围 + new_position = (self.population[best_idx] + + step_size * kappa * (self.population[guide_idx] - self.population[idx])) + + return new_position + + def gaussian_barebone_mechanism(self, idx: int, fitness_ranks: np.ndarray) -> np.ndarray: + """高斯裸骨机制""" + # 使用预计算的排名信息 + # TODO:完善约束与目标函数后进一步检查这部分 + rank_l = np.where(fitness_ranks == idx)[0][0] # 这里仍需要查找,但没有重新排序 + + # 计算概率系数 + PC_l = np.exp(-(rank_l)) + + if random.random() < PC_l: + # 高斯函数更新 - 增大标准差 + best_idx = fitness_ranks[0] + mean = (self.population[best_idx] + self.population[idx]) / 2 + std = np.abs(self.population[best_idx] - self.population[idx]) + std = np.maximum(std, 0.2 * (np.array(self.ub) - np.array(self.lb))) # 进一步增大最小标准差 + new_position = np.random.normal(mean, std) + else: + # DE变异策略 - 进一步增大变异步长 + indices = set() + while len(indices) < 3: + rand_idx = random.randint(0, self.pop_size - 1) + if rand_idx != idx: + indices.add(rand_idx) + i1, i2, i3 = list(indices) + mutation_factor = random.uniform(0.8, 3.0) # 进一步增大变异因子 + new_position = (self.population[i1] + + mutation_factor * (self.population[i2] - self.population[i3])) + + return new_position + + def jellyfish_dist(self)->list[np.floating]: + + distances = [] + all_distances = [] + for i in range(self.pop_size): + for j in range(self.pop_size): + if i!=j: + dist = np.sqrt(np.sum((self.population[i] - self.population[j]) ** 2)) + distances.append(dist) + dist_i=np.mean(distances) + all_distances.append(dist_i) + return all_distances + + def update_jellyfish(self, idx: int, iteration: int, fitness_ranks: np.ndarray, + rank_mapping: np.ndarray, best_idx: int) -> np.ndarray: + """根据进化状态更新水母位置""" + all_distances = self.jellyfish_dist() + d_min = min(all_distances) + d_max = max(all_distances) + eF = self.calculate_evolutionary_factor(idx,all_distances,d_max,d_min) + state = self.determine_evolutionary_state(eF) + + if state == "exploration": + new_position = self.neighborhood_elite_learning(idx, rank_mapping) + elif state == "exploitation": + new_position = self.adaptive_scaling_factor_strategy(idx, fitness_ranks, rank_mapping, best_idx) + else: # transition + new_position = self.gaussian_barebone_mechanism(idx, fitness_ranks) + + # 改进的边界处理:反弹策略而非直接裁剪 + new_position = self.boundary_handling(new_position) + + return new_position + + def boundary_handling(self, position: np.ndarray) -> np.ndarray: + """改进的边界处理策略:反弹+随机扰动""" + new_position = position.copy() + + for i in range(len(position)): + lb_i = self.lb[i] + ub_i = self.ub[i] + + if position[i] < lb_i: + # 反弹策略:超出下界时反弹到合理范围内 + overshoot = abs(position[i] - lb_i) + range_size = ub_i - lb_i + new_position[i] = lb_i + overshoot % range_size + elif position[i] > ub_i: + # 反弹策略:超出上界时反弹到合理范围内 + overshoot = abs(position[i] - ub_i) + range_size = ub_i - lb_i + new_position[i] = ub_i - overshoot % range_size + + # 添加更大的随机扰动以配合增大的移动步长 + if np.random.random() < 0.25: # 增加扰动概率到25% + for i in range(len(new_position)): + range_size = self.ub[i] - self.lb[i] + perturbation = np.random.normal(0, 0.05) * range_size # 增大扰动幅度 + new_position[i] += perturbation + + # 确保扰动后仍在边界内 + new_position[i] = max(self.lb[i], min(self.ub[i], new_position[i])) + + return new_position + + def evaluate_population(self, objective_func: Callable, constraint_func: Callable = None): + """评估种群""" + self.fitness = np.zeros(self.pop_size) + self.constraints = [] + self.satisfaction_levels = np.zeros(self.pop_size) + + for i in range(self.pop_size): + # 计算目标函数值 + + self.fitness[i] = objective_func(self.population[i]) + + # 计算约束 + if constraint_func: + constraints = constraint_func(self.population[i]) + self.constraints.append(constraints) + self.satisfaction_levels[i] = self.constraint_handler.calculate_satisfaction_level(constraints) + else: + self.constraints.append({}) + self.satisfaction_levels[i] = 1.0 + + def update_best_solution(self, iteration: int): + """使用α-level比较更新最优解""" + alpha = self.constraint_handler.dynamic_alpha(iteration, self.max_iter) + + for i in range(self.pop_size): + current_sol = (self.fitness[i], self.satisfaction_levels[i]) + best_sol = (self.best_fitness, 1.0 if self.best_constraints is None + else self.constraint_handler.calculate_satisfaction_level(self.best_constraints)) + + if self.constraint_handler.alpha_level_comparison(current_sol, best_sol, alpha): + self.best_solution = self.population[i].copy() + self.best_fitness = self.fitness[i] + self.best_constraints = self.constraints[i].copy() if self.constraints[i] else None + + def optimize(self, objective_func: Callable, constraint_func: Callable = None) -> Tuple[np.ndarray, float]: + """主优化循环""" + # 初始化 + self.initialize_population() + self.evaluate_population(objective_func, constraint_func) + + # 初始化最优解 + best_idx = np.argmin(self.fitness) + self.best_solution = self.population[best_idx].copy() + self.best_fitness = self.fitness[best_idx] + self.best_constraints = self.constraints[best_idx].copy() if self.constraints else None + + # 主循环 + for iteration in range(self.max_iter): + new_population = np.zeros_like(self.population) + + # 预计算排序信息(性能优化) + fitness_ranks = np.argsort(self.fitness) + rank_mapping = np.empty(self.pop_size, dtype=int) + rank_mapping[fitness_ranks] = np.arange(self.pop_size) + best_idx = fitness_ranks[0] + + # 更新每个水母 + for i in range(self.pop_size): + new_position = self.update_jellyfish(i, iteration, fitness_ranks, rank_mapping, best_idx) + + # 评估新位置 + new_fitness = objective_func(new_position) + new_constraints = constraint_func(new_position) if constraint_func else {} + new_satisfaction = self.constraint_handler.calculate_satisfaction_level(new_constraints) + + # α-level比较决定是否接受新位置 + alpha = self.constraint_handler.dynamic_alpha(iteration, self.max_iter) + current_sol = (self.fitness[i], self.satisfaction_levels[i]) + new_sol = (new_fitness, new_satisfaction) + + if self.constraint_handler.alpha_level_comparison(new_sol, current_sol, alpha): + new_population[i] = new_position + self.fitness[i] = new_fitness + self.constraints[i] = new_constraints + self.satisfaction_levels[i] = new_satisfaction + else: + new_population[i] = self.population[i] + + self.population = new_population + + # 更新最优解 + self.update_best_solution(iteration) + + # 记录历史 + self.fitness_history.append(self.fitness.copy()) + self.best_fitness_history.append(self.best_fitness) + + # 打印进度(可选) + if iteration % 50 == 0: + print(f"Iteration {iteration}: Best fitness = {self.best_fitness:.6f}") + + return self.best_solution, self.best_fitness \ No newline at end of file diff --git a/src/main_simplified.py b/src/main_simplified.py new file mode 100644 index 0000000..da566db --- /dev/null +++ b/src/main_simplified.py @@ -0,0 +1,461 @@ + + + +import numpy as np +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import time +import os + +# 导入简化模块 +from ese_msjs_algorithm import ESE_MSJS +from simplified_uav_planning import ( + SimplifiedUAVConfig, SimplifiedUAVEnvironment, SimplifiedUAVPlanner, + TerrainData, NoFlyZone3D +) + + +def create_test_scenario(): + """创建测试场景""" + # 创建配置 + config = SimplifiedUAVConfig( + num_uavs=2, + num_waypoints=8, + x_min=0.0, x_max=1000.0, + y_min=0.0, y_max=800.0, + z_min=50.0, z_max=400.0, + max_flight_distance=2500.0, # 增加最大飞行距离限制 + ground_clearance=30.0, # 地面安全间隙 + uav_speed=25.0, # 飞行速度 + collision_safe_distance=60.0 # 无人机间安全距离 + ) + + # 创建环境 + environment = SimplifiedUAVEnvironment(config) + + # 设置地形数据 + terrain = TerrainData( + x_range=(config.x_min, config.x_max), + y_range=(config.y_min, config.y_max), + resolution=20.0 + ) + environment.set_terrain_data(terrain) + + # 添加三维禁飞区 - 调整位置使其更容易规避 + nfz_zones = [ + NoFlyZone3D( + center=(300, 300, 150), + size=(80, 80, 100), # 减小长方体禁飞区 + shape="box" + ), + NoFlyZone3D( + center=(600, 400, 200), + size=(60, 60, 60), # 减小球形禁飞区 + shape="sphere" + ) + ] + + for nfz in nfz_zones: + environment.add_no_fly_zone(nfz) + + # 设置起始点和目标点 + start_points = [ + (50, 100, 100), # UAV1起始点 + (50, 700, 100), # UAV2起始点 + ] + + target_points = [ + (950, 150, 100), # UAV1目标点 + (950, 650, 100), # UAV2目标点 + ] + + environment.set_start_target_points(start_points, target_points) + + return config, environment + + +def evaluate_solution_quality(paths, constraints, planner): + """评估解的质量""" + print(f"\n=== 解质量评估 ===") + + # 约束满足情况 + print(f"约束满足情况:") + feasible_constraints = 0 + total_constraints = len(constraints) + + for constraint_name, violation in constraints.items(): + status = "满足" if violation <= 1e-6 else f"违反 (程度: {violation:.3f})" + if violation <= 1e-6: + feasible_constraints += 1 + print(f" {constraint_name}: {status}") + + feasibility_rate = feasible_constraints / total_constraints if total_constraints > 0 else 1.0 + print(f"约束满足率: {feasible_constraints}/{total_constraints} ({feasibility_rate*100:.1f}%)") + + # 路径统计 + stats = planner.get_path_statistics(paths) + print(f"\n路径统计:") + print(f" 总飞行距离: {stats['total_distance']:.1f}m") + print(f" 平均飞行距离: {stats['avg_distance']:.1f}m") + print(f" 最大飞行距离: {stats['max_distance']:.1f}m") + print(f" 最小飞行距离: {stats['min_distance']:.1f}m") + print(f" 最大飞行时间: {stats['max_flight_time']:.1f}s") + + # 详细路径信息 + print(f"\n各UAV路径详情:") + for i, path in enumerate(paths): + distance = planner.calculate_path_distance(path) + flight_time = planner.calculate_flight_time(path) + direct_distance = np.linalg.norm(path[-1] - path[0]) + efficiency = direct_distance / distance if distance > 0 else 0 + + print(f" UAV{i+1}: 距离={distance:.1f}m, 时间={flight_time:.1f}s, " + f"直线距离={direct_distance:.1f}m, 效率={efficiency:.3f}") + + # 输出完整航线数组 + print(f"\n完整航线数组:") + for i, path in enumerate(paths): + print(f" UAV{i+1} 航线 (起点->航路点->终点):") + print(f" 起点: [{path[0, 0]:.1f}, {path[0, 1]:.1f}, {path[0, 2]:.1f}]") + + print(f" 航路点:") + for j in range(1, len(path) - 1): # 排除起点和终点 + print(f" 点{j}: [{path[j, 0]:.1f}, {path[j, 1]:.1f}, {path[j, 2]:.1f}]") + + print(f" 终点: [{path[-1, 0]:.1f}, {path[-1, 1]:.1f}, {path[-1, 2]:.1f}]") + + # 完整数组形式 + waypoints_array = [] + for j in range(len(path)): + waypoints_array.append([round(path[j, 0], 1), round(path[j, 1], 1), round(path[j, 2], 1)]) + print(f" 完整数组: {waypoints_array}") + print() + + return feasibility_rate, stats + + +def test_simplified_planning(): + """测试简化的路径规划""" + print("=== 简化版无人机路径规划测试 ===") + + try: + # 创建测试场景 + config, environment = create_test_scenario() + + # 创建规划器 + planner = SimplifiedUAVPlanner(config, environment) + + print(f"问题维度: {planner.total_dim}") + print(f"禁飞区数量: {len(environment.no_fly_zones)}") + print(f"最大飞行距离限制: {config.max_flight_distance}m") + print(f"碰撞安全距离: {config.collision_safe_distance}m") + + # 创建ESE-MSJS算法实例 + algorithm = ESE_MSJS( + pop_size=30, + max_iter=2000, + dim=planner.total_dim, + lb=planner.lb, + ub=planner.ub + ) + + # 运行路径规划 + print(f"\n开始路径规划...") + start_time = time.time() + + best_solution, best_fitness = algorithm.optimize( + planner.objective_function, + planner.constraint_function + ) + + end_time = time.time() + + # 解析结果 + best_paths = planner.decode_solution(best_solution) + final_constraints = planner.constraint_function(best_solution) + + print(f"\n路径规划完成!") + print(f"最优目标函数值(总飞行距离): {best_fitness:.2f}m") + print(f"运行时间: {end_time - start_time:.2f}秒") + + # 评估解质量 + feasibility_rate, stats = evaluate_solution_quality(best_paths, final_constraints, planner) + + return best_paths, algorithm.best_fitness_history, environment, feasibility_rate >= 1.0 + + except Exception as e: + print(f"路径规划过程中出现错误: {e}") + import traceback + traceback.print_exc() + return None, None, None, False + + +def visualize_3d_results(paths, environment, fitness_history=None, save_fig=True): + """三维可视化结果""" + if paths is None: + print("没有可视化的路径数据") + return + + try: + fig = plt.figure(figsize=(20, 6)) + + # 3D路径图 + ax1 = fig.add_subplot(131, projection='3d') + colors = ['red', 'blue', 'green', 'orange', 'purple'] + + for i, path in enumerate(paths): + color = colors[i % len(colors)] + ax1.plot(path[:, 0], path[:, 1], path[:, 2], + color=color, linewidth=3, marker='o', markersize=5, + label=f'UAV{i + 1}') + + # 起始点和目标点 + ax1.scatter(path[0, 0], path[0, 1], path[0, 2], + color=color, s=200, marker='^', + edgecolors='black', linewidth=2) + ax1.scatter(path[-1, 0], path[-1, 1], path[-1, 2], + color=color, s=200, marker='s', + edgecolors='black', linewidth=2) + + # 绘制地形数据 + if environment.terrain_data is not None: + terrain = environment.terrain_data + # 创建地形网格 + x_terrain = np.linspace(terrain.x_min, terrain.x_max, terrain.height_map.shape[1]) + y_terrain = np.linspace(terrain.y_min, terrain.y_max, terrain.height_map.shape[0]) + X_terrain, Y_terrain = np.meshgrid(x_terrain, y_terrain) + + # 绘制地形表面(使用半透明的棕色) + ax1.plot_surface(X_terrain, Y_terrain, terrain.height_map, + alpha=0.3, color='brown', linewidth=0, antialiased=True) + + # 绘制禁飞区 + for nfz in environment.no_fly_zones: + cx, cy, cz = nfz.center + if nfz.shape == "box": + w, l, h = nfz.size + # 绘制立方体框架 + from mpl_toolkits.mplot3d.art3d import Poly3DCollection + + # 定义立方体的8个顶点 + vertices = [ + [cx-w/2, cy-l/2, cz-h/2], [cx+w/2, cy-l/2, cz-h/2], + [cx+w/2, cy+l/2, cz-h/2], [cx-w/2, cy+l/2, cz-h/2], + [cx-w/2, cy-l/2, cz+h/2], [cx+w/2, cy-l/2, cz+h/2], + [cx+w/2, cy+l/2, cz+h/2], [cx-w/2, cy+l/2, cz+h/2] + ] + + # 定义立方体的6个面 + faces = [ + [vertices[0], vertices[1], vertices[2], vertices[3]], # 底面 + [vertices[4], vertices[5], vertices[6], vertices[7]], # 顶面 + [vertices[0], vertices[1], vertices[5], vertices[4]], # 前面 + [vertices[2], vertices[3], vertices[7], vertices[6]], # 后面 + [vertices[1], vertices[2], vertices[6], vertices[5]], # 右面 + [vertices[4], vertices[7], vertices[3], vertices[0]] # 左面 + ] + + ax1.add_collection3d(Poly3DCollection(faces, alpha=0.3, + facecolor='red', edgecolor='darkred')) + + elif nfz.shape == "sphere": + # 绘制球体 + u, v = np.mgrid[0:2*np.pi:20j, 0:np.pi:20j] + radius = nfz.size[0] + x = cx + radius * np.cos(u) * np.sin(v) + y = cy + radius * np.sin(u) * np.sin(v) + z = cz + radius * np.cos(v) + ax1.plot_surface(x, y, z, alpha=0.3, color='red') + + ax1.set_xlabel('X (m)') + ax1.set_ylabel('Y (m)') + ax1.set_zlabel('Z (m)') + ax1.set_title('3D Flight Paths with Terrain & No-Fly Zones') + ax1.legend() + + # 俯视图 + ax2 = fig.add_subplot(132) + + # 绘制地形等高线(俯视图) + if environment.terrain_data is not None: + terrain = environment.terrain_data + x_terrain = np.linspace(terrain.x_min, terrain.x_max, terrain.height_map.shape[1]) + y_terrain = np.linspace(terrain.y_min, terrain.y_max, terrain.height_map.shape[0]) + X_terrain, Y_terrain = np.meshgrid(x_terrain, y_terrain) + + # 绘制等高线 + contours = ax2.contour(X_terrain, Y_terrain, terrain.height_map, + levels=8, colors='gray', alpha=0.4, linewidths=0.8) + ax2.clabel(contours, inline=True, fontsize=8, fmt='%dm') + + for i, path in enumerate(paths): + color = colors[i % len(colors)] + ax2.plot(path[:, 0], path[:, 1], color=color, linewidth=2, + marker='o', markersize=4, label=f'UAV{i + 1}') + ax2.scatter(path[0, 0], path[0, 1], color=color, s=100, marker='^') + ax2.scatter(path[-1, 0], path[-1, 1], color=color, s=100, marker='s') + + # 绘制禁飞区投影 + for nfz in environment.no_fly_zones: + cx, cy, cz = nfz.center + if nfz.shape == "box": + w, l, h = nfz.size + rect = plt.Rectangle((cx-w/2, cy-l/2), w, l, + fill=False, color='red', linestyle='--', linewidth=2) + ax2.add_patch(rect) + elif nfz.shape == "sphere": + radius = nfz.size[0] + circle = plt.Circle((cx, cy), radius, + fill=False, color='red', linestyle='--', linewidth=2) + ax2.add_patch(circle) + + ax2.set_xlabel('X (m)') + ax2.set_ylabel('Y (m)') + ax2.set_title('Top View (with Terrain Contours)') + ax2.set_aspect('equal') + ax2.grid(True, alpha=0.3) + ax2.legend() + + # 收敛曲线 + if fitness_history: + ax3 = fig.add_subplot(133) + ax3.plot(fitness_history, 'b-', linewidth=2) + ax3.set_xlabel('Iteration') + ax3.set_ylabel('Best Fitness (Total Distance)') + ax3.set_title('Convergence Curve') + ax3.grid(True, alpha=0.3) + + plt.tight_layout() + + if save_fig: + os.makedirs('results', exist_ok=True) + filename = f'results/simplified_uav_planning_{time.strftime("%Y%m%d_%H%M%S")}.png' + plt.savefig(filename, dpi=300, bbox_inches='tight') + print(f"图表已保存为: {filename}") + + plt.show() + + except Exception as e: + print(f"可视化过程中出现错误: {e}") + + +def run_performance_test(num_runs=5): + """运行性能测试""" + print(f"=== 运行 {num_runs} 次性能测试 ===") + + results = [] + for run in range(num_runs): + print(f"\n--- 第 {run + 1} 次运行 ---") + np.random.seed(42 + run) + + paths, fitness_history, environment, success = test_simplified_planning() + + if success and paths is not None: + # 创建简化规划器来计算统计信息 + config, _ = create_test_scenario() + planner = SimplifiedUAVPlanner(config, environment) + stats = planner.get_path_statistics(paths) + + results.append({ + 'total_distance': stats['total_distance'], + 'max_flight_time': stats['max_flight_time'], + 'success': True + }) + else: + results.append({ + 'total_distance': float('inf'), + 'max_flight_time': float('inf'), + 'success': False + }) + + # 统计结果 + successful_runs = [r for r in results if r['success']] + success_rate = len(successful_runs) / num_runs + + print(f"\n=== 性能测试结果统计 ===") + print(f"成功率: {len(successful_runs)}/{num_runs} ({success_rate*100:.1f}%)") + + if successful_runs: + distances = [r['total_distance'] for r in successful_runs] + times = [r['max_flight_time'] for r in successful_runs] + + print(f"总飞行距离统计:") + print(f" 最优: {min(distances):.2f}m") + print(f" 平均: {np.mean(distances):.2f}m") + print(f" 标准差: {np.std(distances):.2f}m") + + print(f"最大飞行时间统计:") + print(f" 最短: {min(times):.2f}s") + print(f" 平均: {np.mean(times):.2f}s") + print(f" 标准差: {np.std(times):.2f}s") + + # 可视化最好的结果 + best_idx = np.argmin(distances) + print(f"\n可视化第 {best_idx + 1} 次运行的最优结果") + + # 重新运行最优设置 + np.random.seed(42 + best_idx) + paths, fitness_history, environment, _ = test_simplified_planning() + if paths is not None: + visualize_3d_results(paths, environment, fitness_history) + + +def main(): + """主函数""" + print("简化版ESE-MSJS多无人机路径规划") + print("=" * 50) + + try: + # 随机种子选择 + print("选择随机种子模式:") + print("1. 固定种子 (seed=42, 结果可重复)") + print("2. 随机种子 (每次不同结果)") + + try: + seed_choice = input("请输入选择 (1 或 2,默认为 1): ").strip() + except EOFError: + seed_choice = "1" # 默认选择 + + if seed_choice == "2": + import time + seed = int(time.time() * 1000) % 100000 + np.random.seed(seed) + print(f"使用随机种子: {seed}") + else: + np.random.seed(42) + print(f"使用固定种子: 42") + + print("\n选择运行模式:") + print("1. 单次测试并可视化") + print("2. 性能测试(多次运行)") + + try: + choice = input("请输入选择 (1 或 2,默认为 1): ").strip() + except EOFError: + choice = "1" # 默认选择 + + if choice == "2": + run_performance_test(5) + else: + # 单次测试 + paths, fitness_history, environment, success = test_simplified_planning() + + if success and paths is not None: + print(f"\n开始可视化结果...") + visualize_3d_results(paths, environment, fitness_history) + print(f"\n测试完成!") + else: + print("测试失败,请检查错误信息") + print(f"\n开始可视化结果...") + visualize_3d_results(paths, environment, fitness_history) + + except KeyboardInterrupt: + print("\n用户中断程序") + except Exception as e: + print(f"程序运行出错: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/simplified_uav_planning.py b/src/simplified_uav_planning.py new file mode 100644 index 0000000..7f7ef1a --- /dev/null +++ b/src/simplified_uav_planning.py @@ -0,0 +1,400 @@ +import numpy as np +from typing import Dict, List, Tuple, Optional +from dataclasses import dataclass +import math + + +@dataclass +class SimplifiedUAVConfig: + """简化的无人机路径配置参数""" + num_uavs: int = 3 # 无人机数量 + num_waypoints: int = 10 # 航路点数量 + + # 环境边界 + x_min: float = 0.0 + x_max: float = 1000.0 + y_min: float = 0.0 + y_max: float = 1000.0 + z_min: float = 50.0 + z_max: float = 500.0 + + # 约束参数 + max_flight_distance: float = 1200.0 # 最大飞行距离(米) + ground_clearance: float = 30.0 # 地面安全间隙(米) + uav_speed: float = 30.0 # 无人机速度(m/s) + collision_safe_distance: float = 50.0 # 无人机间安全距离(米) + + +@dataclass +class TerrainData: + """地形数据""" + def __init__(self, x_range: Tuple[float, float], y_range: Tuple[float, float], + resolution: float = 10.0): + self.x_min, self.x_max = x_range + self.y_min, self.y_max = y_range + self.resolution = resolution + + # 创建简单的地形高度图(可以从实际DEM数据加载) + x_size = int((self.x_max - self.x_min) / resolution) + 1 + y_size = int((self.y_max - self.y_min) / resolution) + 1 + + # 简单的随机地形生成(实际应用中应从DEM文件读取) + np.random.seed(42) # 确保可重复 + self.height_map = np.random.uniform(0, 50, (y_size, x_size)) + + def get_height(self, x: float, y: float) -> float: + """获取指定坐标的地面高度""" + # 将坐标转换为网格索引 + i = int((y - self.y_min) / self.resolution) + j = int((x - self.x_min) / self.resolution) + + # 边界检查 + i = max(0, min(i, self.height_map.shape[0] - 1)) + j = max(0, min(j, self.height_map.shape[1] - 1)) + + return self.height_map[i, j] + + +@dataclass +class NoFlyZone3D: + """三维禁飞区定义""" + center: Tuple[float, float, float] # (x, y, z) 中心点 + size: Tuple[float, float, float] # (width, length, height) 尺寸 + shape: str = "box" # 形状:'box' 或 'sphere' + + def is_inside(self, point: np.ndarray) -> bool: + """判断点是否在禁飞区内""" + x, y, z = point + cx, cy, cz = self.center + + if self.shape == "box": + w, l, h = self.size + return (abs(x - cx) <= w/2 and + abs(y - cy) <= l/2 and + abs(z - cz) <= h/2) + elif self.shape == "sphere": + radius = self.size[0] # 使用第一个尺寸作为半径 + distance = np.sqrt((x-cx)**2 + (y-cy)**2 + (z-cz)**2) + return distance <= radius + + return False + + def distance_to_boundary(self, point: np.ndarray) -> float: + """计算点到禁飞区边界的距离(内部为负值)""" + x, y, z = point + cx, cy, cz = self.center + + if self.shape == "box": + w, l, h = self.size + # 计算到各个面的距离 + dx = max(0, abs(x - cx) - w/2) + dy = max(0, abs(y - cy) - l/2) + dz = max(0, abs(z - cz) - h/2) + + if self.is_inside(point): + # 内部点:返回到最近边界的负距离 + inner_dx = w/2 - abs(x - cx) + inner_dy = l/2 - abs(y - cy) + inner_dz = h/2 - abs(z - cz) + return -min(inner_dx, inner_dy, inner_dz) + else: + # 外部点:返回正距离 + return np.sqrt(dx**2 + dy**2 + dz**2) + + elif self.shape == "sphere": + radius = self.size[0] + distance = np.sqrt((x-cx)**2 + (y-cy)**2 + (z-cz)**2) + return distance - radius + + return 0.0 + + +class SimplifiedUAVEnvironment: + """简化的无人机环境""" + + def __init__(self, config: SimplifiedUAVConfig): + self.config = config + self.terrain_data: Optional[TerrainData] = None + self.no_fly_zones: List[NoFlyZone3D] = [] + + # 起始点和目标点 + self.start_points: List[Tuple[float, float, float]] = [] + self.target_points: List[Tuple[float, float, float]] = [] + + def set_terrain_data(self, terrain_data: TerrainData): + """设置地形数据""" + self.terrain_data = terrain_data + + def add_no_fly_zone(self, nfz: NoFlyZone3D): + """添加禁飞区""" + self.no_fly_zones.append(nfz) + + def set_start_target_points(self, starts: List[Tuple[float, float, float]], + targets: List[Tuple[float, float, float]]): + """设置起始点和目标点""" + self.start_points = starts + self.target_points = targets + + +class SimplifiedUAVPlanner: + """简化的无人机路径规划器""" + + def __init__(self, config: SimplifiedUAVConfig, environment: SimplifiedUAVEnvironment): + self.config = config + self.environment = environment + + # 计算编码维度 + self.dim_per_uav = config.num_waypoints * 3 + self.total_dim = self.dim_per_uav * config.num_uavs + + # 边界约束 + self.lb = [] + self.ub = [] + for _ in range(config.num_uavs): + for _ in range(config.num_waypoints): + self.lb.extend([config.x_min, config.y_min, config.z_min]) + self.ub.extend([config.x_max, config.y_max, config.z_max]) + + def decode_solution(self, solution: np.ndarray) -> List[np.ndarray]: + """解码解决方案为UAV路径""" + paths = [] + for i in range(self.config.num_uavs): + start_idx = i * self.dim_per_uav + end_idx = (i + 1) * self.dim_per_uav + uav_genes = solution[start_idx:end_idx] + + # 重构为(num_waypoints, 3)的路径 + path = uav_genes.reshape(self.config.num_waypoints, 3) + + # 添加起始点和目标点 + start_point = np.array(self.environment.start_points[i]) + target_point = np.array(self.environment.target_points[i]) + + full_path = np.vstack([start_point, path, target_point]) + paths.append(full_path) + + return paths + + def objective_function(self, solution: np.ndarray) -> float: + """目标函数:最小化总飞行距离""" + paths = self.decode_solution(solution) + total_distance = 0.0 + + for path in paths: + path_distance = self.calculate_path_distance(path) + total_distance += path_distance + + return total_distance + + def calculate_path_distance(self, path: np.ndarray) -> float: + """计算路径总距离""" + total_distance = 0.0 + for i in range(len(path) - 1): + segment_distance = np.linalg.norm(path[i + 1] - path[i]) + total_distance += segment_distance + return total_distance + + def constraint_function(self, solution: np.ndarray) -> Dict[str, float]: + """约束函数:返回四个约束的违反值""" + paths = self.decode_solution(solution) + constraints = {} + + # 1. 地形约束 + terrain_violations = [] + for path in paths: + violation = self.check_terrain_constraint(path) + terrain_violations.append(violation) + constraints['terrain'] = max(terrain_violations) + + # 2. 禁飞区约束 + nfz_violations = [] + for path in paths: + violation = self.check_no_fly_zone_constraint(path) + nfz_violations.append(violation) + constraints['no_fly_zone'] = max(nfz_violations) + + # 3. 路径长度约束 + distance_violations = [] + for path in paths: + violation = self.check_distance_constraint(path) + distance_violations.append(violation) + constraints['max_distance'] = max(distance_violations) + + # 4. 碰撞约束(新增) + constraints['collision'] = self.check_collision_constraint(paths) + + return constraints + + def check_terrain_constraint(self, path: np.ndarray) -> float: + """检查地形约束 + 返回值:0表示满足约束,>0表示约束违反程度 + """ + max_violation = 0.0 + + if self.environment.terrain_data is None: + # 如果没有地形数据,只检查最低高度 + for point in path: + violation = max(0, self.config.z_min - point[2]) + max_violation = max(max_violation, violation) + return max_violation + + # 检查每个路径点的地形约束 + for point in path: + x, y, z = point + ground_height = self.environment.terrain_data.get_height(x, y) + required_height = ground_height + self.config.ground_clearance + + # 违反约束:实际高度低于要求高度 + violation = max(0, required_height - z) + max_violation = max(max_violation, violation) + + # 还需要检查路径段是否与地形相交 + for i in range(len(path) - 1): + segment_violation = self.check_segment_terrain_collision(path[i], path[i + 1]) + max_violation = max(max_violation, segment_violation) + + return max_violation + + def check_segment_terrain_collision(self, start_point: np.ndarray, + end_point: np.ndarray, num_samples: int = 10) -> float: + """检查路径段是否与地形相撞""" + if self.environment.terrain_data is None: + return 0.0 + + max_violation = 0.0 + + # 在路径段上采样检查 + for i in range(num_samples): + t = i / (num_samples - 1) if num_samples > 1 else 0 + sample_point = start_point + t * (end_point - start_point) + + x, y, z = sample_point + ground_height = self.environment.terrain_data.get_height(x, y) + required_height = ground_height + self.config.ground_clearance + + violation = max(0, required_height - z) + max_violation = max(max_violation, violation) + + return max_violation + + def check_no_fly_zone_constraint(self, path: np.ndarray) -> float: + """检查禁飞区约束""" + max_violation = 0.0 + + # 检查每个路径点 + for point in path: + for nfz in self.environment.no_fly_zones: + if nfz.is_inside(point): + # 点在禁飞区内,违反约束 + distance_to_boundary = abs(nfz.distance_to_boundary(point)) + max_violation = max(max_violation, distance_to_boundary) + + # 检查路径段是否穿过禁飞区 + for i in range(len(path) - 1): + segment_violation = self.check_segment_nfz_collision(path[i], path[i + 1]) + max_violation = max(max_violation, segment_violation) + + return max_violation + + def check_segment_nfz_collision(self, start_point: np.ndarray, + end_point: np.ndarray, num_samples: int = 20) -> float: + """检查路径段是否穿过禁飞区""" + max_violation = 0.0 + + # 在路径段上采样检查 + for i in range(num_samples): + t = i / (num_samples - 1) if num_samples > 1 else 0 + sample_point = start_point + t * (end_point - start_point) + + for nfz in self.environment.no_fly_zones: + if nfz.is_inside(sample_point): + distance_to_boundary = abs(nfz.distance_to_boundary(sample_point)) + max_violation = max(max_violation, distance_to_boundary) + + return max_violation + + def check_distance_constraint(self, path: np.ndarray) -> float: + """检查路径长度约束""" + path_distance = self.calculate_path_distance(path) + + # 违反约束:实际距离超过最大允许距离 + violation = max(0, path_distance - self.config.max_flight_distance) + return violation + + def check_collision_constraint(self, paths: List[np.ndarray]) -> float: + """检查无人机间碰撞约束""" + max_violation = 0.0 + + if len(paths) < 2: + return 0.0 # 只有一架UAV,无碰撞风险 + + # 检查每对无人机 + for i in range(len(paths)): + for j in range(i + 1, len(paths)): + violation = self.check_pairwise_collision(paths[i], paths[j]) + max_violation = max(max_violation, violation) + + return max_violation + + def check_pairwise_collision(self, path1: np.ndarray, path2: np.ndarray) -> float: + """检查两架无人机之间的碰撞约束""" + max_violation = 0.0 + + # 假设两架UAV同步飞行(相同的路径点数) + min_points = min(len(path1), len(path2)) + + # 检查每个对应的路径点 + for i in range(min_points): + distance = np.linalg.norm(path1[i] - path2[i]) + violation = max(0, self.config.collision_safe_distance - distance) + max_violation = max(max_violation, violation) + + # 检查路径段之间的最小距离 + for i in range(min_points - 1): + # 检查线段之间的最小距离 + seg_violation = self.check_segment_collision( + path1[i], path1[i+1], path2[i], path2[i+1] + ) + max_violation = max(max_violation, seg_violation) + + return max_violation + + def check_segment_collision(self, p1_start: np.ndarray, p1_end: np.ndarray, + p2_start: np.ndarray, p2_end: np.ndarray, + num_samples: int = 10) -> float: + """检查两个路径段之间的最小距离""" + max_violation = 0.0 + + # 在两个路径段上采样点 + for i in range(num_samples): + t = i / (num_samples - 1) if num_samples > 1 else 0 + + # 在第一条路径段上的采样点 + sample1 = p1_start + t * (p1_end - p1_start) + + # 在第二条路径段上的采样点 + sample2 = p2_start + t * (p2_end - p2_start) + + # 计算距离 + distance = np.linalg.norm(sample1 - sample2) + violation = max(0, self.config.collision_safe_distance - distance) + max_violation = max(max_violation, violation) + + return max_violation + + def calculate_flight_time(self, path: np.ndarray) -> float: + """计算飞行时间""" + path_distance = self.calculate_path_distance(path) + return path_distance / self.config.uav_speed + + def get_path_statistics(self, paths: List[np.ndarray]) -> Dict: + """获取路径统计信息""" + stats = { + 'total_distance': sum(self.calculate_path_distance(path) for path in paths), + 'max_distance': max(self.calculate_path_distance(path) for path in paths), + 'min_distance': min(self.calculate_path_distance(path) for path in paths), + 'avg_distance': sum(self.calculate_path_distance(path) for path in paths) / len(paths), + 'flight_times': [self.calculate_flight_time(path) for path in paths], + 'max_flight_time': max(self.calculate_flight_time(path) for path in paths), + } + return stats \ No newline at end of file