道路信息下的路径规划详解:使用A*算法与Python实现附代码

本文章主要是针对处理GIS数据后,获取到的3857坐标下的道路,进行路径规划,可以实现优先道路(例如优先g道、s道)

1.数据处理

针对得到的gis数据,一般都是shp文件,这里可以采用 geopandas 来进行读取和处理,转化为csv文件或txt文件,这样子方便后续读取和处理数据

注意.shp文件的编码方式,一般是GBK

import geopandas as gpd

具体的数据处理过程再此处先不做详细介绍,有需要可以后续再补充

2.A*算法

A*算法较为常见,这里简单介绍下,详细可以参考路径规划之 A* 算法 – 知乎

可以带着再过一下内容,其主要流程如下:

A*算法通过函数 f(n) = g(n) + h(n) 来更新代价

其中:

  • f(n)是节点n的综合优先级。当我们选择下一个要遍历的节点时,我们总会选取综合优先级最高(值最小)的节点。
  • g(n) 是节点n距离起点的代价。
  • h(n)是节点n距离终点的预计代价,这也就是A*算法的启发函数。 
  • A*算法在运算过程中,每次从优先队列中选取f(n)值最小(优先级最高)的节点作为下一个待遍历的节点。

    另外,A*算法使用两个集合来表示待遍历的节点,与已经遍历过的节点,这通常称之为open_setclose_set

    完整的A*算法描述如下:

    3.本文的实现方法

    3.1 初始化

    使用 open_list(开放列表)保存待探索节点,以优先队列(heap)的形式按估计总代价排序。

    close_list(关闭列表)用于记录已探索的节点。

    came_from 用于回溯路径。

    计算起点到终点的初始距离,根据该距离动态设定最近邻数量 k_num(例如远距离使用 45 个邻居,近距离仅用 25 个),这是搜索的一个优化策略,能兼顾效率与搜索范围。

    3.2 主循环过程:

    每轮从 open_list 中取出估值最小的节点进行扩展。

    如果当前节点即为目标点,说明搜索成功,开始从目标回溯路径。

    若未达到目标,则通过 KD 树查找当前点的 k_num 个最近邻作为可能的扩展方向。

    3.3 优先点机制(priority_check_data):

    若启用了优先点数据(如优先某些道路,对应道路的这些二维坐标点),则对这些点设置更低的代价系数 β = 0.8,使其在路径选择中更具吸引力。

    若当前邻居点在优先点集中,则优先度系数变为 0.8,否则为默认的 1.0。

    nam 变量用于记录此次搜索中优先点的使用次数。

    路径代价更新与估值函数:

    估值函数 f = g + h,其中 g 为当前路径代价(加权距离),h 为启发函数(到终点的估计距离),两者都乘以优先权重 β。

    如果某邻居点从当前路径过来的总代价更低,则更新其代价并加入 open_list。

    3.4 返回路径:

    成功搜索到目标后,程序从目标节点开始,通过 came_from 依次回溯起点,构造完整路径

    A*的详细代码如下:         

        def a_star(self,start,goal):
            open_list = []
            close_list = set()
            came_from = {}
    
            nam = 0
    
    
            heapq.heappush(open_list,(self.distance_calculate(start,goal),0,start,None))
            g_cost = {start:0}
    
            distance_pre = self.distance_calculate(start, goal)
    
            if distance_pre > 5000000:  # 更远距离的情况
                k_num = 45
            elif 4000000 < distance_pre <= 5000000:
                k_num = 40
            elif 2000000 < distance_pre <= 4000000:
                k_num = 35
            elif 1000000 < distance_pre <= 2000000:
                k_num = 30
            elif distance_pre <= 1000000:
                k_num = 25
    
    
            while open_list:
                f,current_g,current_point,parent = heapq.heappop(open_list)
    
                if current_point == goal:
                    path = []
    
                    if nam > 0:
                        print('本次有优先')
                    else:
                        print('本次无优先')
    
                    while current_point:
                        path.append(current_point)
                        current_point = came_from.get(current_point)
                    return path[::-1]
    
                close_list.add(current_point)
    
                dis,idx = self.kdtree.query([current_point],k=k_num)
                neighbors = [tuple(self.map_points[i]) for i in idx[0]]
    
                label = True
                if self.priority_check_data is None:
                    label = False
    
                for point in neighbors:
                    if point in close_list:
                        continue
    
                    beta = 1.0
    
                    if label:
                        if point in self.priority_check_data:
                            beta = 0.8
                            nam += 1
    
                    tentative_g =   current_g + beta * self.distance_calculate(current_point,point)
                    f = tentative_g + beta * self.distance_calculate(point,goal)
    
                    if point not in g_cost or tentative_g < g_cost[point]:
                        came_from[point] = current_point
                        g_cost[point] = tentative_g
                        heapq.heappush(open_list,(f,tentative_g,point,current_point))
    
            return None

    4.整体代码

    这里展示整体的代码:

    4.1 引入库

    import glob
    import os.path
    import requests
    import numpy as np
    import pandas as pd
    import heapq
    import matplotlib.pyplot as plt
    from sklearn.neighbors import KDTree
    import time
    from pyproj import Proj,transform
    import warnings
    from flask import Flask, jsonify, request
    from pyproj import Transformer
    
    warnings.simplefilter(action='ignore',category=FutureWarning)

    4.2 类的初始化

    class AstarPathfinding:
        def __init__(self,map_file_path,start_coordinate,goal_coordinate,my_string):
            """
            :param map_file_path:  地图起始数据 csv文件路径
            :param start_coordinate:    起点经纬度
            :param goal_coordinate:     终点经纬度
            """
            # self.df = pd.read_csv(map_file_path,encoding='GBK')
            self.df = self.load_and_merge_csv_files(map_file_path)
            self.map_inf = list(zip(self.df['X'],self.df['Y'],self.df['Name']))
            self.map_points = [(x,y)for x,y,_ in self.map_inf]
            self.kdtree = KDTree(self.map_points)
            self.name_dict = {i:name for i,(x,y,name) in enumerate(self.map_inf)}
    
            self.start_points = self.map_change(*start_coordinate)
            self.goal_points = self.map_change(*goal_coordinate)
    
            self.priority_check_data = self.read_county_road_csv(map_file_path,my_string)

    4.3 一些函数,用来转换数据以及加载数据

        def map_change(self, longitude_0, latitude_0):
            wgs84 = Proj(proj='latlong', datum='WGS84')
            web_mercator = Proj(proj='merc', datum='WGS84', units='m', lat_ts=0.0, lon_0=0.0, x_0=0.0, y_0=0.0,
                                ellps='WGS84')
            x, y = transform(wgs84, web_mercator, longitude_0, latitude_0)
            return x, y
    
        def map_change_to_4326(self,path):
            path_changed = []
            transformer_3857_to_4326 = Transformer.from_crs('EPSG:3857','EPSG:4326',always_xy=True)
            for data in path:
                new_lon,new_lat = transformer_3857_to_4326.transform(data[0],data[1])
                path_changed.append([new_lon,new_lat])
    
            return path_changed
        def load_and_merge_csv_files(self,input_csv_folder):
            # 获取输入文件夹中的所有 CSV 文件路径
            csv_files = glob.glob(f'{input_csv_folder}/*.csv')
    
            # 创建一个空列表来存储每个 CSV 文件读取的数据
            dfs = []
    
            # 遍历每个 CSV 文件,将它们加载到 DataFrame 中
            for csv_file in csv_files:
                # print(f"正在加载文件: {csv_file}")
                # 读取 CSV 文件并只选择 'X', 'Y', 'Name' 三列
                df = pd.read_csv(csv_file, encoding='GBK', usecols=['X', 'Y', 'Name'])
                dfs.append(df)  # 将 DataFrame 添加到列表中
    
            # 合并所有 DataFrame
            merged_df = pd.concat(dfs, ignore_index=True)  # ignore_index=True:重置行索引
            print("所有文件加载并合并完成!")
            df_filtered = merged_df.dropna(subset=['Name'])
            merged_df = df_filtered[['X', 'Y', 'Name']]
    
            # 保存合并后的 DataFrame 到新的 CSV 文件
            # merged_df.to_csv(output_csv_path, index=False, encoding='GBK')
            # print(f"合并后的数据已保存到 {output_csv_path}")
    
            return merged_df
        def read_county_road_csv(self,file_path,my_string):
            if my_string == '国道' or my_string == '高速路':
                print(f'本次优先{ my_string}!')
                file_paths = glob.glob(os.path.join(file_path,f'*{my_string}*.csv'))
    
                point_set = set()
                for file in file_paths:
                    # print(f'正在读取文件{file}')
                    temp_df = pd.read_csv(file,encoding='GBK')
                    temp_points = set(zip(temp_df['X'],temp_df['Y']))
                    point_set.update(temp_points)
    
                return point_set
            else:
                print('本次无优先道路!')
                return None

    4.4 A*的过程

        def a_star(self,start,goal):
            open_list = []
            close_list = set()
            came_from = {}
    
            nam = 0
    
    
            heapq.heappush(open_list,(self.distance_calculate(start,goal),0,start,None))
            g_cost = {start:0}
    
            distance_pre = self.distance_calculate(start, goal)
    
            if distance_pre > 5000000:  # 更远距离的情况
                k_num = 45
            elif 4000000 < distance_pre <= 5000000:
                k_num = 40
            elif 2000000 < distance_pre <= 4000000:
                k_num = 35
            elif 1000000 < distance_pre <= 2000000:
                k_num = 30
            elif distance_pre <= 1000000:
                k_num = 25
    
    
            while open_list:
                f,current_g,current_point,parent = heapq.heappop(open_list)
    
                if current_point == goal:
                    path = []
    
                    if nam > 0:
                        print('本次有优先')
                    else:
                        print('本次无优先')
    
                    while current_point:
                        path.append(current_point)
                        current_point = came_from.get(current_point)
                    return path[::-1]
    
                close_list.add(current_point)
    
                dis,idx = self.kdtree.query([current_point],k=k_num)
                neighbors = [tuple(self.map_points[i]) for i in idx[0]]
    
                label = True
                if self.priority_check_data is None:
                    label = False
    
                for point in neighbors:
                    if point in close_list:
                        continue
    
                    beta = 1.0
    
                    if label:
                        if point in self.priority_check_data:
                            beta = 0.8
                            nam += 1
    
                    tentative_g =   current_g + beta * self.distance_calculate(current_point,point)
                    f = tentative_g + beta * self.distance_calculate(point,goal)
    
                    if point not in g_cost or tentative_g < g_cost[point]:
                        came_from[point] = current_point
                        g_cost[point] = tentative_g
                        heapq.heappush(open_list,(f,tentative_g,point,current_point))
    
            return None

    4.5 可视化以及其他的一些辅助计算距离等函数

        def visualize_path(self,path):
    
    
            x_points ,y_points = zip(*self.map_points)
    
            plt.scatter(x_points,y_points,color='blue',s=3,label='Map Points')
    
            if path:
                path_x,path_y = zip(*path)
                plt.plot(path_x,path_y,color='red',linewidth=2,label='path')
    
            # 绘制起点和终点
            plt.scatter(*path[0], color='green', marker='*', s=100, label='Start Point')  # 起点
            plt.scatter(*path[-1], color='orange', marker='*', s=100, label='Goal Point')  # 终点
    
            plt.legend(loc='upper left')
            plt.grid(True)
            plt.show()
    
        def distance_calculate(self,a, b):
            return np.sqrt((a[0]-b[0])**2 + (a[1]-b[1])**2)
    
        def find_nearest_point(self,point   ):
            dist,idx = self.kdtree.query([point],1)
            return tuple(self.map_points[idx[0][0]])
    
        def calculate_length(self,path):
            length = 0
            for i in range(1,len(path)):
                length += self.distance_calculate(path[i-1],path[i])
            return length
    
        def load_map_priority(self,file_path):
            df = pd.read_csv(file_path,encoding='GBK')
            data = df[['X', 'Y']]
            data_set = set(tuple(x) for x in data.values)
            return data_set
    

    4.6 主函数执行A*

        def find_path(self):
            start_time = time.time()
    
            A = self.find_nearest_point(self.start_points)
            B = self.find_nearest_point(self.goal_points)
    
            print(f'距离起始点最近的点:{A}')
            print(f'距离终止点最近的点:{B}')
    
            path = self.a_star(A,B)
    
            end_time = time.time()
    
            if path:
                print(f'运行时间:{round((end_time - start_time),3)}s')
                print(f'路径长度:{round((self.calculate_length(path) / 1000), 3)}km')
    
                # 可视化
                # self.visualize_path(path)
    
                path_changed = self.map_change_to_4326(path)
    
    
    
                path_name = set()
                xy_name_list = []
    
                for data in path:
                    _,idx = self.kdtree.query([data])
                    nearest_name = self.name_dict[idx[0][0]]
    
                    xy_name_list.append((data[0],data[1], nearest_name))
                    path_name.add(nearest_name)
    
                # print(f'经过的路有:{list(path_name)}')
    
                path_name_dis = {}
                last_point = None
                current_name = None
    
                for i, data in enumerate(path):
                    _, idx = self.kdtree.query([data])
                    nearest_name = self.name_dict[idx[0][0]]
    
                    if last_point is not None:
                        distance = self.distance_calculate(last_point, data)
    
                        # 路径名称变化时,重新开始新的累加
                        if nearest_name != current_name:
                            current_name = nearest_name
                            path_name_dis[current_name] = path_name_dis.get(current_name, 0) + distance
                        else:
                            path_name_dis[current_name] += distance
    
                    last_point = data
    
                # 输出路径上所有点的坐标和Name
                # for point in xy_name_list:
                #     print(f"坐标: ({point[0]}, {point[1]}),Name: {point[2]}")
    
    
                # 输出每个Name下走的总距离
                for name, distance in path_name_dis.items():
                    print(f'进入:{name},行驶{round(distance / 1000, 3)} km')
    
                # 保存路径
                # df = pd.DataFrame(xy_name_list,columns=['X','Y','Name'])
                # df.to_csv('../map_csv/path_with_name_class.csv',index=False,encoding='GBK')
                return path,path_name_dis,path_changed
            else:
                print('没有找到路径')

    4.7 执行

    if __name__ == '__main__':
    
        start_coords = (116.5,38.0700)
        goal_coords = (117.0444,40.867)
        road_name = '12'
    
        path_finder = AstarPathfinding('my_data',
                                       start_coords,goal_coords,road_name)
        path_finder.find_path()

    这里好像把可视化给注释掉了,大家可以自己加入进去

    有需要源地图数据的可以评论区留言,谢谢大家

    作者:hh_code

    物联沃分享整理
    物联沃-IOTWORD物联网 » 道路信息下的路径规划详解:使用A*算法与Python实现附代码

    发表回复