【SpringBoot集成MQTT消息服务】

MQTT协议

  • MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
    MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
    作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

    MQTT协议特点

  • MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。
  • MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。
  • MQTT Client

  • publisher 和 subscriber 都属于 MQTT Client,之所以有发布者和订阅者这个概念,其实是一种相对的概念,就是指当前客户端是在发布消息还是在接收消息,发布和订阅的功能也可以由同一个 MQTT Client 实现。MQTT 客户端是运行 MQTT 库并通过网络连接到 MQTT 代理的任何设备(从微控制器到成熟的服务器)。例如,MQTT 客户端可以是一个非常小的、资源受限的设备,它通过无线网络进行连接并具有一个最低限度的库。基本上,任何使用 TCP/IP 协议使用 MQTT 设备的都可以称之为 MQTT Client。MQTT 协议的客户端实现非常简单直接,易于实施是 MQTT 非常适合小型设备的原因之一。MQTT 客户端库可用于多种编程语言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。
  • MQTT Broker

  • 与 MQTT Client 对应的就是 MQTT Broker,Broker 是任何发布/订阅协议的核心,根据实现的不同,代理可以处理多达数百万连接的 MQTT Client。 Broker 负责接收所有消息,过滤消息,确定是哪个Client 订阅了每条消息,并将消息发送给对应的 Client,Broker 还负责保存会话数据,这些数据包括订阅的和错过的消息。Broker 还负责客户端的身份验证和授权。
  • MQTT Connection

  • MQTT 协议基于 TCP/IP。客户端和代理都需要有一个 TCP/IP 协议支持。 MQTT 连接始终位于
  • 准备工作 (下载Broker服务端,相关客户端工具)

  • 服务端工具:
  • mosquitto https://mosquitto.org/download/
  • 客户端工具:
  • MQTTX https://mqttx.app/zh#download
  • 创建SpringBoot的工程 (这里自己搭建就行了)

  • 修改pom.xml ,增加相关mqtt的依赖
  •       <!--mqtt相关依赖-->
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-mqtt</artifactId>
            </dependency>
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.17.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.huawen</groupId>
        <artifactId>mqtt-demo</artifactId>
        <version>0.0.1</version>
        <name>mqtt-demo</name>
        <description>Boot with MQTT Demo</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <!--web依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-integration</artifactId>
            </dependency>
            <!--mqtt相关依赖-->
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-mqtt</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    
  • 自定义yml配置
  • spring:
      application:
        name: MQTT-DEMO
    server:
      port: 8989
    #mqtt properties
    mqtt:
      #uris 可以有多个 所以是个数组
      uris:
        - tcp://127.0.0.1:1883
      clientId: mqtt_test1
      topics:
        - demo
        - test
      username: admin
      password: 123456
      timeout: 30
      keepalive: 60
      qos: 1
    
  • 增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)
  • package com.huawen.mqtt.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * @author:xjl
     * @date:2022/5/5 17:27
     * @Description: MQTT的配置类
     **/
    @Component
    @ConfigurationProperties(prefix = "mqtt")
    @Data
    public class MqttConfiguration {
    
        /**
         * uris 服务器地址配置
         */
        private String[] uris;
    
        /**
         * clientId
         */
        private String clientId;
    
        /**
         * 话题
         */
        private String[] topics;
    
        /**
         * 用户名
         */
        private String username;
    
        /**
         * 密码
         */
        private String password;
    
        /**
         * 连接超时时长
         */
        private Integer timeout;
    
        /**
         * keep Alive时间
         */
        private Integer keepalive;
    
        /**
         * 遗嘱消息 QoS
         */
        private Integer qos;
    }
    
  • 消费者配置
  • package com.huawen.mqtt.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import javax.annotation.Resource;
    
    /**
     * @author:xjl
     * @date:2022/5/6 9:06
     * @Description: MQTT 消费端的配置
     **/
    @Configuration
    @Slf4j
    public class MqttInBoundConfiguration {
        @Resource
        private MqttConfiguration mqttProperties;
    
        //==================================== 消费消息==========================================//
    
        /**
         * 入站通道
         *
         * @return 消息通道对象 {@link MessageChannel}
         */
        @Bean("input")
        public MessageChannel mqttInputChannel() {
            //直连通道
            return new DirectChannel();
        }
    
    
        /**
         * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置
         *
         * @return MQTT客户端工厂 {@link MqttPahoClientFactory}
         */
        @Bean
        public MqttPahoClientFactory inClientFactory() {
            //设置连接属性
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(mqttProperties.getUris());
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepalive());
            // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话
            options.setCleanSession(false);
            //设置断开后重新连接
            options.setAutomaticReconnect(true);
            factory.setConnectionOptions(options);
            return factory;
        }
    
    
        /**
         * 入站
         *
         * @return 消息提供者 {@link MessageProducer}
         */
        @Bean
        public MessageProducer producer() {
            // Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听
            //clientId 加后缀 不然会报retrying 不能重复
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());
            adapter.setCompletionTimeout(5000);
            // Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
            // 按字节接收消息
            // defaultPahoMessageConverter.setPayloadAsBytes(true);
            adapter.setConverter(defaultPahoMessageConverter);
            // 设置QoS
            adapter.setQos(mqttProperties.getQos());
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        /**
         * 通过通道获取数据
         * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
         * tips:
         * 异步处理
         *
         * @return 消息处理 {@link MessageHandler}
         */
        @Bean
        @ServiceActivator(inputChannel = "input")
        public MessageHandler handler() {
            return message -> {
                log.info("收到的完整消息为--->{}", message);
                log.info("----------------------");
                log.info("message:" + message.getPayload());
                log.info("Id:" + message.getHeaders().getId());
                log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
                String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
                log.info("topic:" + topic);
                log.info("----------------------");
            };
        }
    }
    
  • 生产者配置
  • package com.huawen.mqtt.config;
    
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import javax.annotation.Resource;
    
    /**
     * @author:xjl
     * @date:2022/5/6 8:49
     * @Description: MQTT 生产端的配置
     **/
    @Configuration
    public class MqttOutBoundConfiguration {
        @Resource
        private MqttConfiguration mqttProperties;
    
        //==================================== 发送消息==========================================//
    
        /**
         * 出站通道
         *
         * @return 消息通道对象 {@link MessageChannel}
         */
        @Bean("out")
        public MessageChannel mqttOutBoundChannel() {
            //直连通道
            return new DirectChannel();
        }
    
    
        /**
         * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置
         *
         * @return MQTT客户端工厂 {@link MqttPahoClientFactory}
         */
        @Bean
        public MqttPahoClientFactory outClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            String[] uris = mqttProperties.getUris();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(uris);
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepalive());
            // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话
            options.setCleanSession(false);
            //设置断开后重新连接
            options.setAutomaticReconnect(true);
            factory.setConnectionOptions(options);
            return factory;
        }
    
        /**
         * 出站
         *
         * @return 消息处理 {@link MessageHandler}
         */
        @Bean
        @ServiceActivator(inputChannel = "out")
        public MessageHandler mqttOutbound() {
            //发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
            //clientId 加后缀 不然会报retrying 不能重复
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "_producer", outClientFactory());
            //如果设置成true,即异步,发送消息时将不会阻塞。
            messageHandler.setAsync(true);
            //设置默认QoS
            messageHandler.setDefaultQos(mqttProperties.getQos());
            // Paho消息转换器
            DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
            //发送默认按字节类型发送消息
    //        defaultPahoMessageConverter.setPayloadAsBytes(true);
            messageHandler.setConverter(defaultPahoMessageConverter);
            return messageHandler;
        }
    }
    
  • 创建一个通用接口 用于发送数据
  • package com.huawen.mqtt.inter;
    
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    
    /**
     * @author:xjl
     * @date:2022/5/6 9:20
     * @Description: 接口MqttGateway
     **/
    @MessagingGateway(defaultRequestChannel = "out")
    public interface MqttGateway {
        /**
         * 定义重载方法,用于消息发送
         *
         * @param payload 负载
         */
        void sendToMqtt(String payload);
    
        /**
         * 指定topic进行消息发送
         *
         * @param topic   topic话题
         * @param payload 负载
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
        /**
         * 指定topic和qos进行消息发送
         *
         * @param topic   topic话题
         * @param qos     qos
         * @param payload 负载 (字符串类型)
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
        /**
         * 指定topic和qos进行消息发送
         *
         * @param topic   topic话题
         * @param qos     qos
         * @param payload 负载 (字节数组类型)
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }
    
  • 生产者测试controller
  • package com.huawen.mqtt.controller;
    
    import com.huawen.mqtt.bean.MyMessage;
    import com.huawen.mqtt.inter.MqttGateway;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @author:xjl
     * @date:2022/5/6 9:17
     * @Description: mqtt发布消息controller
     **/
    @RestController
    public class MqttPublishController {
        @Resource
        private MqttGateway mqttGateWay;
    
        @PostMapping("/send")
        public String send(@RequestBody MyMessage myMessage) {
            // 发送消息到指定主题
            mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
            return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();
        }
    }
    

    源码地址:https://github.com/KyrieXJL/MQTT_Demo

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【SpringBoot集成MQTT消息服务】

    发表评论