道路信息下的路径规划详解:使用A*算法与Python实现附代码
本文章主要是针对处理GIS数据后,获取到的3857坐标下的道路,进行路径规划,可以实现优先道路(例如优先g道、s道)
1.数据处理
针对得到的gis数据,一般都是shp文件,这里可以采用 geopandas 来进行读取和处理,转化为csv文件或txt文件,这样子方便后续读取和处理数据
注意.shp文件的编码方式,一般是GBK
import geopandas as gpd
具体的数据处理过程再此处先不做详细介绍,有需要可以后续再补充
2.A*算法
A*算法较为常见,这里简单介绍下,详细可以参考路径规划之 A* 算法 – 知乎
可以带着再过一下内容,其主要流程如下:
A*算法通过函数 f(n) = g(n) + h(n) 来更新代价
其中:
A*算法在运算过程中,每次从优先队列中选取f(n)值最小(优先级最高)的节点作为下一个待遍历的节点。
另外,A*算法使用两个集合来表示待遍历的节点,与已经遍历过的节点,这通常称之为open_set
和close_set
。
完整的A*算法描述如下:
3.本文的实现方法
3.1 初始化
使用 open_list(开放列表)保存待探索节点,以优先队列(heap)的形式按估计总代价排序。
close_list(关闭列表)用于记录已探索的节点。
came_from 用于回溯路径。
计算起点到终点的初始距离,根据该距离动态设定最近邻数量 k_num(例如远距离使用 45 个邻居,近距离仅用 25 个),这是搜索的一个优化策略,能兼顾效率与搜索范围。
3.2 主循环过程:
每轮从 open_list 中取出估值最小的节点进行扩展。
如果当前节点即为目标点,说明搜索成功,开始从目标回溯路径。
若未达到目标,则通过 KD 树查找当前点的 k_num 个最近邻作为可能的扩展方向。
3.3 优先点机制(priority_check_data):
若启用了优先点数据(如优先某些道路,对应道路的这些二维坐标点),则对这些点设置更低的代价系数 β = 0.8,使其在路径选择中更具吸引力。
若当前邻居点在优先点集中,则优先度系数变为 0.8,否则为默认的 1.0。
nam 变量用于记录此次搜索中优先点的使用次数。
路径代价更新与估值函数:
估值函数 f = g + h,其中 g 为当前路径代价(加权距离),h 为启发函数(到终点的估计距离),两者都乘以优先权重 β。
如果某邻居点从当前路径过来的总代价更低,则更新其代价并加入 open_list。
3.4 返回路径:
成功搜索到目标后,程序从目标节点开始,通过 came_from 依次回溯起点,构造完整路径
A*的详细代码如下:
def a_star(self,start,goal):
open_list = []
close_list = set()
came_from = {}
nam = 0
heapq.heappush(open_list,(self.distance_calculate(start,goal),0,start,None))
g_cost = {start:0}
distance_pre = self.distance_calculate(start, goal)
if distance_pre > 5000000: # 更远距离的情况
k_num = 45
elif 4000000 < distance_pre <= 5000000:
k_num = 40
elif 2000000 < distance_pre <= 4000000:
k_num = 35
elif 1000000 < distance_pre <= 2000000:
k_num = 30
elif distance_pre <= 1000000:
k_num = 25
while open_list:
f,current_g,current_point,parent = heapq.heappop(open_list)
if current_point == goal:
path = []
if nam > 0:
print('本次有优先')
else:
print('本次无优先')
while current_point:
path.append(current_point)
current_point = came_from.get(current_point)
return path[::-1]
close_list.add(current_point)
dis,idx = self.kdtree.query([current_point],k=k_num)
neighbors = [tuple(self.map_points[i]) for i in idx[0]]
label = True
if self.priority_check_data is None:
label = False
for point in neighbors:
if point in close_list:
continue
beta = 1.0
if label:
if point in self.priority_check_data:
beta = 0.8
nam += 1
tentative_g = current_g + beta * self.distance_calculate(current_point,point)
f = tentative_g + beta * self.distance_calculate(point,goal)
if point not in g_cost or tentative_g < g_cost[point]:
came_from[point] = current_point
g_cost[point] = tentative_g
heapq.heappush(open_list,(f,tentative_g,point,current_point))
return None
4.整体代码
这里展示整体的代码:
4.1 引入库
import glob
import os.path
import requests
import numpy as np
import pandas as pd
import heapq
import matplotlib.pyplot as plt
from sklearn.neighbors import KDTree
import time
from pyproj import Proj,transform
import warnings
from flask import Flask, jsonify, request
from pyproj import Transformer
warnings.simplefilter(action='ignore',category=FutureWarning)
4.2 类的初始化
class AstarPathfinding:
def __init__(self,map_file_path,start_coordinate,goal_coordinate,my_string):
"""
:param map_file_path: 地图起始数据 csv文件路径
:param start_coordinate: 起点经纬度
:param goal_coordinate: 终点经纬度
"""
# self.df = pd.read_csv(map_file_path,encoding='GBK')
self.df = self.load_and_merge_csv_files(map_file_path)
self.map_inf = list(zip(self.df['X'],self.df['Y'],self.df['Name']))
self.map_points = [(x,y)for x,y,_ in self.map_inf]
self.kdtree = KDTree(self.map_points)
self.name_dict = {i:name for i,(x,y,name) in enumerate(self.map_inf)}
self.start_points = self.map_change(*start_coordinate)
self.goal_points = self.map_change(*goal_coordinate)
self.priority_check_data = self.read_county_road_csv(map_file_path,my_string)
4.3 一些函数,用来转换数据以及加载数据
def map_change(self, longitude_0, latitude_0):
wgs84 = Proj(proj='latlong', datum='WGS84')
web_mercator = Proj(proj='merc', datum='WGS84', units='m', lat_ts=0.0, lon_0=0.0, x_0=0.0, y_0=0.0,
ellps='WGS84')
x, y = transform(wgs84, web_mercator, longitude_0, latitude_0)
return x, y
def map_change_to_4326(self,path):
path_changed = []
transformer_3857_to_4326 = Transformer.from_crs('EPSG:3857','EPSG:4326',always_xy=True)
for data in path:
new_lon,new_lat = transformer_3857_to_4326.transform(data[0],data[1])
path_changed.append([new_lon,new_lat])
return path_changed
def load_and_merge_csv_files(self,input_csv_folder):
# 获取输入文件夹中的所有 CSV 文件路径
csv_files = glob.glob(f'{input_csv_folder}/*.csv')
# 创建一个空列表来存储每个 CSV 文件读取的数据
dfs = []
# 遍历每个 CSV 文件,将它们加载到 DataFrame 中
for csv_file in csv_files:
# print(f"正在加载文件: {csv_file}")
# 读取 CSV 文件并只选择 'X', 'Y', 'Name' 三列
df = pd.read_csv(csv_file, encoding='GBK', usecols=['X', 'Y', 'Name'])
dfs.append(df) # 将 DataFrame 添加到列表中
# 合并所有 DataFrame
merged_df = pd.concat(dfs, ignore_index=True) # ignore_index=True:重置行索引
print("所有文件加载并合并完成!")
df_filtered = merged_df.dropna(subset=['Name'])
merged_df = df_filtered[['X', 'Y', 'Name']]
# 保存合并后的 DataFrame 到新的 CSV 文件
# merged_df.to_csv(output_csv_path, index=False, encoding='GBK')
# print(f"合并后的数据已保存到 {output_csv_path}")
return merged_df
def read_county_road_csv(self,file_path,my_string):
if my_string == '国道' or my_string == '高速路':
print(f'本次优先{ my_string}!')
file_paths = glob.glob(os.path.join(file_path,f'*{my_string}*.csv'))
point_set = set()
for file in file_paths:
# print(f'正在读取文件{file}')
temp_df = pd.read_csv(file,encoding='GBK')
temp_points = set(zip(temp_df['X'],temp_df['Y']))
point_set.update(temp_points)
return point_set
else:
print('本次无优先道路!')
return None
4.4 A*的过程
def a_star(self,start,goal):
open_list = []
close_list = set()
came_from = {}
nam = 0
heapq.heappush(open_list,(self.distance_calculate(start,goal),0,start,None))
g_cost = {start:0}
distance_pre = self.distance_calculate(start, goal)
if distance_pre > 5000000: # 更远距离的情况
k_num = 45
elif 4000000 < distance_pre <= 5000000:
k_num = 40
elif 2000000 < distance_pre <= 4000000:
k_num = 35
elif 1000000 < distance_pre <= 2000000:
k_num = 30
elif distance_pre <= 1000000:
k_num = 25
while open_list:
f,current_g,current_point,parent = heapq.heappop(open_list)
if current_point == goal:
path = []
if nam > 0:
print('本次有优先')
else:
print('本次无优先')
while current_point:
path.append(current_point)
current_point = came_from.get(current_point)
return path[::-1]
close_list.add(current_point)
dis,idx = self.kdtree.query([current_point],k=k_num)
neighbors = [tuple(self.map_points[i]) for i in idx[0]]
label = True
if self.priority_check_data is None:
label = False
for point in neighbors:
if point in close_list:
continue
beta = 1.0
if label:
if point in self.priority_check_data:
beta = 0.8
nam += 1
tentative_g = current_g + beta * self.distance_calculate(current_point,point)
f = tentative_g + beta * self.distance_calculate(point,goal)
if point not in g_cost or tentative_g < g_cost[point]:
came_from[point] = current_point
g_cost[point] = tentative_g
heapq.heappush(open_list,(f,tentative_g,point,current_point))
return None
4.5 可视化以及其他的一些辅助计算距离等函数
def visualize_path(self,path):
x_points ,y_points = zip(*self.map_points)
plt.scatter(x_points,y_points,color='blue',s=3,label='Map Points')
if path:
path_x,path_y = zip(*path)
plt.plot(path_x,path_y,color='red',linewidth=2,label='path')
# 绘制起点和终点
plt.scatter(*path[0], color='green', marker='*', s=100, label='Start Point') # 起点
plt.scatter(*path[-1], color='orange', marker='*', s=100, label='Goal Point') # 终点
plt.legend(loc='upper left')
plt.grid(True)
plt.show()
def distance_calculate(self,a, b):
return np.sqrt((a[0]-b[0])**2 + (a[1]-b[1])**2)
def find_nearest_point(self,point ):
dist,idx = self.kdtree.query([point],1)
return tuple(self.map_points[idx[0][0]])
def calculate_length(self,path):
length = 0
for i in range(1,len(path)):
length += self.distance_calculate(path[i-1],path[i])
return length
def load_map_priority(self,file_path):
df = pd.read_csv(file_path,encoding='GBK')
data = df[['X', 'Y']]
data_set = set(tuple(x) for x in data.values)
return data_set
4.6 主函数执行A*
def find_path(self):
start_time = time.time()
A = self.find_nearest_point(self.start_points)
B = self.find_nearest_point(self.goal_points)
print(f'距离起始点最近的点:{A}')
print(f'距离终止点最近的点:{B}')
path = self.a_star(A,B)
end_time = time.time()
if path:
print(f'运行时间:{round((end_time - start_time),3)}s')
print(f'路径长度:{round((self.calculate_length(path) / 1000), 3)}km')
# 可视化
# self.visualize_path(path)
path_changed = self.map_change_to_4326(path)
path_name = set()
xy_name_list = []
for data in path:
_,idx = self.kdtree.query([data])
nearest_name = self.name_dict[idx[0][0]]
xy_name_list.append((data[0],data[1], nearest_name))
path_name.add(nearest_name)
# print(f'经过的路有:{list(path_name)}')
path_name_dis = {}
last_point = None
current_name = None
for i, data in enumerate(path):
_, idx = self.kdtree.query([data])
nearest_name = self.name_dict[idx[0][0]]
if last_point is not None:
distance = self.distance_calculate(last_point, data)
# 路径名称变化时,重新开始新的累加
if nearest_name != current_name:
current_name = nearest_name
path_name_dis[current_name] = path_name_dis.get(current_name, 0) + distance
else:
path_name_dis[current_name] += distance
last_point = data
# 输出路径上所有点的坐标和Name
# for point in xy_name_list:
# print(f"坐标: ({point[0]}, {point[1]}),Name: {point[2]}")
# 输出每个Name下走的总距离
for name, distance in path_name_dis.items():
print(f'进入:{name},行驶{round(distance / 1000, 3)} km')
# 保存路径
# df = pd.DataFrame(xy_name_list,columns=['X','Y','Name'])
# df.to_csv('../map_csv/path_with_name_class.csv',index=False,encoding='GBK')
return path,path_name_dis,path_changed
else:
print('没有找到路径')
4.7 执行
if __name__ == '__main__':
start_coords = (116.5,38.0700)
goal_coords = (117.0444,40.867)
road_name = '12'
path_finder = AstarPathfinding('my_data',
start_coords,goal_coords,road_name)
path_finder.find_path()
这里好像把可视化给注释掉了,大家可以自己加入进去
有需要源地图数据的可以评论区留言,谢谢大家
作者:hh_code