【python/ros】python小程序实现rosbag数据包解析保存

序言
  • 模型训练要求感知数据集按照如下格式存储

    # txt: 时间戳 目标id 坐标x 坐标y 速度x 速度y
    timestamp:float agent_ID:int pos_x:float pos_y:float v_x:float v_y:float
    
  • python实现rosbag包解析,提取感知topic消息字段并存入文件

  • 1. 读取bag包
  • ros提供了解析bag包的python api,按如下方式载入bag

    import rosbag
    bag_file = '/your/path/xxx.bag'
    bag_data = rosbag.Bag(bag_file, "r")
    
  • 可通过如下命令查看bag包含哪些topic、消息类型和帧率

    info = bag_data.get_type_and_topic_info()
    print(info)
    
  • 如图显示感知topic消息数350,帧率10Hz

  • 2. 读取特定topic的数据
  • 如果不给定topic,会遍历所有的topic

  • 查看topic的方式:方式1如上;方式2 rosbag info xxx.bag;方式3 通过read_messages()返回值

  • read_messages()每次迭代返回3个值:topic, msg, t。假如感知topic名字是/perception/obstacles,msg是具体的消息数据,t表示时间戳

    perception_data = bag_data.read_messages('/perception/obstacles')
    for topic, msg, t in perception_data:
        print(topic)
    
  • 3. msg解析与存储
  • 同样,可通过多种方式查看msg有哪些字段:方式1 topic能看到消息,查看proto的消息字段;方式2 业务代码中消息解析字段;方式3 rostopic echo topic_name可打印看到消息内容

  • 我们字典来按照目标id存储消息解析的结果,如下

    # frame_id = 0
    object = {}		# {id : [[frame1 data],[frame2 data]]}
    for topic, msg, t in perception_data:
        if msg is not None:
            for obstacle in msg.obstacles:
    	        item = []
    	        # item.append(frame_id)
    	        item.append(obstacle.timestamp)
    	        item.append(obstacle.track_id)
    	        item.append(target.position.x)
    	        item.append(target.position.y)
    	        item.append(target.velocity.x)
    	        item.append(target.velocity.y)
    	        if obstacle.track_id in object.keys():
    	            object[obstacle.track_id].append(item)
    	        else:
    	            object[obstacle.track_id] = [item]
    
  • 4. 数据按要求格式写入文件
  • 新建存储perception数据的文件

    # 执行后自动创建
    file = open('/disk_f/traing_data/' + 'perception_obstacles.txt', "w", encoding='utf-8')
    
  • 数据转换成string类型存储(结合实际需要)

    fields = ' '.join(map(lambda x: str(x), values))    # .join 将元素以指定分隔符连成新字符串
    file.write(fields + '\n')
    
  • 将数据按照id从大到小排列(时间戳是按时间排列的,不然也可以指定按时间戳排序)

    # object.items返回所有key:value数据对
    data = sorted(object.items(), key=lambda x: x[0])
    data = dict(data)
    
  • 数据写入文件:打开-写入-关闭

    file = open('/disk_f/traing_data/' + 'perception_obstacles.txt', "w", encoding='utf-8')
    if object is not None:
        data = sorted(object.items(), key=lambda x: x[0])
        data = dict(data)
        for target_id in data.keys():
            # break
            frame_id = 0
            for values in data[target_id]:
                fields = ' '.join(map(lambda x: str(x), values))
                file.write(str(frame_id) + '  ' + fields + '\n')
                frame_id += 12
    file.close()
    
  • 写入后效果

  • 5. 完整程序
  • python实现rosbag数据包特定topic消息的解析保存

    import rosbag
    
    '''
    ====== 1. load xxx.bag data
    '''
    bag_file = '/your/path/xxx.bag'
    bag_data = rosbag.Bag(bag_file, "r")
    
    info = bag_data.get_type_and_topic_info()
    # print(info)
    
    '''
    ====== 2. save msg fields
    '''
    perception_data = bag_data.read_messages('/perception/obstacles')
    object = {}
    for topic, msg, t in perception_data:
        if msg is not None:
            for obstacle in msg.obstacles:
    	        item = []
    	        # item.append(frame_id)
    	        item.append(obstacle.timestamp)
    	        item.append(obstacle.track_id)
    	        item.append(obstacle.position.x)
    	        item.append(obstacle.position.y)
    	        item.append(obstacle.velocity.x)
    	        item.append(obstacle.velocity.y)
    	        if obstacle.track_id in object.keys():
    	            object[obstacle.track_id].append(item)  # or object[obstacle.track_id] += [item]
    	        else:
    	            object[obstacle.track_id] = [item]      # otherwise [1, 2, 3, [4, 5, 6]]
    
    '''
    ====== 3. data save as txt
    '''
    file = open('/disk_f/traing_data/' + 'perception_obstacles.txt', "w", encoding='utf-8')
    if object is not None:
        data = sorted(object.items(), key=lambda x: x[0])     # sorted by key i.e. track_id
        data = dict(data)
        for target_id in data.keys():
            frame_id = 0
            for values in data[target_id]:
                fields = ' '.join(map(lambda x: str(x), values))
                file.write(str(frame_id) + '  ' + fields + '\n')
                frame_id += 12
    file.close()
    

  • 5.1 item[]放在for循环外导致的异常
  • 如上代码,我们需要将每次循环的item放入字典,但如果 item[]放在for循环外,如下

    item = []
    for topic, msg, t in perception_data:
    	if msg is not None:
    		for obstacle in msg.obstacles:
    			item.clear()
    			item.append(obstacle.timestamp)
    			if obstacle.track_id in object.keys():
    				object[obstacle.track_id].append(item)
    			else:
    				object[obstacle.track_id] = [item]
    
  • 会出现循环结束后字典中只存了最后一个目标的item。原因在于使用同一个item对象,它的值就是最后一次赋值的值;把item[]放在for循环内,每次新建一个列表对象就不会有这个问题。或拷贝时尝试 item.copy()

  • 5.2 字典写入二维列表
  • 如上,如果dict[id].append(item_list)直接添加的话,得到的是 [1, 2, 3, [4, 5, 6]]不符合预期

  • 一维列表合并为二维列表的方式

    l1 = list([1,2,3,4])
    l2 = list([5,6,7,8])
    l3 = [l1] + [l2]
    print(l3)	# [[1, 2, 3, 4], [5, 6, 7, 8]]
    
  • 5.3 字典排序
  • 使用python内置函数

    sorted(iterable, key=None, reverse=False)
    

    iterable:可迭代对象;key:用来比较的元素,取自迭代对象中;reverse:默认False升序, True降序

  • 返回值是列表,通过dict(list_)转为字典格式。其他字典排序的例子,参考 python字典排序方法

  • 5.4 topic数据保存成csv/txt方法
  • 见文章:rosbag的topic数据保存成csv/txt文件

  • 参考文章:
    python解析rosbag的图像数据
    python提取bag中的图像和点云数据
    python同步解析bag包1
    python解析bag包2
    无法把列表正常加到字典中
    一维列表合并为二维列表
    python字典的使用
    python用字典存储数据

    created by shuaixio, 2022.06.04

    来源:shuaixio

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【python/ros】python小程序实现rosbag数据包解析保存

    发表评论