1、Python-CAN基本

python-can 库为 Python 提供控制器局域网支持,为不同的硬件设备提供通用抽象,以及一套用于在 CAN 总线上发送和接收消息的实用程序。

python-can 可以在任何 Python 运行的地方运行; 从 CAN 的高功率计算机到 USB 设备,再到运行 linux 的低功率设备,例如 BeagleBone 或 RaspberryPi。

更具体地说,该库的一些示例用途:

  • 被动记录 CAN 总线上发生的情况。 例如,使用 OBD-II 端口监控商用车辆。
  • 测试通过 CAN 交互的硬件。 在现代汽车、摩托车、船只甚至轮椅中发现的模块已经使用这个库从 Python 中测试了组件。
  • 在回路中对新的硬件模块或软件算法进行原型设计。 轻松与现有总线交互。
  • 创建虚拟模块以原型 CAN 总线通信。
  • 2、python-can 安装

    使用pip安装

    pip install python-can

    同时根据使用的硬件设备需要安装对应的驱动等,参照Installation — python-can 4.0.0 documentation

    3、配置

    3.1 代码中直接配置

    can对象公开了一个rc字典,可用于设置interfacechannel

    import can
    can.rc['interface'] = 'neovi'  # 配置硬件类型
    can.rc['channel'] = 2        # 配置通道,根据具体的硬件,int或者str
    can.rc['bitrate'] = 500000   # 波特率
    
    
    from can.interface import Bus
    
    bus = Bus()   # 使用rc字典中的配置实例化can

    也可以在代码中直接指定接口和通道实例化can

    import can
    
    bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=500000)
    
    bus2 = bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)

    3.2 使用配置文件配置

    在 Linux 系统上,配置文件在以下路径中搜索:

    1. ~/can.conf

    2. /etc/can.conf

    3. $HOME/.can

    4. $HOME/.canrc

    在 Windows 系统上,配置文件在以下路径中搜索:

    1. %USERPROFILE%/can.conf

    2. can.ini(当前工作目录)

    3. %APPDATA%/can.ini

    ps:%USERPROFILE%目录为 C:\Users\用户名

            %APPDATA% 目录为 C:\Users\用户名\AppData\Roaming

    配置文件可设置默认接口和通道如下:

    [default]
    interface = <the name of the interface to use>
    channel = <the channel to use by default>
    bitrate = <the bitrate in bits/s to use by default>

    同时也可以添加其他节点如下:

    [HS]
    # All the values from the 'default' section are inherited
    channel = <the channel to use>
    bitrate = <the bitrate in bits/s to use. i.e. 500000>
    
    [MS]
    # All the values from the 'default' section are inherited
    channel = <the channel to use>
    bitrate = <the bitrate in bits/s to use. i.e. 125000>

    使用配置文件的配置实例化can

    from can.interface import Bus
    
    bus = Bus()    # 使用默认(default)配置
    hs_bus = Bus(context='HS')        # 使用HS节点配置
    ms_bus = Bus(context='MS')        # 使用MS节点配置

    3.3 使用环境变量配置

    可以设置以下环境变量

  • CAN_INTERFACE                设置接口类型

  • CAN_CHANNEL                    设置通道

  • CAN_BITRATE                     设置波特率

  • CAN_CONFIG   

  • CAN_CONFIG允许使用 JSON 设置任何总线配置,例如:

    CAN_CONFIG={"receive_own_messages": true, "fd": true}

    4、基本使用

    4.1发送单帧报文到总线

    实例化一条bus总线, 使用can.Message()类创建一条message,bus调用send函数将创建的message实例发送到总线。

    import can
    
    def send_one():
    
        # this uses the default configuration (for example from the config file)
        # see https://python-can.readthedocs.io/en/stable/configuration.html
        #bus = can.interface.Bus()
    
        # Using specific buses works similar:
        bus = can.interface.Bus(bustype='bmcan', channel=0, bitrate=500000, data_bitrate=2000000, tres=True)
        # bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
        # bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
        # bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=250000)
        # bus = can.interface.Bus(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
        # ...
    
        msg = can.Message(arbitration_id=0xc0ffee,
                          data=[0, 25, 0, 1, 3, 1, 4, 1],
                          is_extended_id=True, )
    
        try:
            bus.send(msg, timeout=None)
            print("Message sent on {}".format(bus.channel_info))
        except can.CanError:
            print("Message NOT sent")
    
        bus.shutdown()
    
    if __name__ == '__main__':
        send_one()

    4.2发送周期报文到总线

    以下实例分别演示简单发送周期报文,发送定时的周期报文,和发送周期报文过程中修改发送的内容。

    注意:在发送定时的周期报文时,达到超时时间任务结束后,task将仍旧由总线跟踪。除非在创建任务时设置参数store_task=False,或者调用stop()函数结束任务task.stop()。

    import time
    
    import can
    
    
    
    def simple_periodic_send(bus):
        """
        发送一条周期为200ms的周期报文到总线
        2s后停止发送报文
        """
        print("Starting to send a message every 200ms for 2s")
        msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6], is_extended_id=False)
        task = bus.send_periodic(msg, 0.20)
        assert isinstance(task, can.CyclicSendTaskABC)
        time.sleep(2)
        task.stop()
        print("stopped cyclic send")
    
    
    def limited_periodic_send(bus):
        """
        发送一条有时间限制的周期报文到总线,达到超时时间后,停止任务
        """
        print("Starting to send a message every 200ms for 1s")
        msg = can.Message(arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], is_extended_id=True)
        task = bus.send_periodic(msg, 0.20, 1, store_task=False)
        if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
            print("This interface doesn't seem to support a ")
            task.stop()
            return
    
        time.sleep(2)
    
    
    def test_periodic_send_with_modifying_data(bus):
        """
        在发送周期报文过程中修改发送的数据
        """
        print("Starting to send a message every 200ms. Initial data is ones")
        msg = can.Message(arbitration_id=0x0cf02200, data=[1, 1, 1, 1])
        task = bus.send_periodic(msg, 0.20)
        if not isinstance(task, can.ModifiableCyclicTaskABC):
            print("This interface doesn't seem to support modification")
            task.stop()
            return
        time.sleep(2)
        print("Changing data of running task to begin with 99")
        msg.data[0] = 0x99
        task.modify_data(msg)
        time.sleep(2)
    
        task.stop()
        print("stopped cyclic send")
        print("Changing data of stopped task to single ff byte")
        msg.data = bytearray([0xff])
        msg.dlc = 1
        task.modify_data(msg)
        time.sleep(1)
        print("starting again")
        task.start()
        time.sleep(1)
        task.stop()
        print("done")

    4.3接收总线的can报文

    以下示例演示使用recv()函数接收来自总线的can 报文,然后打印报文。

    import can
    
    def receive_all():
    
        bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=500000)
    
        print('Waiting for RX CAN messages ...')
        try:
            while True:
                msg = bus.recv(1)
                if msg is not None:
                    print(msg)
        except KeyboardInterrupt:
            pass
    
    
    if __name__ == "__main__":
        receive_all()

    5、 使用Listener和NotifIer

    Notifier 对象用作总线的消息分发器。Notifier 创建一个线程来从总线读取消息并将它们分发给listeners。

    Listener 类是任何希望注册以接收总线上新消息通知的对象的“抽象”基类。通常使用是需继承can.Listener。重写on_message_received()方法,以实现收到特定的消息的处理逻辑。

    以下示例,实现MyListener处理收到message 0x123时,发送message 0x456到总线,打印message的回调函数,创建一个日志记录器,添加到listeners列表。

    实例化一个Notifier对象,添加监听的bus总线,绑定listeners列表。当Notifier对象收到bus总线的任意消息,则会将该消息分发listeners列表中的各个监听器。

    Notifier对象使用stop()函数以结束监听bus。

    有一些侦听器python-can已经提供 ,其中一些允许将消息写入文件,例如can.Logger。

    import can
    from can.message import Message
    
    class MyListener(can.Listener):
        def __init__(self):
            super(MyListener, self).__init__()
    
        def on_message_received(self, msg: Message) -> None:
            """
            example
            when receive message 0x123,transmit message 0x456
            """
            if msg.arbitration_id == 0x123:
                transmit_msg = can.Message(arbitration_id=0x456,
                                           dlc=8,
                                           data=[0 for _ in range(8)],
                                           is_extended_id=False)
                bus.send(transmit_msg)
    
    
    def print_msg(msg):
        print(msg)
    
    
    if __name__ == "__main__":
        bus = can.interface.Bus('virtual_ch', bustype='virtual')
        logger = can.Logger("logfile.asc")  # save log to asc file
        listeners = [
            print_msg,  # Callback function, print the received messages
            logger,  # save received messages to asc file
            MyListener   # my listener
        ]
        notifier = can.Notifier(bus, listeners)
        running = True
        while running:
            input()
            running = False
    
        # It's important to stop the notifier in order to finish the writting of asc file
        notifier.stop()
        # stops the bus
        bus.shutdown()

    6、bus总线设置过滤报文

    可以使用set_filters()设置过滤报文,对bus接收到的所有报文进行过滤,返回过滤器匹配的消息。

    set_filters()参数为一个可迭代的字典,每个字典都包含一个“can_id”、一个“can_mask”和一个可选的“extended”键。

    [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]

    过滤器会匹配<received_can_id> & can_mask == can_id & can_mask的所有消息。如果extended 参数也设置了,则仅会匹配<received_is_extended> == extended的消息。否则,它仅根据仲裁 ID 和掩码匹配每条消息。

    bus = can.interface.Bus('virtual_ch', bustype='virtual')
    bus.set_filters([{"can_id": 0x123, "can_mask": 0xFFFF, "extended": False}])

    7、提供的脚本

    python-can已经提供一些可直接使用的脚本

    参照Scripts — python-can 4.0.0 documentation

    7.1 can.logger

    can.logger 用于记录CAN log,将消息打印到标准输出或给定文件

    可以使用python -m can.logger 或者 can_logger.py 的命令行方式调用

    7.2 can.player

    can.player 用于回放记录的can 报文日志文件

    7.3 can.viewer

    一个带有GUI界面的简单的can 总线 报文消息的查看器

    7.4 can.logconvert

    can.logconvert用于日志文件格式的转换, 将日志文件从一种格式转换为另一种格式

    物联沃分享整理
    物联沃-IOTWORD物联网 » python-can介绍

    发表评论