PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

最近使用PyQt5开发PC端工具,正巧手上有一个富芮坤的低功耗蓝牙,于是想在PC端试试与之通信,不过发现使用PyQt5开发低功耗蓝牙的教程很少,本人参考Qt教程和官方文档,开发过程以此文记录。

参考文档:PyQt5模块安装路径下QtBluetooth.py、Qt C++蓝牙类介绍Qt 低功耗蓝牙扫描示例

低功耗蓝牙通信大体分为以下几个过程:

1. 扫描蓝牙

	def scan_for_devices(self):
        self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()
        self.agent.setLowEnergyDiscoveryTimeout(5000)
        self.agent.deviceDiscovered.connect(self.show_info)
        self.agent.error.connect(self.agent_error)
        self.agent.finished.connect(self.agent_finished)
        self.timer = QtCore.QTimer(self.agent)
        self.timer.start(5)
        self.timer.timeout.connect(self.display_status)
        self.agent.start()
        
	def show_info(self, info: QtBt.QBluetoothDeviceInfo):
        if int(info.coreConfigurations()) == 1:
            print('Device discovered')
            print(f'Name: {info.name()}')
            print(f'Addr: {info.address().toString()}')
            print(f'ServUUID: {info.serviceUuids()}')
      
	def agent_error(self, e):
        error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]
        if e < 4:
            print(error[e])
        else:
            print(error[4])

	def agent_finished(self):
        print('Agent finished')
        for dev in self.agent.discoveredDevices():
        	print(f'设备名称: {dev.name()} 设备地址: {dev.address()}')

	def display_status(self):
        print(self.agent.isActive(), self.agent.discoveredDevices())
        self.timer.stop()

上面代码可以看到首先要创建蓝牙设备扫描代理QBluetoothServiceDiscoveryAgent,并设置超时时间,然后依次是绑定设备搜索回调函数serviceDiscovered,这个函数返回搜索到蓝牙设备信息;绑定错误信息回调函数error;绑定搜索完成回调函数finished;最终搜索完成后(通过定时器结束搜索),在discoveredDevices中保存了本次搜索的所有蓝牙设备信息。

2. 连接蓝牙
3. 获取服务

    def agent_finished(self):
        print('Agent finished')
        for dev in self.agent.discoveredDevices():
            # 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()
            if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():
                print(f'连接设备: {dev.name()}')
                print(f'coreConfigurations: {int(dev.coreConfigurations())}')
                self.controller = QtBt.QLowEnergyController.createCentral(dev)
                self.controller.connected.connect(self.connect_Notify)
                self.controller.error.connect(self.controller_error)
                self.controller.disconnected.connect(self.disconnect_Notify)
                self.controller.serviceDiscovered.connect(self.addService)
                self.controller.discoveryFinished.connect(self.dis_Finished)
                self.controller.connectToDevice()
                break
                
    def connect_Notify(self, *args, **kwargs):
        print(f'连接通知')
        print(f'args: {args}')
        print(f'kwargs: {kwargs}')
        self.serviceUUID = list()
        self.controller.discoverServices()

    def controller_error(self, e):
        error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError",
                 "ConnectionError"]
        if e < 6:
            print(error[e])
        else:
            print("UnknownError")
            
    def disconnect_Notify(*args, **kwargs):
        print(f'断开连接通知')
        print(f'args: {args}')
        print(f'kwargs: {kwargs}')

    def addService(self, uuid: QtBt.QBluetoothUuid):
        print('发现服务 Service discovered')
        print(f'uuid: {uuid.toString()}')
        self.serviceUUID.append(uuid)

    def dis_Finished(self):
        print(f'服务搜索完成')
        for uuid in self.serviceUUID:
			print(f'uuid: {uuid.toString()}')

