PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信
PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信
最近使用PyQt5开发PC端工具,正巧手上有一个富芮坤的低功耗蓝牙,于是想在PC端试试与之通信,不过发现使用PyQt5开发低功耗蓝牙的教程很少,本人参考Qt教程和官方文档,开发过程以此文记录。
参考文档:PyQt5模块安装路径下QtBluetooth.py、Qt C++蓝牙类介绍、Qt 低功耗蓝牙扫描示例
低功耗蓝牙通信大体分为以下几个过程:
1. 扫描蓝牙
def scan_for_devices(self):
self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()
self.agent.setLowEnergyDiscoveryTimeout(5000)
self.agent.deviceDiscovered.connect(self.show_info)
self.agent.error.connect(self.agent_error)
self.agent.finished.connect(self.agent_finished)
self.timer = QtCore.QTimer(self.agent)
self.timer.start(5)
self.timer.timeout.connect(self.display_status)
self.agent.start()
def show_info(self, info: QtBt.QBluetoothDeviceInfo):
if int(info.coreConfigurations()) == 1:
print('Device discovered')
print(f'Name: {info.name()}')
print(f'Addr: {info.address().toString()}')
print(f'ServUUID: {info.serviceUuids()}')
def agent_error(self, e):
error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]
if e < 4:
print(error[e])
else:
print(error[4])
def agent_finished(self):
print('Agent finished')
for dev in self.agent.discoveredDevices():
print(f'设备名称: {dev.name()} 设备地址: {dev.address()}')
def display_status(self):
print(self.agent.isActive(), self.agent.discoveredDevices())
self.timer.stop()
上面代码可以看到首先要创建蓝牙设备扫描代理QBluetoothServiceDiscoveryAgent,并设置超时时间,然后依次是绑定设备搜索回调函数serviceDiscovered,这个函数返回搜索到蓝牙设备信息;绑定错误信息回调函数error;绑定搜索完成回调函数finished;最终搜索完成后(通过定时器结束搜索),在discoveredDevices中保存了本次搜索的所有蓝牙设备信息。
2. 连接蓝牙
3. 获取服务
def agent_finished(self):
print('Agent finished')
for dev in self.agent.discoveredDevices():
# 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()
if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():
print(f'连接设备: {dev.name()}')
print(f'coreConfigurations: {int(dev.coreConfigurations())}')
self.controller = QtBt.QLowEnergyController.createCentral(dev)
self.controller.connected.connect(self.connect_Notify)
self.controller.error.connect(self.controller_error)
self.controller.disconnected.connect(self.disconnect_Notify)
self.controller.serviceDiscovered.connect(self.addService)
self.controller.discoveryFinished.connect(self.dis_Finished)
self.controller.connectToDevice()
break
def connect_Notify(self, *args, **kwargs):
print(f'连接通知')
print(f'args: {args}')
print(f'kwargs: {kwargs}')
self.serviceUUID = list()
self.controller.discoverServices()
def controller_error(self, e):
error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError",
"ConnectionError"]
if e < 6:
print(error[e])
else:
print("UnknownError")
def disconnect_Notify(*args, **kwargs):
print(f'断开连接通知')
print(f'args: {args}')
print(f'kwargs: {kwargs}')
def addService(self, uuid: QtBt.QBluetoothUuid):
print('发现服务 Service discovered')
print(f'uuid: {uuid.toString()}')
self.serviceUUID.append(uuid)
def dis_Finished(self):
print(f'服务搜索完成')
for uuid in self.serviceUUID:
print(f'uuid: {uuid.toString()}')
上面代码是蓝牙扫描结束后,从扫描的设备中查找自己想要连接的蓝牙名称(或地址),通过低功耗蓝牙控制器QLowEnergyController连接设备,然后绑定连接完成回调函数connected、错误信息回调函数error、断开连接回调函数disconnected、搜索服务回调函数serviceDiscovered、服务搜索完成回调函数discoveryFinished,最后通过调用connectToDevice方法连接到蓝牙设备。其中在连接成功回调函数connect_Notify中有调用发现服务discoverServices方法,这个函数是查找蓝牙支持哪些服务,必须要调用,另外如果查找不到蓝牙服务看看蓝牙是否配对,配对有方法可调用,有需求的可查看官方文档,如果蓝牙连接错误可以将蓝牙设备删除,重新配对再测试。
4. 连接服务
5. 获取服务特征
def dis_Finished(self):
print(f'服务搜索完成')
for uuid in self.serviceUUID:
if uuid.toString() == self.UUID_S:
self.serviceUUID.clear()
self.serviceUUID.append(uuid)
self.ServiceObject = self.controller.createServiceObject(uuid)
break
if self.ServiceObject is None:
print(f'服务连接失败')
else:
print(f'服务连接成功')
self.ServiceObject.stateChanged.connect(self.state_Changed)
self.ServiceObject.error.connect(self.service_Error)
self.ServiceObject.discoverDetails()
def state_Changed(self, s):
print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')
if s == QtBt.QLowEnergyService.DiscoveringServices:
print(f"正在搜索服务特征... Discovering services...")
return
elif s == QtBt.QLowEnergyService.ServiceDiscovered:
print(f"搜索服务特征完成. Service discovered.")
for ch in self.ServiceObject.characteristics():
print(f'特征uuid:{ch.uuid().toString()}')
def service_Error(self, error):
ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError",
"CharacteristicReadError", "DescriptorReadError"]
if error < 6:
print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')
else:
print(f'error:{error}')
上面代码是在搜索完服务后,从中查找自己需要的服务,通过createServiceObject方法创建低功耗蓝牙服务,之后绑定服务状态变化通知回调stateChanged,DiscoveringServices表示正在搜索服务特征,ServiceDiscovered表示搜索服务特征完成,其余状态我们不关注,然后调用discoverDetails方法开始搜索特征,所有的特征都在characteristics方法返回的列表中。
6. 创建服务特征句柄
7. 监听或写入特征
def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):
data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]
if self.ServiceObject:
self.characteristicWrite = self.ServiceObject.characteristic(uuid)
# 判断特征是否可用
print(f'isValid:{self.characteristicWrite.isValid()}')
if self.characteristicWrite.isValid():
self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)
print("创建写特征成功,可进行写数据.")
else:
print("err:创建写特征失败,写特征不可用.")
上面代码是往蓝牙中写入数据,函数的入参是QBluetoothUuid类型,通过步骤5获得的服务特征的UUID创建写入句柄,使用服务writeCharacteristic方法写数据,方法传入写句柄和数据,数据必须是bytes类型,WriteWithResponse参数是写入通知,还可以传入WriteWithoutResponse无写入通知。
def create_read_notify(self, uuid: QtBt.QBluetoothUuid):
if self.ServiceObject:
self.characteristicRead_ = self.ServiceObject.characteristic(uuid)
# 判断特征是否可用
print(f'isValid:{self.characteristicRead_.isValid()}')
if not self.characteristicRead_.isValid():
print("err:创建读特征失败,读特征不可用.")
return
print("创建读特征成功,正在设置监听...")
# 获取读特征描述符 descriptors()为list
self.notification = self.characteristicRead_.descriptors()[0]
# 判断读特征描述符是否可用
print(f'read_notify.isValid:{self.notification.isValid()}')
if not self.notification.isValid():
print("err:读特征描述符不可用,监听失败.")
return
# 绑定监听函数
self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)
# 写0x01,0x00启用监听服务
self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))
print("设置监听服务成功,正在监听数据...")
def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):
print(f'特征读取变化通知')
for i in list(data):
str_ = str_ + ' ' + '{:02X}'.format(i)
print(f'{str_}')
上面代码是监听服务(读数据),函数的入参同样是QBluetoothUuid类型,监听服务和写数据有区别,需要开启监听通知,创建读句柄之后,还需要设置读特征描述符,从descriptors方法中获取读特征描述符,该方法返回一个list,我手上的低功耗蓝牙list中只有一个数据元素,不知道其他BLE是否也是这样,list中的数据就是我们需要的读特征描述符,通过服务writeDescriptor方法设置0x01,0x00启用监听服务,监听回调返回的数据类型是bytearray。
8. 完整代码
附上完整代码:
#!/usr/bin/env python
# --*--coding=utf-8--*--
# pip install PyQt5
import sys
import time
from PyQt5 import QtCore
from PyQt5.QtCore import QByteArray
from PyQt5 import QtBluetooth as QtBt
class Application(QtCore.QCoreApplication):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.agent = None
self.controller = None
self.timer = None
self.service = None
self.serviceUUID = list()
self.ServiceObject = None
self.Service = None
self.characteristicWrite = None
self.characteristicRead_ = None
self.notification = None
self.descriptorReadUUID = list()
self.descriptorWriteUUID = list()
self.dev_name = ['C0CEF5E80E38', 'BLE HID KBD MICE']
self.dev_addr = 'C0:CE:F5:E8:0E:38'
# 要连接的服务UUID
self.UUID_S = "{6e400001-b5a3-f393-e0a9-e50e24dc4179}"
# 写特征UUID
self.UUID_W = "{6e400002-b5a3-f393-e0a9-e50e24dc4179}"
# 读特征UUID
self.UUID_R = "{6e400003-b5a3-f393-e0a9-e50e24dc4179}"
# 启动扫描程序
self.scan_for_devices()
self.exec()
def display_status(self):
print(self.agent.isActive(), self.agent.discoveredDevices())
self.timer.stop()
def show_info(self, info: QtBt.QBluetoothDeviceInfo):
# 过滤低功耗设备
if int(info.coreConfigurations()) == QtBt.QBluetoothDeviceInfo.LowEnergyCoreConfiguration:
print('Device discovered')
print(f'Name: {info.name()}')
print(f'Addr: {info.address().toString()}')
print(f'ServUUID: {info.serviceUuids()}')
def agent_finished(self):
print('Agent finished')
for dev in self.agent.discoveredDevices():
# 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()
if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():
print(f'连接设备: {dev.name()}')
print(f'coreConfigurations: {int(dev.coreConfigurations())}')
self.controller = QtBt.QLowEnergyController.createCentral(dev)
self.controller.connected.connect(self.connect_Notify)
self.controller.error.connect(self.controller_error)
self.controller.disconnected.connect(self.disconnect_Notify)
self.controller.serviceDiscovered.connect(self.addService)
self.controller.discoveryFinished.connect(self.dis_Finished)
self.controller.connectToDevice()
break
def controller_error(self, e):
error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError",
"ConnectionError"]
if e < 6:
print(error[e])
else:
print("UnknownError")
def connect_Notify(self, *args, **kwargs):
print(f'连接通知')
print(f'args: {args}')
print(f'kwargs: {kwargs}')
self.serviceUUID = list()
self.controller.discoverServices()
def disconnect_Notify(*args, **kwargs):
print(f'断开连接通知')
print(f'args: {args}')
print(f'kwargs: {kwargs}')
def addService(self, uuid: QtBt.QBluetoothUuid):
print('发现服务 Service discovered')
print(f'uuid: {uuid.toString()}')
self.serviceUUID.append(uuid)
def dis_Finished(self):
print(f'服务搜索完成')
for uuid in self.serviceUUID:
if uuid.toString() == self.UUID_S:
self.serviceUUID.clear()
self.serviceUUID.append(uuid)
self.ServiceObject = self.controller.createServiceObject(uuid)
break
if self.ServiceObject is None:
print(f'服务连接失败')
else:
print(f'服务连接成功')
self.ServiceObject.stateChanged.connect(self.state_Changed)
self.ServiceObject.characteristicWritten.connect(self.characteristic_Written)
self.ServiceObject.error.connect(self.service_Error)
self.ServiceObject.discoverDetails()
def characteristic_Written(self, info: QtBt.QLowEnergyCharacteristic, value):
print(f'特征写入变化通知')
ch = info.uuid().toString() + " - Characteristic written:" + str(value)
print(f'{ch}')
def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):
print(f'特征读取变化通知')
ch = info.uuid().toString() + " - Characteristic read:" + str(value)
print(f'{ch}')
def service_Error(self, error):
ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError",
"CharacteristicReadError", "DescriptorReadError"]
if error < 6:
print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')
def state_Changed(self, s):
print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')
if s == QtBt.QLowEnergyService.DiscoveringServices:
print(f"正在搜索服务特征... Discovering services...")
return
elif s == QtBt.QLowEnergyService.ServiceDiscovered:
print(f"搜索服务特征完成. Service discovered.")
self.descriptorReadUUID = list()
self.descriptorWriteUUID = list()
for ch in self.ServiceObject.characteristics():
print(f'特征:{ch.uuid().toString()}')
if ch.uuid().toString() == self.UUID_W:
# 保存要写的特征UUID
self.descriptorWriteUUID.append(ch.uuid())
# 创建写特征
self.create_write_characteristic(ch.uuid())
if ch.uuid().toString() == self.UUID_R:
# 保存要读的特征UUID
self.descriptorReadUUID.append(ch.uuid())
# 监听读特征
self.create_read_notify(ch.uuid())
def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):
data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]
if self.ServiceObject:
self.characteristicWrite = self.ServiceObject.characteristic(uuid)
# 判断特征是否可用
print(f'isValid:{self.characteristicWrite.isValid()}')
if self.characteristicWrite.isValid():
self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)
print("创建写特征成功,可进行写数据.")
else:
print("err:创建写特征失败,写特征不可用.")
def create_read_notify(self, uuid: QtBt.QBluetoothUuid):
if self.ServiceObject:
self.characteristicRead_ = self.ServiceObject.characteristic(uuid)
# 判断特征是否可用
print(f'isValid:{self.characteristicRead_.isValid()}')
if not self.characteristicRead_.isValid():
print("err:创建读特征失败,读特征不可用.")
return
print("创建读特征成功,正在设置监听...")
# 获取读特征描述符 descriptors()为list
self.notification = self.characteristicRead_.descriptors()[0]
# 判断读特征描述符是否可用
print(f'read_notify.isValid:{self.notification.isValid()}')
if not self.notification.isValid():
print("err:读特征描述符不可用,监听失败.")
return
# 绑定监听函数
self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)
# 写0x01,0x00启用监听服务
self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))
print("设置监听服务成功,正在监听数据...")
def agent_error(self, e):
error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]
if e < 4:
print(error[e])
else:
print(error[4])
def scan_for_devices(self):
self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()
self.agent.setLowEnergyDiscoveryTimeout(5000)
self.agent.deviceDiscovered.connect(self.show_info)
self.agent.error.connect(self.agent_error)
self.agent.finished.connect(self.agent_finished)
self.timer = QtCore.QTimer(self.agent)
self.timer.start(5)
self.timer.timeout.connect(self.display_status)
self.agent.start()
if __name__ == '__main__':
app = Application(sys.argv)
经过以上步骤与BLE通信就算完成了,如有错误之处欢迎指正,转载请注明出处,谢谢!