Python实现MQTT消息发送与优化指南

开启mosquitto后发送消息的代码参考如下:

import json
import paho.mqtt.client as mqtt
'''
mosquitto
mosquitto_sub -h localhost -t your/topic

'''
# MQTT 服务器配置
MQTT_BROKER = "localhost"   # 服务器地址,如果 Mosquitto 运行在本地
MQTT_PORT = 1883            # 默认 MQTT 端口
MQTT_TOPIC = "your/topic"   # 你要发布的主题


# 回调函数:连接成功时调用
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接到 MQTT 代理服务器成功")
    else:
        print(f"连接失败,错误码 {rc}")

# 回调函数:发布消息后的回调
def on_publish(client, userdata, mid):
    print(f"消息已发布,消息 ID: {mid}")

def main(payload):
    # 创建 MQTT 客户端实例
    client = mqtt.Client()

    # 设置回调函数
    client.on_connect = on_connect
    client.on_publish = on_publish

    try:
        # 连接到 MQTT 代理服务器
        client.connect(MQTT_BROKER, MQTT_PORT, 60)

        # 启动网络循环(非阻塞)
        client.loop_start()

        # 发布消息
        result = client.publish(MQTT_TOPIC, payload)

        # 等待发布完成
        status = result[0]
        if status == 0:
            print(f"成功发布消息到主题 `{MQTT_TOPIC}`")
        else:
            print(f"发送消息失败,状态码 {status}")

    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # 停止网络循环并断开连接
        client.loop_stop()
        client.disconnect()

if __name__ == "__main__":
    # 要发送的字典数据
    data = {
        "temperature": 25.5,
        "humidity": 60,
        "status": "OK"
    }

    # 将字典转换为 JSON 字符串
    payload = json.dumps(data)
    main(payload)

上述代码只能每次发送一条信息,然后初始化客户端以及回调,这在需要一直发消息的程式中不能完美适应。

        要优化现有的 MQTT 发布代码,使其在一个循环中持续发送信息而无需每次都重新初始化客户端和回调,可以通过以下几个步骤实现:

1.客户端初始化与连接:将 MQTT 客户端的初始化、连接和回调设置放在主程序初始化阶段,只执行一次。
2.持续发布消息:在一个独立的循环中持续发布消息,而不是在每次发送时都创建新的客户端实例。
3.优雅关闭:在程序结束时,优雅地停止网络循环并断开客户端连接。

为了实现这些目标,建议使用一个类来封装 MQTT 客户端的行为。下面是重构后的示例代码:

import json
import time
import threading
import paho.mqtt.client as mqtt

'''
mosquitto
mosquitto_sub -h localhost -t your/topic
'''

# MQTT 服务器配置
MQTT_BROKER = "localhost"   # 服务器地址,如果 Mosquitto 运行在本地
MQTT_PORT = 1883            # 默认 MQTT 端口
MQTT_TOPIC = "your/topic"   # 你要发布的主题

class MQTTPublisher:
    def __init__(self, broker, port, topic):
        self.broker = broker
        self.port = port
        self.topic = topic
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_publish = self.on_publish
        self.connected = False
        self._stop_event = threading.Event()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.connected = True
            print("连接到 MQTT 代理服务器成功")
        else:
            print(f"连接失败,错误码 {rc}")

    def on_publish(self, client, userdata, mid):
        print(f"消息已发布,消息 ID: {mid}")

    def connect(self):
        try:
            self.client.connect(self.broker, self.port, 60)
            self.client.loop_start()
            # 等待连接建立
            while not self.connected and not self._stop_event.is_set():
                time.sleep(0.1)
        except Exception as e:
            print(f"连接错误: {e}")

    def publish(self, payload):
        if self.connected:
            result = self.client.publish(self.topic, payload)
            status = result[0]
            if status == 0:
                print(f"成功发布消息到主题 `{self.topic}`")
            else:
                print(f"发送消息失败,状态码 {status}")
        else:
            print("未连接到 MQTT 代理服务器,无法发布消息")

    def disconnect(self):
        self._stop_event.set()
        self.client.loop_stop()
        self.client.disconnect()
        print("已断开与 MQTT 代理服务器的连接")

def main():
    # 要发送的初始字典数据
    data = {
        "temperature": 25.5,
        "humidity": 60,
        "status": "OK"
    }

    # 将字典转换为 JSON 字符串
    payload = json.dumps(data)

    # 初始化 MQTTPublisher 实例
    publisher = MQTTPublisher(MQTT_BROKER, MQTT_PORT, MQTT_TOPIC)
    publisher.connect()

    try:
        while True:
            # 更新或获取新的数据
            # 这里可以根据实际需求动态生成或获取数据
            data["temperature"] += 0.1  # 示例:温度逐渐上升
            current_payload = json.dumps(data)
            publisher.publish(current_payload)
            time.sleep(5)  # 每5秒发布一次
    except KeyboardInterrupt:
        print("收到退出信号,准备断开连接...")
    finally:
        publisher.disconnect()

if __name__ == "__main__":
    main()

代码详解

1.创建 MQTTPublisher 类
– 初始化 (__init__):设置 MQTT 代理地址、端口和主题。初始化 MQTT 客户端并设置回调函数。
– 连接方法 (connect):连接到 MQTT 代理服务器并启动网络循环。使用一个循环等待连接成功。
– 发布方法 (publish):检查是否已连接,然后发布消息。如果未连接则输出错误信息。
– 断开方法 (disconnect):停止网络循环并断开连接。

2.主函数 main
– 初始化要发送的数据,并将其转换为 JSON 字符串。
– 创建 MQTTPublisher 实例并连接到 MQTT 代理。
– 在 try 块中,进入一个无限循环(可以根据需要调整),持续发布消息。示例中每隔5秒发布一次更新后的温度数据。
– 捕获 KeyboardInterrupt 异常(如用户按下 Ctrl+C),然后在 finally 块中优雅地断开客户端连接。

运行与测试

确保你已经安装了 paho-mqtt 库。如果未安装,可以使用以下命令安装:

pip install paho-mqtt

确保本地有 Mosquitto 服务器在运行,并使用以下命令订阅主题以查看发布的消息:

mosquitto_sub -h localhost -t your/topic

然后运行上述 Python 脚本,你应该会看到持续发布到指定主题的消息。

额外优化建议

1.线程安全:如果你计划在多线程环境中使用 MQTTPublisher,请确保方法是线程安全的。
2.错误处理:根据需要增强错误处理,如连接重试机制等。
3.配置管理:将配置(如 broker 地址、端口和主题)移动到配置文件或使用命令行参数,使代码更灵活。
4.日志记录:使用 logging 模块代替 print 语句,以便更好地管理和记录日志信息。

        通过以上优化,代码将更加高效、可维护,并且能够满足持续发布消息的需求,而无需每次发送时都重新初始化 MQTT 客户端。

作者:图灵追慕者

物联沃分享整理
物联沃-IOTWORD物联网 » Python实现MQTT消息发送与优化指南

发表回复