上面代码是蓝牙扫描结束后,从扫描的设备中查找自己想要连接的蓝牙名称(或地址),通过低功耗蓝牙控制器QLowEnergyController连接设备,然后绑定连接完成回调函数connected、错误信息回调函数error、断开连接回调函数disconnected、搜索服务回调函数serviceDiscovered、服务搜索完成回调函数discoveryFinished,最后通过调用connectToDevice方法连接到蓝牙设备。其中在连接成功回调函数connect_Notify中有调用发现服务discoverServices方法,这个函数是查找蓝牙支持哪些服务,必须要调用,另外如果查找不到蓝牙服务看看蓝牙是否配对,配对有方法可调用,有需求的可查看官方文档,如果蓝牙连接错误可以将蓝牙设备删除,重新配对再测试。

4. 连接服务
5. 获取服务特征

    def dis_Finished(self):
        print(f'服务搜索完成')
        for uuid in self.serviceUUID:
            if uuid.toString() == self.UUID_S:
                self.serviceUUID.clear()
                self.serviceUUID.append(uuid)
                self.ServiceObject = self.controller.createServiceObject(uuid)
                break
        if self.ServiceObject is None:
            print(f'服务连接失败')
        else:
            print(f'服务连接成功')
            self.ServiceObject.stateChanged.connect(self.state_Changed)
            self.ServiceObject.error.connect(self.service_Error)
            self.ServiceObject.discoverDetails()

    def state_Changed(self, s):
        print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')

        if s == QtBt.QLowEnergyService.DiscoveringServices:
            print(f"正在搜索服务特征... Discovering services...")
            return
        elif s == QtBt.QLowEnergyService.ServiceDiscovered:
            print(f"搜索服务特征完成. Service discovered.")
            for ch in self.ServiceObject.characteristics():
            	print(f'特征uuid:{ch.uuid().toString()}')
            
    def service_Error(self, error):
        ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError",
                        "CharacteristicReadError", "DescriptorReadError"]
        if error < 6:
            print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')
        else:
        	print(f'error:{error}')

上面代码是在搜索完服务后,从中查找自己需要的服务,通过createServiceObject方法创建低功耗蓝牙服务,之后绑定服务状态变化通知回调stateChanged,DiscoveringServices表示正在搜索服务特征,ServiceDiscovered表示搜索服务特征完成,其余状态我们不关注,然后调用discoverDetails方法开始搜索特征,所有的特征都在characteristics方法返回的列表中。

6. 创建服务特征句柄
7. 监听或写入特征

    def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):
        data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]
        if self.ServiceObject:
            self.characteristicWrite = self.ServiceObject.characteristic(uuid)
            # 判断特征是否可用
            print(f'isValid:{self.characteristicWrite.isValid()}')
            if self.characteristicWrite.isValid():
                self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)
                print("创建写特征成功,可进行写数据.")
            else:
                print("err:创建写特征失败,写特征不可用.")

上面代码是往蓝牙中写入数据,函数的入参是QBluetoothUuid类型,通过步骤5获得的服务特征的UUID创建写入句柄,使用服务writeCharacteristic方法写数据,方法传入写句柄和数据,数据必须是bytes类型,WriteWithResponse参数是写入通知,还可以传入WriteWithoutResponse无写入通知。

    def create_read_notify(self, uuid: QtBt.QBluetoothUuid):
        if self.ServiceObject:
            self.characteristicRead_ = self.ServiceObject.characteristic(uuid)
            # 判断特征是否可用
            print(f'isValid:{self.characteristicRead_.isValid()}')
            if not self.characteristicRead_.isValid():
                print("err:创建读特征失败,读特征不可用.")
                return

            print("创建读特征成功,正在设置监听...")

            # 获取读特征描述符 descriptors()为list
            self.notification = self.characteristicRead_.descriptors()[0]
            # 判断读特征描述符是否可用
            print(f'read_notify.isValid:{self.notification.isValid()}')
            if not self.notification.isValid():
                print("err:读特征描述符不可用,监听失败.")
                return

            # 绑定监听函数
            self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)
            # 写0x01,0x00启用监听服务
            self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))
            print("设置监听服务成功,正在监听数据...")

    def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):
        print(f'特征读取变化通知')
		for i in list(data):
            str_ = str_ + ' ' + '{:02X}'.format(i)
        print(f'{str_}')

