MQTT协议深度解析:物联网通信核心协议详解

在当今物联网(IoT)迅猛发展的背景下,设备之间的高效、可靠通信变得尤为重要。MQTT(Message Queuing Telemetry Transport)作为一种轻量级的消息传输协议,因其低带宽占用和高可靠性,成为物联网领域中广受欢迎的通信工具。

官网:http://mqtt.org

1. 什么是 MQTT?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模型的轻量级通信协议,专为资源受限的设备和不可靠网络环境设计。它最初由 Andy Stanford-Clark 和 Arlen Nipper 在 1999 年开发,如今已成为物联网通信的标准协议之一。
MQTT 的核心思想是通过一个中间代理(Broker)来实现消息的分发,从而解耦发送者与接收者。

客户端

服务端

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

也称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

– (1)发布其他客户端可能会订阅的信息;

– (1)接受来自客户的网络连接;

– (2)订阅其它客户端发布的消息;

– (2)接受客户发布的应用信息;

– (3)退订或删除应用程序的消息;

– (3)处理来自客户端的订阅和退订请求;

– (4)断开与服务器连接。

– (4)向订阅的客户转发应用程序消息。

 2. 设计原则

  1. 精简,不添加可有可无的功能;
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port),不必同时运行;
  3. 允许用户动态创建主题(不需要预先创建主题),零运维成本;
  4. 把传输量降到最低以提高传输效率;
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内;
  6. 支持连续的会话保持和控制(心跳);
  7. 理解客户端计算能力可能很低;
  8. 提供服务质量( quality of service level:QoS)管理;
  9. 不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。

3. MQTT 核心概念

3.1 服务端(Broker)

消息的中转站,负责接收来自客户端的消息,并将消息转发给订阅了相关主题的客户端。

3.2 客户端(Client)

客户端可以是发布者(Publisher)或订阅者(Subscriber),也可以两者兼有。每个客户端都可以连接到 Broker 并进行消息的发布或订阅。

3.3 主题(Topic)

