添加最优路径算法实现

This commit is contained in:
wzh 2025-09-26 12:46:01 +08:00
commit ec1ee13db1
7 changed files with 1244 additions and 0 deletions

Binary file not shown.

Binary file not shown.

383
src/ese_msjs_algorithm.py Normal file
View File

@ -0,0 +1,383 @@
import numpy as np
import random
from typing import Callable, List, Tuple, Dict, Any
from abc import ABC, abstractmethod
import copy
class ConstraintHandler:
"""约束处理器类,实现动态α-level比较策略"""
def __init__(self, epsilon: float = 8.0):
self.epsilon = epsilon
def calculate_satisfaction_level(self, constraints: Dict[str, float]) -> float:
"""计算约束满足度 μ(Pi)"""
if not constraints:
return 1.0
min_satisfaction = 1.0
for constraint_name, violation in constraints.items():
if violation <= 0: # 满足约束
satisfaction = 1.0
else: # 违反约束
# 使用分段线性函数
a_m = 1.0 # 可以根据具体问题调整
satisfaction = max(0.0, 1.0 - violation / a_m)
min_satisfaction = min(min_satisfaction, satisfaction)
return min_satisfaction
def dynamic_alpha(self, t: int, t_max: int) -> float:
"""动态α值计算"""
if t < t_max / 2:
return 1.0 / (1.0 + np.exp(4.0 - self.epsilon * t / t_max))
else:
return 1.0
def alpha_level_comparison(self, sol1: Tuple[float, float], sol2: Tuple[float, float],
alpha: float) -> bool:
"""α-level比较策略
返回True如果sol1优于sol2
sol1, sol2格式: (fitness, satisfaction_level)
"""
f1, mu1 = sol1
f2, mu2 = sol2
if mu1 >= alpha and mu2 >= alpha:
return f1 <= f2 # 都可行,比较目标函数
elif mu1 == mu2:
return f1 <= f2 # 满足度相同,比较目标函数
else:
return mu1 >= mu2 # 否则比较满足度
class ESE_MSJS:
"""进化状态估计的多策略水母搜索算法"""
def __init__(self,
pop_size: int = 30,
max_iter: int = 50,
dim: int = 10,
lb: float = 0.0,
ub: float = 300.0,
epsilon: float = 4.0,
p_neighborhood: int = 2,
transition_prob: float = 0.5):
"""
初始化ESE-MSJS算法
Args:
pop_size: 种群大小
max_iter: 最大迭代次数
dim: 问题维度
lb: 下界
ub: 上界
epsilon: 混沌映射参数
p_neighborhood: 邻域半径
transition_prob: 转换概率
"""
self.pop_size = pop_size
self.max_iter = max_iter
self.dim = dim
self.lb = lb if isinstance(lb, (list, np.ndarray)) else [lb] * dim
self.ub = ub if isinstance(ub, (list, np.ndarray)) else [ub] * dim
self.epsilon = epsilon
self.p_neighborhood = p_neighborhood
self.transition_prob = transition_prob
# 种群和相关属性
self.population = None
self.fitness = None
self.constraints = None
self.satisfaction_levels = None
self.best_solution = None
self.best_fitness = float('inf')
self.best_constraints = None
# 约束处理器
self.constraint_handler = ConstraintHandler()
# 历史记录
self.fitness_history = []
self.best_fitness_history = []
def logistic_chaotic_map(self, x: float) -> float:
"""逻辑混沌映射"""
return self.epsilon * x * (1 - x)
def initialize_population(self):
"""使用混沌映射初始化种群"""
self.population = np.zeros((self.pop_size, self.dim))
# 生成混沌序列
x0 = random.uniform(0, 1) # 避免不动点
while x0 in [0, 0.25, 0.5, 0.75, 1.0]:
x0 = random.uniform(0, 1)
for i in range(self.pop_size):
for j in range(self.dim):
x0 = self.logistic_chaotic_map(x0)
self.population[i, j] = self.lb[j] + x0 * (self.ub[j] - self.lb[j])
def calculate_evolutionary_factor(self, jellyfish_idx: int, all_distances: list[np.floating],d_max: np.floating,d_min: np.floating) -> float:
"""计算进化因子"""
if d_max == d_min:
return 0.1
eF = (all_distances[jellyfish_idx] - d_min) / (d_max - d_min)
return max(0.0, min(1.0, eF))
def determine_evolutionary_state(self, eF: float) -> str:
"""确定进化状态"""
if 2 / 3 < eF <= 1:
return "exploration"
elif 1 / 3 < eF <= 2 / 3:
return "transition"
else:
return "exploitation"
def get_neighborhood_indices(self, idx: int) -> List[int]:
"""获取环形拓扑邻域索引"""
neighborhood = []
for i in range(-self.p_neighborhood, self.p_neighborhood + 1):
neighbor_idx = (idx + i) % self.pop_size
neighborhood.append(neighbor_idx)
return neighborhood
def neighborhood_elite_learning(self, idx: int, rank_mapping: np.ndarray) -> np.ndarray:
"""基于邻域拓扑的精英示例学习策略"""
neighborhood_indices = self.get_neighborhood_indices(idx)
# 使用全局排序信息直接找到邻域最优个体(性能优化)
# TODO完善约束与目标函数后进一步检查这部分
best_neighbor_idx = min(neighborhood_indices, key=lambda x: rank_mapping[x])
# 计算邻域均值
neighborhood_mean = np.mean([self.population[i] for i in neighborhood_indices], axis=0)
# 加权均值计算
eta = round(random.random()) # 0或1
chosen_idx = random.choice([i for i in neighborhood_indices if i != idx])
weighted_mean = (neighborhood_mean +
(eta * self.population[idx] + (1 - eta) * self.population[chosen_idx]) ) / 2
# 更新位置 - 进一步增大探索步长
r = random.uniform(0.3, 2.5) # 进一步增大步长范围
tau = 0.1 # 进一步减小分布系数,大幅增加移动幅度
new_position = (self.population[idx] +
r * (self.population[best_neighbor_idx] - tau * r * weighted_mean))
return new_position
def adaptive_scaling_factor_strategy(self, idx: int, fitness_ranks: np.ndarray,
rank_mapping: np.ndarray, best_idx: int) -> np.ndarray:
"""最佳信息引导的自适应缩放因子策略"""
# 找到引导个体
guide_idx = (idx + random.randint(1, self.pop_size - 1)) % self.pop_size
# 使用预计算的排名信息
# TODO完善约束与目标函数后进一步检查这部分
rank_b = rank_mapping[guide_idx] # O(1)
rank_l = rank_mapping[idx] # O(1)
# 计算缩放因子 - 进一步增大移动幅度
if rank_b < rank_l: # guide_idx排名更好
kappa = 1.2 * (1 + (self.pop_size - rank_b) / self.pop_size) # 进一步增大系数
else:
kappa = -np.random.normal(1.2, 0.5) # 进一步增大随机移动幅度
# 更新位置 - 进一步增加探索步长
step_size = random.uniform(0.5, 2.5) # 进一步增加步长变化范围
new_position = (self.population[best_idx] +
step_size * kappa * (self.population[guide_idx] - self.population[idx]))
return new_position
def gaussian_barebone_mechanism(self, idx: int, fitness_ranks: np.ndarray) -> np.ndarray:
"""高斯裸骨机制"""
# 使用预计算的排名信息
# TODO完善约束与目标函数后进一步检查这部分
rank_l = np.where(fitness_ranks == idx)[0][0] # 这里仍需要查找,但没有重新排序
# 计算概率系数
PC_l = np.exp(-(rank_l))
if random.random() < PC_l:
# 高斯函数更新 - 增大标准差
best_idx = fitness_ranks[0]
mean = (self.population[best_idx] + self.population[idx]) / 2
std = np.abs(self.population[best_idx] - self.population[idx])
std = np.maximum(std, 0.2 * (np.array(self.ub) - np.array(self.lb))) # 进一步增大最小标准差
new_position = np.random.normal(mean, std)
else:
# DE变异策略 - 进一步增大变异步长
indices = set()
while len(indices) < 3:
rand_idx = random.randint(0, self.pop_size - 1)
if rand_idx != idx:
indices.add(rand_idx)
i1, i2, i3 = list(indices)
mutation_factor = random.uniform(0.8, 3.0) # 进一步增大变异因子
new_position = (self.population[i1] +
mutation_factor * (self.population[i2] - self.population[i3]))
return new_position
def jellyfish_dist(self)->list[np.floating]:
distances = []
all_distances = []
for i in range(self.pop_size):
for j in range(self.pop_size):
if i!=j:
dist = np.sqrt(np.sum((self.population[i] - self.population[j]) ** 2))
distances.append(dist)
dist_i=np.mean(distances)
all_distances.append(dist_i)
return all_distances
def update_jellyfish(self, idx: int, iteration: int, fitness_ranks: np.ndarray,
rank_mapping: np.ndarray, best_idx: int) -> np.ndarray:
"""根据进化状态更新水母位置"""
all_distances = self.jellyfish_dist()
d_min = min(all_distances)
d_max = max(all_distances)
eF = self.calculate_evolutionary_factor(idx,all_distances,d_max,d_min)
state = self.determine_evolutionary_state(eF)
if state == "exploration":
new_position = self.neighborhood_elite_learning(idx, rank_mapping)
elif state == "exploitation":
new_position = self.adaptive_scaling_factor_strategy(idx, fitness_ranks, rank_mapping, best_idx)
else: # transition
new_position = self.gaussian_barebone_mechanism(idx, fitness_ranks)
# 改进的边界处理:反弹策略而非直接裁剪
new_position = self.boundary_handling(new_position)
return new_position
def boundary_handling(self, position: np.ndarray) -> np.ndarray:
"""改进的边界处理策略:反弹+随机扰动"""
new_position = position.copy()
for i in range(len(position)):
lb_i = self.lb[i]
ub_i = self.ub[i]
if position[i] < lb_i:
# 反弹策略:超出下界时反弹到合理范围内
overshoot = abs(position[i] - lb_i)
range_size = ub_i - lb_i
new_position[i] = lb_i + overshoot % range_size
elif position[i] > ub_i:
# 反弹策略:超出上界时反弹到合理范围内
overshoot = abs(position[i] - ub_i)
range_size = ub_i - lb_i
new_position[i] = ub_i - overshoot % range_size
# 添加更大的随机扰动以配合增大的移动步长
if np.random.random() < 0.25: # 增加扰动概率到25%
for i in range(len(new_position)):
range_size = self.ub[i] - self.lb[i]
perturbation = np.random.normal(0, 0.05) * range_size # 增大扰动幅度
new_position[i] += perturbation
# 确保扰动后仍在边界内
new_position[i] = max(self.lb[i], min(self.ub[i], new_position[i]))
return new_position
def evaluate_population(self, objective_func: Callable, constraint_func: Callable = None):
"""评估种群"""
self.fitness = np.zeros(self.pop_size)
self.constraints = []
self.satisfaction_levels = np.zeros(self.pop_size)
for i in range(self.pop_size):
# 计算目标函数值
self.fitness[i] = objective_func(self.population[i])
# 计算约束
if constraint_func:
constraints = constraint_func(self.population[i])
self.constraints.append(constraints)
self.satisfaction_levels[i] = self.constraint_handler.calculate_satisfaction_level(constraints)
else:
self.constraints.append({})
self.satisfaction_levels[i] = 1.0
def update_best_solution(self, iteration: int):
"""使用α-level比较更新最优解"""
alpha = self.constraint_handler.dynamic_alpha(iteration, self.max_iter)
for i in range(self.pop_size):
current_sol = (self.fitness[i], self.satisfaction_levels[i])
best_sol = (self.best_fitness, 1.0 if self.best_constraints is None
else self.constraint_handler.calculate_satisfaction_level(self.best_constraints))
if self.constraint_handler.alpha_level_comparison(current_sol, best_sol, alpha):
self.best_solution = self.population[i].copy()
self.best_fitness = self.fitness[i]
self.best_constraints = self.constraints[i].copy() if self.constraints[i] else None
def optimize(self, objective_func: Callable, constraint_func: Callable = None) -> Tuple[np.ndarray, float]:
"""主优化循环"""
# 初始化
self.initialize_population()
self.evaluate_population(objective_func, constraint_func)
# 初始化最优解
best_idx = np.argmin(self.fitness)
self.best_solution = self.population[best_idx].copy()
self.best_fitness = self.fitness[best_idx]
self.best_constraints = self.constraints[best_idx].copy() if self.constraints else None
# 主循环
for iteration in range(self.max_iter):
new_population = np.zeros_like(self.population)
# 预计算排序信息(性能优化)
fitness_ranks = np.argsort(self.fitness)
rank_mapping = np.empty(self.pop_size, dtype=int)
rank_mapping[fitness_ranks] = np.arange(self.pop_size)
best_idx = fitness_ranks[0]
# 更新每个水母
for i in range(self.pop_size):
new_position = self.update_jellyfish(i, iteration, fitness_ranks, rank_mapping, best_idx)
# 评估新位置
new_fitness = objective_func(new_position)
new_constraints = constraint_func(new_position) if constraint_func else {}
new_satisfaction = self.constraint_handler.calculate_satisfaction_level(new_constraints)
# α-level比较决定是否接受新位置
alpha = self.constraint_handler.dynamic_alpha(iteration, self.max_iter)
current_sol = (self.fitness[i], self.satisfaction_levels[i])
new_sol = (new_fitness, new_satisfaction)
if self.constraint_handler.alpha_level_comparison(new_sol, current_sol, alpha):
new_population[i] = new_position
self.fitness[i] = new_fitness
self.constraints[i] = new_constraints
self.satisfaction_levels[i] = new_satisfaction
else:
new_population[i] = self.population[i]
self.population = new_population
# 更新最优解
self.update_best_solution(iteration)
# 记录历史
self.fitness_history.append(self.fitness.copy())
self.best_fitness_history.append(self.best_fitness)
# 打印进度(可选)
if iteration % 50 == 0:
print(f"Iteration {iteration}: Best fitness = {self.best_fitness:.6f}")
return self.best_solution, self.best_fitness

