使用paho-mqtt实现多客户端订阅同一主题并确保消息仅接收一次

项目需求:原本做的项目是单进程单线程模式订阅mqtt,发现在消息回调处理消息时耗时较久,我们业务对消息处理是一次性的,只要求处理一次,所以需要提升并发处理能力。看了网上建议改为多线程模式,然而本人实践过程,采用多进程or多线程模式方式运行,发现并没达到预期效果。下面时本人的一下实践记录,仅供参考学习。

环境:python3.7

本地mqtt服务使用的emqx

操作工具用的MQTTX客户端

 1、下面是mqtt多线程模式运行代码实现,只实现消息订阅端。

import random, string
from paho.mqtt.client import Client
from threading import Thread

broker = '192.168.8.205'
port = 1883
topic = "python-mqtt"


def connect_mqtt():
    def on_connect(_, __, ___, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client_id = f"test-client_{''.join(random.choice(string.ascii_lowercase) for _ in range(4))}"
    client = Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client):
    def on_message(_, __, mesgage):
        print(f"Received `{mesgage.payload.decode()}` from `{mesgage.topic}` topic\n")
    client.subscribe(topic)
    client.on_message = on_message


def main():
    c = connect_mqtt()
    subscribe(c)
    c.loop_forever()
    # c.loop_start()


if __name__ == '__main__':

    lt = []
    for i in range(10):
        t = Thread(target=main, args=(), name=f'thread-{i}')
        lt.append(t)
    for t in lt:
        t.start()
        print(t.name)
    for t in lt:
        t.join()

 2、MQTTX连接发送消息:

 3、运行效果,发现发布一条消息可以被接收10次(订阅客户端10个,分别被处理了10次),而我的需求是想要发布一条消息,被这个10个客户端之一消费掉,且只处理一次。

代码参考前文,修改如下

 4、后来又换个思路,尝试了下还是10个线程,客户端唯一(client_id唯一),这种模式由于mqtt协议要求客户端唯一,导致10个线程并发启动,出现抢占式连接mqqt服务,出现不停的断开连接,重新连接。这种模式下运行客户端实际也只有一个,订阅处理等能力同于一个线程模式下的客户端方式,也无法达到预期。

———————————————–分割线——————————- 

后面无意间找到一篇文章,了解到mqtt服务有一种叫共享订阅模式。以emqx为例,emqx支持两种格式的共享订阅前缀:$share/topic 和$queue/topic,然后通过修改emqx服务的配置etc/emqx.conf

如图:

消息发布时,topic配置不变,订阅时,沿用$share/topic 和$queue/topic即可。

代码参考前文修改如下:

 运行效果:

 可以看到程序运行只收到一条消息了

 参考文章:

   1.(mqtt集群订阅如何只消费一个(一次)消息? – 程序新视界

   2.(paho-mqtt 实现通信_nuc_baixu的博客-CSDN博客_paho mqtt

物联沃分享整理
物联沃-IOTWORD物联网 » 使用paho-mqtt实现多客户端订阅同一主题并确保消息仅接收一次

发表评论