类似于邮件地址的主题字符串,用于标识消息的分类。例如:

  • sensor/temperature
  • home/livingroom/light
  • 3.4 Topic 通配符匹配规则

    3.4.1 层级分隔符:/

    / 用来分隔主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。

    示例:
    love/you/with/all/my/heart

    3.4.2  多层通配符:#

    多层通配符有可以标识大于等于0的层次。因此,love/# 也可以匹配到单独的 love,此时#代表0层。

    多层通配符一定要是主题树的最后一个字符。比如说,love/# 是有效的,但是 love/#/with 是无效的。

    love/you/# 可匹配如下内容(包括单不限于):
    
    love/you
    love/you/with
    love/you/with/all
    love/you/with/all/my/heart
    love/you/with/all/my/hearts

     3.4.3 单层通配符:+

    只匹配主题的一层

    1. love/you/+ :匹配/love/you/with和/love/you/and,但是不匹配love/you/with/all/my/heart。
    2. 单层通配符只匹配1层,love/+不匹配love。
    3. 单层通配符可以被用于主题树的任意层级,连带多层通配符。它必须被用在主题层级分隔符/的右边,除非它是指定自己。因此,+和love/+都是有效的,但是love+无效。单层通配符可以用在主题树的末端,也可以用在中间。比如说,love/+和love/+/with都是有效。

    注意事项:

    1. 主题层次分隔符被用来在主题中引入层次。多层的通配符和单层通配符可以被使用,但他们不能被使用来做发布者的消息。
    2. Topic命名尽量见名知意,符合规范,主题名字是大小写敏感的。比如说,love和LOVE是两个不同的主题。
    3. 以/开头会产生一个不同的主题。比如说,/love与love不同。/love匹配"+/+"和/+,但不匹配+。
    4. 不要在任何主题中包含null(Unicode \x0000)字符。
    5. 在主题树中,长度被限制于64k内但是在这以内没有限制层级的数目。
    6. 可以有任意数目的根节点: 也就是说,可以有任意数目的主题树。 

    3.5 $SYS 主题

    以 $SYS/ 开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/ 主题标准,但大多数 MQTT 服务器都遵循该标准建议。

    例如,EMQX 服务器支持通过以下主题获取集群状态。

    主题 说明
    $SYS/brokers EMQX 集群节点列表
    $SYS/brokers/emqx@127.0.0.1/version EMQX 版本
    $SYS/brokers/emqx@127.0.0.1/uptime EMQX 运行时间
    $SYS/brokers/emqx@127.0.0.1/datetime EMQX 系统时间
    $SYS/brokers/emqx@127.0.0.1/sysdescr EMQX 系统信息

    EMQX 还支持客户端上下线事件、收发流量、消息收发、系统监控等丰富的系统主题,用户可通过订阅 $SYS/# 主题获取所有系统主题消息。详细请见:EMQX 系统主题文档。 

    3.6 QoS(服务质量等级)

    MQTT 协议中规定了消息服务质量(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性,QoS 的设计是 MQTT 协议里的重点。
    MQTT 设计了 3 个 QoS 等级。

    3.6.1 QoS 0(最多一次)

    消息发布完全依赖底层TCP/IP网络。会发生消息丢失。 一个消息不会被接收端应答,也不会被发送者存储并再发送。这个也被叫做“即发即弃”

    3.6.2 QoS 1(至少一次)

    确保消息到达,但消息重复可能会发生。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。

    3.6.3 QoS 2(恰好一次)

    确保消息到达一次。如果接收端接收到了一个QoS为2的PUBLISH消息,它将相应地处理PUBLISH消息,并通过PUBREC消息向发送方确认。

    其中:

  • PUBLISH:发布消息 
  • PUBREC:发布收到
  • PUBREL:发布释放
  • PUBCOMP:发布完成
  • 3.6.4 QoS 匹配规则总结

     不同情况下,客户端收到的消息 QoS 可参考下表:

    发布消息的 QoS

    主题订阅的 QoS

    接收消息的 QoS

    0

    0

    0

    0

    1

    0

    0

    2

    0

    1

    0

    0

    1

    1

    1

    1

    2

    1

    2

    0

    0

    2

    1

    1

    2

    2

    2

    MQTT 中的 QoS 是一种灵活的机制,它允许发布者和订阅者独立设置期望的服务质量等级,而最终的消息传输将基于两者之间的最小值进行处理。这种设计既保障了通信的可靠性,也避免了因能力不匹配导致的资源浪费。

    3.7 MQTT V3.1.1 协议报文

    3.7.1 报文结构

  • 固定报头(Fixed header)
  • 可变报头(Variable header)
  • 报文有效载荷(Payload)
  • 3.7.2 固定报头

    +----------+-----+-----+-----+-----+-----+-----+-----+-----+
    | Bit      |  7  |  6  |  5  |  4  |  3  |  2  |  1  |  0  |
    +----------+-----+-----+-----+-----+-----+-----+-----+-----+
    | byte1    |   MQTT Packet type    |         Flags         |
    +----------+-----------------------+-----------------------+
    | byte2... |   Remaining Length                            |
    +----------+-----------------------------------------------+

    3.7.3 报文类型

    类型名称 类型值 报文说明
    CONNECT 1 发起连接
    CONNACK 2 连接回执
    PUBLISH 3 发布消息
    PUBACK 4 发布回执
    PUBREC 5 QoS2 消息回执
    PUBREL 6 QoS2 消息释放
    PUBCOMP 7 QoS2 消息完成
    SUBSCRIBE 8 订阅主题
    SUBACK 9 订阅回执
    UNSUBSCRIBE 10 取消订阅
    UNSUBACK 11 取消订阅回执
    PINGREQ 12 PING 请求
    PINGRESP 13 PING 响应
    DISCONNECT 14 断开连接

    其中:

  • PUBLISH 发布消息:PUBLISH 报文承载客户端与服务器间双向的发布消息。 PUBACK 报文用于接收端确认 QoS1 报文,PUBREC/PUBREL/PUBCOMP 报文用于 QoS2 消息流程。
  • PINGREQ/PINGRESP 心跳:客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送 PINGREQ 心跳报文,服务端响应 PINGRESP 报文。PINGREQ/PINGRESP 报文均 2 个字节。
  • 3.8 MQTT WebSocket 连接

    MQTT 协议除支持 TCP 传输层外,还支持 WebSocket 作为传输层。通过 WebSocket 浏览器可以直连 MQTT 消息服务器,发布订阅模式与其他 MQTT 客户端通信。

    MQTT 协议的 WebSocket 连接,必须采用 binary 模式,并携带子协议 Header:

    Sec-WebSocket-Protocol: mqttv3.1 或 mqttv3.1.1

    4. 物联网级消息中间件EMQ

    4.1 EMQX 简介

    EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。

    EMQ官网:EMQ: 连接物理世界与人工智能

     EMQX 文档:产品概览 | EMQX 1.0 文档

    为什么选择EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。

    4.2 EMQX的特点

  • EMQ X 目前为开源社区中最流行的 MQTT 消息中间件;
  • EMQ X 是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。
  • 除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议
  • 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。
  • 易于安装和使用;
  • 中国本地的技术支持服务;
  • 扩展模块和插件,EMQ X 提供了灵活的扩展机制,支持企业的一些定制场景;
  • 桥接
  • 共享订阅
  •  4.3 环境搭建与配置

    以 Docker 运行单个 EMQX 节点为例:

    1. 拉取emqx镜像

    docker pull emqx/emqx:v4.1.0

    2. 创建emqx容器

    docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083  emqx/emqx:v4.1.0

    其中,端口说明:

    端口 协议 描述
    1883 TCP MQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。
    8883 TCP MQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。
    8083 TCP MQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。
    8084 TCP MQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。
    18083 HTTP EMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。
    4370 TCP Erlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset
    5370 TCP 集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset

    Docker 部署注意事项:

     1. 如果需要持久化 Docker 容器中生成的数据 ,请将以下目录挂载到容器外部,这样即使容器被删除数据也不会丢失:

    /opt/emqx/data
    /opt/emqx/log

    启动容器并挂载目录:

    docker run -tid --name emqx \
      -p 1883:1883 -p 8083:8083 \
      -p 8084:8084 -p 8883:8883 \
      -p 18083:18083 \
      -v $PWD/data:/opt/emqx/data \
      -v $PWD/log:/opt/emqx/log \
      emqx/emqx:v4.1.0

    EMQX 历史版本:Directory listing for EMQX: / | EMQ

    单节点部署:安装部署和迁移 | EMQX文档 

    集群部署:分布式集群介绍 | EMQX文档

    安全认证:安全指南 | EMQX文档

    EMQX命令行:命令行 | EMQX文档

    配置文件:配置文件简介 | EMQX文档

    4.4 EMQX 目录结构

    目录 描述 压缩包解压安装(同 Docker) 二进制包安装
    etc 静态配置文件 ./etc /etc/emqx
    data 数据和配置文件 ./data /var/lib/emqx
    log 日志文件 ./log /var/log/emqx
    releases 启动相关的脚本 ./releases /usr/lib/emqx/releases
    bin 可执行文件 ./bin /usr/lib/emqx/bin
    lib Erlang 代码 ./lib /usr/lib/emqx/lib
    erts-* Erlang 虚拟机文件 ./erts-* /usr/lib/emqx/erts-*
    plugins 插件 ./plugins /usr/lib/emqx/plugins

    以上目录中,用户经常接触与使用的是 bin、etc、data、log 目录。

    4.5 EMQ Dashboard

    EMQ X 提供了 Dashboard 以方便用户管理设备与监控相关指标。通过 Dashboard可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。

    访问地址:http://localhost:18083 来查看 Dashboard,默认用户名是 admin,密码是 public。

    EMQX Dashboard 详细介绍:EMQX Dashboard | EMQX文档

    EMQX Dashboard 之 MQTT 配置:配置和管理会话持久化 | EMQX文档

    导航项目

    说明

    Monitor

    提供了服务端与客户端监控信息的展示页面

    RULE ENGINE

    提供了规则引擎的可视化操作页面

    MANAGEMENT

    提供了扩展插件与应用的管理页面

    TOOLS

    提供了 Websocket 客户端工具以及 HTTP API 速查页面

    ADMIN

    提供了 Dashboard 用户管理和显示设置等页面

    4.6 客户端调试工具MQTTX

    MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。

    MQTT X 的 UI 采用了聊天界面形式,简化了页面操作逻辑,用户可以快速创建连接,允许保存多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 消息的订阅和发布。

     下载地址:MQTTX:全功能 MQTT 客户端工具

    4.7 基础使用教程

    1. 点击 + ,再点击 New Connetion

    2. 填写信息并点击右上角的 Connect ,如果无其他需求,仅填写第一页内容即可,其他均为拓展项。

     

     3. 点击 Connect 即可正常使用

     界面说明:

    4. 发送消息:

     5. 订阅消息:

     

     关于切换中文:

    4.8 共享订阅

    对应文档:共享订阅 | EMQX文档

    EMQX 实现了 MQTT 的共享订阅功能。共享订阅是一种订阅模式,用于在多个订阅者之间实现负载均衡。客户端可以分为多个订阅组,消息仍然会被转发到所有订阅组,但每个订阅组内只有一个客户端接收消息。您可以为一组订阅者的原始主题添加前缀以启用共享订阅。EMQX 支持两种格式的共享订阅前缀,分别为带群组的共享订阅(前缀为 $share/<group-name>/)和不带群组的共享订阅(前缀为 $queue/)。两种共享订阅格式示例如下:

    前缀格式 示例 前缀 真实主题名
    带群组格式 $share/abc/t/1 $share/abc/ t/1
    不带群组格式 $queue/t/1 $queue/ t/1

     需要在EMQDashboard开启该选项:(默认开启)

    4.7.1 不带群组的共享订阅

    格式:$queue/{TopicName}

    EMQX 的共享订阅支持和策略配置:/etc/emqx.conf

    # 均衡策略
    ## Dispatch strategy for shared subscription
    ##
    ## Value: Enum
    ## - random
    ## - round_robin
    ## - sticky
    ## - hash
    broker.shared_subscription_strategy=random

    均衡策略

    描述

    random

    在所有订阅者中随机选择

    round_robin

    按照订阅顺序轮询

    sticky

    一直发往上次选取的订阅者

    hash

    按照发布者 ClientID 的哈希值

    调用示例:

     

     

    4.7.2 带群组的共享订阅

    格式:$share/<group-name>/{TopicName}

     调用示例:

     

     

     

    4.8 保留消息(Retained Message)

    Broker 可以保留某个主题的最后一条消息,当新客户端订阅该主题时,会立即收到这条保留消息。

    需要在EMQDashboard开启该选项:(默认开启)

    配置示例:

    发送消息时勾选 Retain 选项。

    4.9 遗嘱消息(Last Will and Testament, LWT)

    客户端可以在连接时设置一个“遗嘱”,如果客户端异常断开连接,Broker 将自动发布该遗嘱消息。

    配置示例:

    1. 在连接配置时,将页面下拉,在 Last Will and Testament 部分,填写遗嘱消息的配置。

  • 遗嘱消息主题:输入 offline
  • 遗嘱消息 QoS:保持默认值 0
  • 遗嘱消息保留标志:默认禁用。如果启用,遗嘱消息也将是一个保留消息。
  • 遗嘱消息:输入 I'm offline
  • 遗嘱消息延迟时间:设置为 5 秒。
  •  4.10 排他订阅

    排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

    要进行排它订阅,您需要为主题名称添加前缀,如以下表格中的示例:

    示例 前缀 真实主题名
    $exclusive/t/1 $exclusive/ t/1

     当某个客户端 A 订阅 $exclusive/t/1 后,其他客户端再订阅 $exclusive/t/1 时都会失败,直到 A 取消了对 $exclusive/t/1 的订阅为止。

    注意:

    排它订阅必须使用 $exclusive/ 前缀,在上面的示例中,其他客户端依然可以通过 t/1 成功进行订阅。

    4.10.1 订阅失败错误码​

    错误码 原因
    0x8F 使用了 $exclusive/,但并未开启排它订阅
    0x97 已经有客户端订阅了该主题

     4.10.2 配置排它订阅

    1. 开启排他订阅(默认不支持)

    方式一:通过配置文件配置排它订阅

    mqtt.exclusive_subscription.enable = true

    方式二:通过界面配置 

     2. 填写 $exclusive/ + 真实主题名

     4.11 延迟发布

    EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed

    通过 Dashboard 配置延迟发布:(默认关闭)

    延迟发布主题的具体格式如下:

    $delayed/{DelayInterval}/{TopicName}

    其中: 

  • $delayed: 使用 $delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。
  • {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。
  • {TopicName}: MQTT 消息的主题名称。 
  • 发送示例:

    5. Eclipse Paho

    5.1 Eclipse Paho是什么

    Eclipse paho 是 EMQX 官方推荐的实现了 mqtt 协议 Java 客户端。

    其关系类似于 Mysql 于 JDBC ,我们的项目代码要连接数据库需要用到 JDBC ,而我们的项目需要连接 EMQX 需要用到 Eclipse Paho ,并且它提供了基础的消息收发。

    5.2 Eclipse Paho技术调研

    全语言编程:MQTT 客户端编程 | EMQX文档

    Java 编程:如何在 Java 中使用 Paho MQTT 客户端 | EMQ

    1. 添加依赖

    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
    </dependency>

    2. 发布消息到EMQ

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.web.bind.annotation.GetMapping;
    
    @GetMapping("/publish")
    public void publish() throws MqttException {
    
        MqttClientPersistence persistence = new MemoryPersistence();
        //内存持久化
        MqttClient client = new MqttClient("tcp://localhost:1883", "random123", persistence);
    
        //连接选项中定义用户名密码和其它配置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
        options.setAutomaticReconnect(true);//是否自动重连
        options.setConnectionTimeout(30);//连接超时时间  秒
        options.setKeepAliveInterval(10);//连接保持检查周期  秒
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
    
        client.connect(options);//连接
        client.publish("topic", "发送内容".getBytes(), 2, false);
    
    }

    3. 订阅消息

    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.springframework.web.bind.annotation.GetMapping;
    
    
        @GetMapping("/subscribe")
        public void subscribe() throws MqttException {
    
            MqttClientPersistence persistence = new MemoryPersistence();//内存持久化
            MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "random456", persistence);
    
            //连接选项中定义用户名密码和其它配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
            options.setAutomaticReconnect(true);//是否自动重连
            options.setConnectionTimeout(30);//连接超时时间  秒
            options.setKeepAliveInterval(10);//连接保持检查周期  秒
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
    
            // 获取全部主题的消息
            client.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("连接丢失!");
                }
    
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("接收到消息  topic:" + topic + "  id:" + mqttMessage.getId() + " message:" + mqttMessage.toString());
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
                }
    
                @Override
                public void connectComplete(boolean b, String s) {
                    System.out.println("连接成功!");
                }
            });
            client.connect(options);//连接
            client.subscribe("test");  //订阅主题
            // 方式二:只获取当前主题的消息
    //        client.subscribe("test", new IMqttMessageListener() {
    //            @Override
    //            public void messageArrived(String topic, MqttMessage message) throws Exception {
    //                System.out.println("接收到消息  topic:" + topic + "  id:" + message.getId() + " message:" + message.toString());
    //            }
    //        });
        }

    注意:

    1. 同一个 Topic 重复订阅会覆盖。

    2. clientId 保证唯一,否则容易将其他相同 clientId 的连接顶掉。

    作者:可儿·四系桜

    物联沃分享整理
    物联沃-IOTWORD物联网 » MQTT协议深度解析:物联网通信核心协议详解

    发表回复