461
src/main_simplified.py Normal file
View File

@ -0,0 +1,461 @@
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time
import os
# 导入简化模块
from ese_msjs_algorithm import ESE_MSJS
from simplified_uav_planning import (
SimplifiedUAVConfig, SimplifiedUAVEnvironment, SimplifiedUAVPlanner,
TerrainData, NoFlyZone3D
)
def create_test_scenario():
"""创建测试场景"""
# 创建配置
config = SimplifiedUAVConfig(
num_uavs=2,
num_waypoints=8,
x_min=0.0, x_max=1000.0,
y_min=0.0, y_max=800.0,
z_min=50.0, z_max=400.0,
max_flight_distance=2500.0, # 增加最大飞行距离限制
ground_clearance=30.0, # 地面安全间隙
uav_speed=25.0, # 飞行速度
collision_safe_distance=60.0 # 无人机间安全距离
)
# 创建环境
environment = SimplifiedUAVEnvironment(config)
# 设置地形数据
terrain = TerrainData(
x_range=(config.x_min, config.x_max),
y_range=(config.y_min, config.y_max),
resolution=20.0
)
environment.set_terrain_data(terrain)
# 添加三维禁飞区 - 调整位置使其更容易规避
nfz_zones = [
NoFlyZone3D(
center=(300, 300, 150),
size=(80, 80, 100), # 减小长方体禁飞区
shape="box"
),
NoFlyZone3D(
center=(600, 400, 200),
size=(60, 60, 60), # 减小球形禁飞区
shape="sphere"
)
]
for nfz in nfz_zones:
environment.add_no_fly_zone(nfz)
# 设置起始点和目标点
start_points = [
(50, 100, 100), # UAV1起始点
(50, 700, 100), # UAV2起始点
]
target_points = [
(950, 150, 100), # UAV1目标点
(950, 650, 100), # UAV2目标点
]
environment.set_start_target_points(start_points, target_points)
return config, environment
def evaluate_solution_quality(paths, constraints, planner):
"""评估解的质量"""
print(f"\n=== 解质量评估 ===")
# 约束满足情况
print(f"约束满足情况:")
feasible_constraints = 0
total_constraints = len(constraints)
for constraint_name, violation in constraints.items():
status = "满足" if violation <= 1e-6 else f"违反 (程度: {violation:.3f})"
if violation <= 1e-6:
feasible_constraints += 1
print(f" {constraint_name}: {status}")
feasibility_rate = feasible_constraints / total_constraints if total_constraints > 0 else 1.0
print(f"约束满足率: {feasible_constraints}/{total_constraints} ({feasibility_rate*100:.1f}%)")
# 路径统计
stats = planner.get_path_statistics(paths)
print(f"\n路径统计:")
print(f" 总飞行距离: {stats['total_distance']:.1f}m")
print(f" 平均飞行距离: {stats['avg_distance']:.1f}m")
print(f" 最大飞行距离: {stats['max_distance']:.1f}m")
print(f" 最小飞行距离: {stats['min_distance']:.1f}m")
print(f" 最大飞行时间: {stats['max_flight_time']:.1f}s")
# 详细路径信息
print(f"\n各UAV路径详情:")
for i, path in enumerate(paths):
distance = planner.calculate_path_distance(path)
flight_time = planner.calculate_flight_time(path)
direct_distance = np.linalg.norm(path[-1] - path[0])
efficiency = direct_distance / distance if distance > 0 else 0
print(f" UAV{i+1}: 距离={distance:.1f}m, 时间={flight_time:.1f}s, "
f"直线距离={direct_distance:.1f}m, 效率={efficiency:.3f}")
# 输出完整航线数组
print(f"\n完整航线数组:")
for i, path in enumerate(paths):
print(f" UAV{i+1} 航线 (起点->航路点->终点):")
print(f" 起点: [{path[0, 0]:.1f}, {path[0, 1]:.1f}, {path[0, 2]:.1f}]")
print(f" 航路点:")
for j in range(1, len(path) - 1): # 排除起点和终点
print(f"{j}: [{path[j, 0]:.1f}, {path[j, 1]:.1f}, {path[j, 2]:.1f}]")
print(f" 终点: [{path[-1, 0]:.1f}, {path[-1, 1]:.1f}, {path[-1, 2]:.1f}]")
# 完整数组形式
waypoints_array = []
for j in range(len(path)):
waypoints_array.append([round(path[j, 0], 1), round(path[j, 1], 1), round(path[j, 2], 1)])
print(f" 完整数组: {waypoints_array}")
print()
return feasibility_rate, stats
def test_simplified_planning():
"""测试简化的路径规划"""
print("=== 简化版无人机路径规划测试 ===")
try:
# 创建测试场景
config, environment = create_test_scenario()
# 创建规划器
planner = SimplifiedUAVPlanner(config, environment)
print(f"问题维度: {planner.total_dim}")
print(f"禁飞区数量: {len(environment.no_fly_zones)}")
print(f"最大飞行距离限制: {config.max_flight_distance}m")
print(f"碰撞安全距离: {config.collision_safe_distance}m")
# 创建ESE-MSJS算法实例
algorithm = ESE_MSJS(
pop_size=30,
max_iter=2000,
dim=planner.total_dim,
lb=planner.lb,
ub=planner.ub
)
# 运行路径规划
print(f"\n开始路径规划...")
start_time = time.time()
best_solution, best_fitness = algorithm.optimize(
planner.objective_function,
planner.constraint_function
)
end_time = time.time()
# 解析结果
best_paths = planner.decode_solution(best_solution)
final_constraints = planner.constraint_function(best_solution)
print(f"\n路径规划完成!")
print(f"最优目标函数值(总飞行距离): {best_fitness:.2f}m")
print(f"运行时间: {end_time - start_time:.2f}")
# 评估解质量
feasibility_rate, stats = evaluate_solution_quality(best_paths, final_constraints, planner)
return best_paths, algorithm.best_fitness_history, environment, feasibility_rate >= 1.0
except Exception as e:
print(f"路径规划过程中出现错误: {e}")
import traceback
traceback.print_exc()
return None, None, None, False
def visualize_3d_results(paths, environment, fitness_history=None, save_fig=True):
"""三维可视化结果"""
if paths is None:
print("没有可视化的路径数据")
return
try:
fig = plt.figure(figsize=(20, 6))
# 3D路径图
ax1 = fig.add_subplot(131, projection='3d')
colors = ['red', 'blue', 'green', 'orange', 'purple']
for i, path in enumerate(paths):
color = colors[i % len(colors)]
ax1.plot(path[:, 0], path[:, 1], path[:, 2],
color=color, linewidth=3, marker='o', markersize=5,
label=f'UAV{i + 1}')
# 起始点和目标点
ax1.scatter(path[0, 0], path[0, 1], path[0, 2],
color=color, s=200, marker='^',
edgecolors='black', linewidth=2)
ax1.scatter(path[-1, 0], path[-1, 1], path[-1, 2],
color=color, s=200, marker='s',
edgecolors='black', linewidth=2)
# 绘制地形数据
if environment.terrain_data is not None:
terrain = environment.terrain_data
# 创建地形网格
x_terrain = np.linspace(terrain.x_min, terrain.x_max, terrain.height_map.shape[1])
y_terrain = np.linspace(terrain.y_min, terrain.y_max, terrain.height_map.shape[0])
X_terrain, Y_terrain = np.meshgrid(x_terrain, y_terrain)
# 绘制地形表面(使用半透明的棕色)
ax1.plot_surface(X_terrain, Y_terrain, terrain.height_map,
alpha=0.3, color='brown', linewidth=0, antialiased=True)
# 绘制禁飞区
for nfz in environment.no_fly_zones:
cx, cy, cz = nfz.center
if nfz.shape == "box":
w, l, h = nfz.size
# 绘制立方体框架
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
# 定义立方体的8个顶点
vertices = [
[cx-w/2, cy-l/2, cz-h/2], [cx+w/2, cy-l/2, cz-h/2],
[cx+w/2, cy+l/2, cz-h/2], [cx-w/2, cy+l/2, cz-h/2],
[cx-w/2, cy-l/2, cz+h/2], [cx+w/2, cy-l/2, cz+h/2],
[cx+w/2, cy+l/2, cz+h/2], [cx-w/2, cy+l/2, cz+h/2]
]
# 定义立方体的6个面
faces = [
[vertices[0], vertices[1], vertices[2], vertices[3]], # 底面
[vertices[4], vertices[5], vertices[6], vertices[7]], # 顶面
[vertices[0], vertices[1], vertices[5], vertices[4]], # 前面
[vertices[2], vertices[3], vertices[7], vertices[6]], # 后面
[vertices[1], vertices[2], vertices[6], vertices[5]], # 右面
[vertices[4], vertices[7], vertices[3], vertices[0]] # 左面
]
ax1.add_collection3d(Poly3DCollection(faces, alpha=0.3,
facecolor='red', edgecolor='darkred'))
elif nfz.shape == "sphere":
# 绘制球体
u, v = np.mgrid[0:2*np.pi:20j, 0:np.pi:20j]
radius = nfz.size[0]
x = cx + radius * np.cos(u) * np.sin(v)
y = cy + radius * np.sin(u) * np.sin(v)
z = cz + radius * np.cos(v)
ax1.plot_surface(x, y, z, alpha=0.3, color='red')
ax1.set_xlabel('X (m)')
ax1.set_ylabel('Y (m)')
ax1.set_zlabel('Z (m)')
ax1.set_title('3D Flight Paths with Terrain & No-Fly Zones')
ax1.legend()
# 俯视图
ax2 = fig.add_subplot(132)
# 绘制地形等高线(俯视图)
if environment.terrain_data is not None:
terrain = environment.terrain_data
x_terrain = np.linspace(terrain.x_min, terrain.x_max, terrain.height_map.shape[1])
y_terrain = np.linspace(terrain.y_min, terrain.y_max, terrain.height_map.shape[0])
X_terrain, Y_terrain = np.meshgrid(x_terrain, y_terrain)
# 绘制等高线
contours = ax2.contour(X_terrain, Y_terrain, terrain.height_map,
levels=8, colors='gray', alpha=0.4, linewidths=0.8)
ax2.clabel(contours, inline=True, fontsize=8, fmt='%dm')
for i, path in enumerate(paths):
color = colors[i % len(colors)]
ax2.plot(path[:, 0], path[:, 1], color=color, linewidth=2,
marker='o', markersize=4, label=f'UAV{i + 1}')
ax2.scatter(path[0, 0], path[0, 1], color=color, s=100, marker='^')
ax2.scatter(path[-1, 0], path[-1, 1], color=color, s=100, marker='s')
# 绘制禁飞区投影
for nfz in environment.no_fly_zones:
cx, cy, cz = nfz.center
if nfz.shape == "box":
w, l, h = nfz.size
rect = plt.Rectangle((cx-w/2, cy-l/2), w, l,
fill=False, color='red', linestyle='--', linewidth=2)
ax2.add_patch(rect)
elif nfz.shape == "sphere":
radius = nfz.size[0]
circle = plt.Circle((cx, cy), radius,
fill=False, color='red', linestyle='--', linewidth=2)
ax2.add_patch(circle)
ax2.set_xlabel('X (m)')
ax2.set_ylabel('Y (m)')
ax2.set_title('Top View (with Terrain Contours)')
ax2.set_aspect('equal')
ax2.grid(True, alpha=0.3)
ax2.legend()
# 收敛曲线
if fitness_history:
ax3 = fig.add_subplot(133)
ax3.plot(fitness_history, 'b-', linewidth=2)
ax3.set_xlabel('Iteration')
ax3.set_ylabel('Best Fitness (Total Distance)')
ax3.set_title('Convergence Curve')
ax3.grid(True, alpha=0.3)
plt.tight_layout()
if save_fig:
os.makedirs('results', exist_ok=True)
filename = f'results/simplified_uav_planning_{time.strftime("%Y%m%d_%H%M%S")}.png'
plt.savefig(filename, dpi=300, bbox_inches='tight')
print(f"图表已保存为: {filename}")
plt.show()
except Exception as e:
print(f"可视化过程中出现错误: {e}")
def run_performance_test(num_runs=5):
"""运行性能测试"""
print(f"=== 运行 {num_runs} 次性能测试 ===")
results = []
for run in range(num_runs):
print(f"\n--- 第 {run + 1} 次运行 ---")
np.random.seed(42 + run)
paths, fitness_history, environment, success = test_simplified_planning()
if success and paths is not None:
# 创建简化规划器来计算统计信息
config, _ = create_test_scenario()
planner = SimplifiedUAVPlanner(config, environment)
stats = planner.get_path_statistics(paths)
results.append({
'total_distance': stats['total_distance'],
'max_flight_time': stats['max_flight_time'],
'success': True
})
else:
results.append({
'total_distance': float('inf'),
'max_flight_time': float('inf'),
'success': False
})
# 统计结果
successful_runs = [r for r in results if r['success']]
success_rate = len(successful_runs) / num_runs
print(f"\n=== 性能测试结果统计 ===")
print(f"成功率: {len(successful_runs)}/{num_runs} ({success_rate*100:.1f}%)")
if successful_runs:
distances = [r['total_distance'] for r in successful_runs]
times = [r['max_flight_time'] for r in successful_runs]
print(f"总飞行距离统计:")
print(f" 最优: {min(distances):.2f}m")
print(f" 平均: {np.mean(distances):.2f}m")
print(f" 标准差: {np.std(distances):.2f}m")
print(f"最大飞行时间统计:")
print(f" 最短: {min(times):.2f}s")
print(f" 平均: {np.mean(times):.2f}s")
print(f" 标准差: {np.std(times):.2f}s")
# 可视化最好的结果
best_idx = np.argmin(distances)
print(f"\n可视化第 {best_idx + 1} 次运行的最优结果")
# 重新运行最优设置
np.random.seed(42 + best_idx)
paths, fitness_history, environment, _ = test_simplified_planning()
if paths is not None:
visualize_3d_results(paths, environment, fitness_history)
def main():
"""主函数"""
print("简化版ESE-MSJS多无人机路径规划")
print("=" * 50)
try:
# 随机种子选择
print("选择随机种子模式:")
print("1. 固定种子 (seed=42, 结果可重复)")
print("2. 随机种子 (每次不同结果)")
try:
seed_choice = input("请输入选择 (1 或 2默认为 1): ").strip()
except EOFError:
seed_choice = "1" # 默认选择
if seed_choice == "2":
import time
seed = int(time.time() * 1000) % 100000
np.random.seed(seed)
print(f"使用随机种子: {seed}")
else:
np.random.seed(42)
print(f"使用固定种子: 42")
print("\n选择运行模式:")
print("1. 单次测试并可视化")
print("2. 性能测试(多次运行)")
try:
choice = input("请输入选择 (1 或 2默认为 1): ").strip()
except EOFError:
choice = "1" # 默认选择
if choice == "2":
run_performance_test(5)
else:
# 单次测试
paths, fitness_history, environment, success = test_simplified_planning()
if success and paths is not None:
print(f"\n开始可视化结果...")
visualize_3d_results(paths, environment, fitness_history)
print(f"\n测试完成!")
else:
print("测试失败,请检查错误信息")
print(f"\n开始可视化结果...")
visualize_3d_results(paths, environment, fitness_history)
except KeyboardInterrupt:
print("\n用户中断程序")
except Exception as e:
print(f"程序运行出错: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,400 @@
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import math
@dataclass
class SimplifiedUAVConfig:
"""简化的无人机路径配置参数"""
num_uavs: int = 3 # 无人机数量
num_waypoints: int = 10 # 航路点数量
# 环境边界
x_min: float = 0.0
x_max: float = 1000.0
y_min: float = 0.0
y_max: float = 1000.0
z_min: float = 50.0
z_max: float = 500.0
# 约束参数
max_flight_distance: float = 1200.0 # 最大飞行距离(米)
ground_clearance: float = 30.0 # 地面安全间隙(米)
uav_speed: float = 30.0 # 无人机速度m/s
collision_safe_distance: float = 50.0 # 无人机间安全距离(米)
@dataclass
class TerrainData:
"""地形数据"""
def __init__(self, x_range: Tuple[float, float], y_range: Tuple[float, float],
resolution: float = 10.0):
self.x_min, self.x_max = x_range
self.y_min, self.y_max = y_range
self.resolution = resolution
# 创建简单的地形高度图可以从实际DEM数据加载
x_size = int((self.x_max - self.x_min) / resolution) + 1
y_size = int((self.y_max - self.y_min) / resolution) + 1
# 简单的随机地形生成实际应用中应从DEM文件读取
np.random.seed(42) # 确保可重复
self.height_map = np.random.uniform(0, 50, (y_size, x_size))
def get_height(self, x: float, y: float) -> float:
"""获取指定坐标的地面高度"""
# 将坐标转换为网格索引
i = int((y - self.y_min) / self.resolution)
j = int((x - self.x_min) / self.resolution)
# 边界检查
i = max(0, min(i, self.height_map.shape[0] - 1))
j = max(0, min(j, self.height_map.shape[1] - 1))
return self.height_map[i, j]
@dataclass
class NoFlyZone3D:
"""三维禁飞区定义"""
center: Tuple[float, float, float] # (x, y, z) 中心点
size: Tuple[float, float, float] # (width, length, height) 尺寸
shape: str = "box" # 形状:'box' 或 'sphere'
def is_inside(self, point: np.ndarray) -> bool:
"""判断点是否在禁飞区内"""
x, y, z = point
cx, cy, cz = self.center
if self.shape == "box":
w, l, h = self.size
return (abs(x - cx) <= w/2 and
abs(y - cy) <= l/2 and
abs(z - cz) <= h/2)
elif self.shape == "sphere":
radius = self.size[0] # 使用第一个尺寸作为半径
distance = np.sqrt((x-cx)**2 + (y-cy)**2 + (z-cz)**2)
return distance <= radius
return False
def distance_to_boundary(self, point: np.ndarray) -> float:
"""计算点到禁飞区边界的距离(内部为负值)"""
x, y, z = point
cx, cy, cz = self.center
if self.shape == "box":
w, l, h = self.size
# 计算到各个面的距离
dx = max(0, abs(x - cx) - w/2)
dy = max(0, abs(y - cy) - l/2)
dz = max(0, abs(z - cz) - h/2)
if self.is_inside(point):
# 内部点:返回到最近边界的负距离
inner_dx = w/2 - abs(x - cx)
inner_dy = l/2 - abs(y - cy)
inner_dz = h/2 - abs(z - cz)
return -min(inner_dx, inner_dy, inner_dz)
else:
# 外部点:返回正距离
return np.sqrt(dx**2 + dy**2 + dz**2)
elif self.shape == "sphere":
radius = self.size[0]
distance = np.sqrt((x-cx)**2 + (y-cy)**2 + (z-cz)**2)
return distance - radius
return 0.0
class SimplifiedUAVEnvironment:
"""简化的无人机环境"""
def __init__(self, config: SimplifiedUAVConfig):
self.config = config
self.terrain_data: Optional[TerrainData] = None
self.no_fly_zones: List[NoFlyZone3D] = []
# 起始点和目标点
self.start_points: List[Tuple[float, float, float]] = []
self.target_points: List[Tuple[float, float, float]] = []
def set_terrain_data(self, terrain_data: TerrainData):
"""设置地形数据"""
self.terrain_data = terrain_data
def add_no_fly_zone(self, nfz: NoFlyZone3D):
"""添加禁飞区"""
self.no_fly_zones.append(nfz)
def set_start_target_points(self, starts: List[Tuple[float, float, float]],
targets: List[Tuple[float, float, float]]):
"""设置起始点和目标点"""
self.start_points = starts
self.target_points = targets
class SimplifiedUAVPlanner:
"""简化的无人机路径规划器"""
def __init__(self, config: SimplifiedUAVConfig, environment: SimplifiedUAVEnvironment):
self.config = config
self.environment = environment
# 计算编码维度
self.dim_per_uav = config.num_waypoints * 3
self.total_dim = self.dim_per_uav * config.num_uavs
# 边界约束
self.lb = []
self.ub = []
for _ in range(config.num_uavs):
for _ in range(config.num_waypoints):
self.lb.extend([config.x_min, config.y_min, config.z_min])
self.ub.extend([config.x_max, config.y_max, config.z_max])
def decode_solution(self, solution: np.ndarray) -> List[np.ndarray]:
"""解码解决方案为UAV路径"""
paths = []
for i in range(self.config.num_uavs):
start_idx = i * self.dim_per_uav
end_idx = (i + 1) * self.dim_per_uav
uav_genes = solution[start_idx:end_idx]
# 重构为(num_waypoints, 3)的路径
path = uav_genes.reshape(self.config.num_waypoints, 3)
# 添加起始点和目标点
start_point = np.array(self.environment.start_points[i])
target_point = np.array(self.environment.target_points[i])
full_path = np.vstack([start_point, path, target_point])
paths.append(full_path)
return paths
def objective_function(self, solution: np.ndarray) -> float:
"""目标函数:最小化总飞行距离"""
paths = self.decode_solution(solution)
total_distance = 0.0
for path in paths:
path_distance = self.calculate_path_distance(path)
total_distance += path_distance
return total_distance
def calculate_path_distance(self, path: np.ndarray) -> float:
"""计算路径总距离"""
total_distance = 0.0
for i in range(len(path) - 1):
segment_distance = np.linalg.norm(path[i + 1] - path[i])
total_distance += segment_distance
return total_distance
def constraint_function(self, solution: np.ndarray) -> Dict[str, float]:
"""约束函数:返回四个约束的违反值"""
paths = self.decode_solution(solution)
constraints = {}
# 1. 地形约束
terrain_violations = []
for path in paths:
violation = self.check_terrain_constraint(path)
terrain_violations.append(violation)
constraints['terrain'] = max(terrain_violations)
# 2. 禁飞区约束
nfz_violations = []
for path in paths:
violation = self.check_no_fly_zone_constraint(path)
nfz_violations.append(violation)
constraints['no_fly_zone'] = max(nfz_violations)
# 3. 路径长度约束
distance_violations = []
for path in paths:
violation = self.check_distance_constraint(path)
distance_violations.append(violation)
constraints['max_distance'] = max(distance_violations)
# 4. 碰撞约束(新增)
constraints['collision'] = self.check_collision_constraint(paths)
return constraints
def check_terrain_constraint(self, path: np.ndarray) -> float:
"""检查地形约束
返回值0表示满足约束>0表示约束违反程度
"""
max_violation = 0.0
if self.environment.terrain_data is None:
# 如果没有地形数据,只检查最低高度
for point in path:
violation = max(0, self.config.z_min - point[2])
max_violation = max(max_violation, violation)
return max_violation
# 检查每个路径点的地形约束
for point in path:
x, y, z = point
ground_height = self.environment.terrain_data.get_height(x, y)
required_height = ground_height + self.config.ground_clearance
# 违反约束:实际高度低于要求高度
violation = max(0, required_height - z)
max_violation = max(max_violation, violation)
# 还需要检查路径段是否与地形相交
for i in range(len(path) - 1):
segment_violation = self.check_segment_terrain_collision(path[i], path[i + 1])
max_violation = max(max_violation, segment_violation)
return max_violation
def check_segment_terrain_collision(self, start_point: np.ndarray,
end_point: np.ndarray, num_samples: int = 10) -> float:
"""检查路径段是否与地形相撞"""
if self.environment.terrain_data is None:
return 0.0
max_violation = 0.0
# 在路径段上采样检查
for i in range(num_samples):
t = i / (num_samples - 1) if num_samples > 1 else 0
sample_point = start_point + t * (end_point - start_point)
x, y, z = sample_point
ground_height = self.environment.terrain_data.get_height(x, y)
required_height = ground_height + self.config.ground_clearance
violation = max(0, required_height - z)
max_violation = max(max_violation, violation)
return max_violation
def check_no_fly_zone_constraint(self, path: np.ndarray) -> float:
"""检查禁飞区约束"""
max_violation = 0.0
# 检查每个路径点
for point in path:
for nfz in self.environment.no_fly_zones:
if nfz.is_inside(point):
# 点在禁飞区内,违反约束
distance_to_boundary = abs(nfz.distance_to_boundary(point))
max_violation = max(max_violation, distance_to_boundary)
# 检查路径段是否穿过禁飞区
for i in range(len(path) - 1):
segment_violation = self.check_segment_nfz_collision(path[i], path[i + 1])
max_violation = max(max_violation, segment_violation)
return max_violation
def check_segment_nfz_collision(self, start_point: np.ndarray,
end_point: np.ndarray, num_samples: int = 20) -> float:
"""检查路径段是否穿过禁飞区"""
max_violation = 0.0
# 在路径段上采样检查
for i in range(num_samples):
t = i / (num_samples - 1) if num_samples > 1 else 0
sample_point = start_point + t * (end_point - start_point)
for nfz in self.environment.no_fly_zones:
if nfz.is_inside(sample_point):
distance_to_boundary = abs(nfz.distance_to_boundary(sample_point))
max_violation = max(max_violation, distance_to_boundary)
return max_violation
def check_distance_constraint(self, path: np.ndarray) -> float:
"""检查路径长度约束"""
path_distance = self.calculate_path_distance(path)
# 违反约束:实际距离超过最大允许距离
violation = max(0, path_distance - self.config.max_flight_distance)
return violation
def check_collision_constraint(self, paths: List[np.ndarray]) -> float:
"""检查无人机间碰撞约束"""
max_violation = 0.0
if len(paths) < 2:
return 0.0 # 只有一架UAV无碰撞风险
# 检查每对无人机
for i in range(len(paths)):
for j in range(i + 1, len(paths)):
violation = self.check_pairwise_collision(paths[i], paths[j])
max_violation = max(max_violation, violation)
return max_violation
def check_pairwise_collision(self, path1: np.ndarray, path2: np.ndarray) -> float:
"""检查两架无人机之间的碰撞约束"""
max_violation = 0.0
# 假设两架UAV同步飞行相同的路径点数
min_points = min(len(path1), len(path2))
# 检查每个对应的路径点
for i in range(min_points):
distance = np.linalg.norm(path1[i] - path2[i])
violation = max(0, self.config.collision_safe_distance - distance)
max_violation = max(max_violation, violation)
# 检查路径段之间的最小距离
for i in range(min_points - 1):
# 检查线段之间的最小距离
seg_violation = self.check_segment_collision(
path1[i], path1[i+1], path2[i], path2[i+1]
)
max_violation = max(max_violation, seg_violation)
return max_violation
def check_segment_collision(self, p1_start: np.ndarray, p1_end: np.ndarray,
p2_start: np.ndarray, p2_end: np.ndarray,
num_samples: int = 10) -> float:
"""检查两个路径段之间的最小距离"""
max_violation = 0.0
# 在两个路径段上采样点
for i in range(num_samples):
t = i / (num_samples - 1) if num_samples > 1 else 0
# 在第一条路径段上的采样点
sample1 = p1_start + t * (p1_end - p1_start)
# 在第二条路径段上的采样点
sample2 = p2_start + t * (p2_end - p2_start)
# 计算距离
distance = np.linalg.norm(sample1 - sample2)
violation = max(0, self.config.collision_safe_distance - distance)
max_violation = max(max_violation, violation)
return max_violation
def calculate_flight_time(self, path: np.ndarray) -> float:
"""计算飞行时间"""
path_distance = self.calculate_path_distance(path)
return path_distance / self.config.uav_speed
def get_path_statistics(self, paths: List[np.ndarray]) -> Dict:
"""获取路径统计信息"""
stats = {
'total_distance': sum(self.calculate_path_distance(path) for path in paths),
'max_distance': max(self.calculate_path_distance(path) for path in paths),
'min_distance': min(self.calculate_path_distance(path) for path in paths),
'avg_distance': sum(self.calculate_path_distance(path) for path in paths) / len(paths),
'flight_times': [self.calculate_flight_time(path) for path in paths],
'max_flight_time': max(self.calculate_flight_time(path) for path in paths),
}
return stats