上面代码是监听服务(读数据),函数的入参同样是QBluetoothUuid类型,监听服务和写数据有区别,需要开启监听通知,创建读句柄之后,还需要设置读特征描述符,从descriptors方法中获取读特征描述符,该方法返回一个list,我手上的低功耗蓝牙list中只有一个数据元素,不知道其他BLE是否也是这样,list中的数据就是我们需要的读特征描述符,通过服务writeDescriptor方法设置0x01,0x00启用监听服务,监听回调返回的数据类型是bytearray。

8. 完整代码
附上完整代码:

#!/usr/bin/env python
# --*--coding=utf-8--*--
# pip install PyQt5
import sys
import time
from PyQt5 import QtCore
from PyQt5.QtCore import QByteArray
from PyQt5 import QtBluetooth as QtBt


class Application(QtCore.QCoreApplication):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.agent = None
        self.controller = None
        self.timer = None
        self.service = None
        self.serviceUUID = list()
        self.ServiceObject = None
        self.Service = None
        self.characteristicWrite = None
        self.characteristicRead_ = None
        self.notification = None
        self.descriptorReadUUID = list()
        self.descriptorWriteUUID = list()
        self.dev_name = ['C0CEF5E80E38', 'BLE HID KBD MICE']
        self.dev_addr = 'C0:CE:F5:E8:0E:38'
        # 要连接的服务UUID
        self.UUID_S = "{6e400001-b5a3-f393-e0a9-e50e24dc4179}"
        # 写特征UUID
        self.UUID_W = "{6e400002-b5a3-f393-e0a9-e50e24dc4179}"
        # 读特征UUID
        self.UUID_R = "{6e400003-b5a3-f393-e0a9-e50e24dc4179}"
        # 启动扫描程序
        self.scan_for_devices()
        self.exec()

    def display_status(self):
        print(self.agent.isActive(), self.agent.discoveredDevices())
        self.timer.stop()

    def show_info(self, info: QtBt.QBluetoothDeviceInfo):
    	# 过滤低功耗设备
        if int(info.coreConfigurations()) == QtBt.QBluetoothDeviceInfo.LowEnergyCoreConfiguration:
            print('Device discovered')
            print(f'Name: {info.name()}')
            print(f'Addr: {info.address().toString()}')
            print(f'ServUUID: {info.serviceUuids()}')

    def agent_finished(self):
        print('Agent finished')
        for dev in self.agent.discoveredDevices():
            # 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()
            if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():
                print(f'连接设备: {dev.name()}')
                print(f'coreConfigurations: {int(dev.coreConfigurations())}')
                self.controller = QtBt.QLowEnergyController.createCentral(dev)
                self.controller.connected.connect(self.connect_Notify)
                self.controller.error.connect(self.controller_error)
                self.controller.disconnected.connect(self.disconnect_Notify)
                self.controller.serviceDiscovered.connect(self.addService)
                self.controller.discoveryFinished.connect(self.dis_Finished)
                self.controller.connectToDevice()
                break

    def controller_error(self, e):
        error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError",
                 "ConnectionError"]
        if e < 6:
            print(error[e])
        else:
            print("UnknownError")

    def connect_Notify(self, *args, **kwargs):
        print(f'连接通知')
        print(f'args: {args}')
        print(f'kwargs: {kwargs}')
        self.serviceUUID = list()
        self.controller.discoverServices()

    def disconnect_Notify(*args, **kwargs):
        print(f'断开连接通知')
        print(f'args: {args}')
        print(f'kwargs: {kwargs}')

    def addService(self, uuid: QtBt.QBluetoothUuid):
        print('发现服务 Service discovered')
        print(f'uuid: {uuid.toString()}')
        self.serviceUUID.append(uuid)

    def dis_Finished(self):
        print(f'服务搜索完成')
        for uuid in self.serviceUUID:
            if uuid.toString() == self.UUID_S:
                self.serviceUUID.clear()
                self.serviceUUID.append(uuid)
                self.ServiceObject = self.controller.createServiceObject(uuid)
                break
        if self.ServiceObject is None:
            print(f'服务连接失败')
        else:
            print(f'服务连接成功')
            self.ServiceObject.stateChanged.connect(self.state_Changed)
            self.ServiceObject.characteristicWritten.connect(self.characteristic_Written)
            self.ServiceObject.error.connect(self.service_Error)
            self.ServiceObject.discoverDetails()

    def characteristic_Written(self, info: QtBt.QLowEnergyCharacteristic, value):
        print(f'特征写入变化通知')
        ch = info.uuid().toString() + " - Characteristic written:" + str(value)
        print(f'{ch}')

    def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):
        print(f'特征读取变化通知')
        ch = info.uuid().toString() + " - Characteristic read:" + str(value)
        print(f'{ch}')

    def service_Error(self, error):
        ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError",
                        "CharacteristicReadError", "DescriptorReadError"]
        if error < 6:
            print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')

    def state_Changed(self, s):
        print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')

        if s == QtBt.QLowEnergyService.DiscoveringServices:
            print(f"正在搜索服务特征... Discovering services...")
            return
        elif s == QtBt.QLowEnergyService.ServiceDiscovered:
            print(f"搜索服务特征完成. Service discovered.")
            self.descriptorReadUUID = list()
            self.descriptorWriteUUID = list()
            for ch in self.ServiceObject.characteristics():
                print(f'特征:{ch.uuid().toString()}')
                if ch.uuid().toString() == self.UUID_W:
                    # 保存要写的特征UUID
                    self.descriptorWriteUUID.append(ch.uuid())
                    # 创建写特征
                    self.create_write_characteristic(ch.uuid())
                if ch.uuid().toString() == self.UUID_R:
                    # 保存要读的特征UUID
                    self.descriptorReadUUID.append(ch.uuid())
                    # 监听读特征
                    self.create_read_notify(ch.uuid())

    def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):
        data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]
        if self.ServiceObject:
            self.characteristicWrite = self.ServiceObject.characteristic(uuid)
            # 判断特征是否可用
            print(f'isValid:{self.characteristicWrite.isValid()}')
            if self.characteristicWrite.isValid():
                self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)
                print("创建写特征成功,可进行写数据.")
            else:
                print("err:创建写特征失败,写特征不可用.")

    def create_read_notify(self, uuid: QtBt.QBluetoothUuid):
        if self.ServiceObject:
            self.characteristicRead_ = self.ServiceObject.characteristic(uuid)
            # 判断特征是否可用
            print(f'isValid:{self.characteristicRead_.isValid()}')
            if not self.characteristicRead_.isValid():
                print("err:创建读特征失败,读特征不可用.")
                return

            print("创建读特征成功,正在设置监听...")

            # 获取读特征描述符 descriptors()为list
            self.notification = self.characteristicRead_.descriptors()[0]
            # 判断读特征描述符是否可用
            print(f'read_notify.isValid:{self.notification.isValid()}')
            if not self.notification.isValid():
                print("err:读特征描述符不可用,监听失败.")
                return

            # 绑定监听函数
            self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)
            # 写0x01,0x00启用监听服务
            self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))
            print("设置监听服务成功,正在监听数据...")

    def agent_error(self, e):
        error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]
        if e < 4:
            print(error[e])
        else:
            print(error[4])

    def scan_for_devices(self):
        self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()
        self.agent.setLowEnergyDiscoveryTimeout(5000)

        self.agent.deviceDiscovered.connect(self.show_info)
        self.agent.error.connect(self.agent_error)
        self.agent.finished.connect(self.agent_finished)

        self.timer = QtCore.QTimer(self.agent)
        self.timer.start(5)
        self.timer.timeout.connect(self.display_status)

        self.agent.start()


if __name__ == '__main__':
    app = Application(sys.argv)

经过以上步骤与BLE通信就算完成了,如有错误之处欢迎指正,转载请注明出处,谢谢!

物联沃分享整理
物联沃-IOTWORD物联网 » PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

发表评论