MQTT底层实现解析 | MQTT源码详细分析【大厂必问】

MQTT底层实现 | MQTT源码分析

  • 前言
  • 参考资料
  • 1.使用
  • 2.mqtt内部实现
  • 数据储存
  • 包的解析、发送ack回应
  • 订阅主题消息发送
  • 发布消息
  • 发送接收心跳包
  • 3.流程图
  • 结语
  • 前言

    同学面试大疆音视频驱动,问了很多Linux驱动细节和MQTT底层实现,被问麻了。我根据韦东山老师讲解MQTT的笔记补充和拓展了很多实现代码。(侵删)

    参考资料

  • kawaii-mqtt源码:

  • 作者发布源码:https://github.com/jiejieTop/mqttclient
  • 大牛维护的:https://github.com/longtengmcu/kawaii-mqtt
  • 博客

  • 作者博客:

  • 你不得不看的图文并茂的MQTT协议通信过程!!!
  • MQTT协议简介及协议原理
  • mqttclient设计与实现方式
  • 大牛笔记:

  • 记一次解决MQTT软件包内存泄露的心路历程
  • APP

  • https://mosquitto.org/download/
  • https://mqttx.app/zh
  • 1.使用

    几条代码使用MQTT:

    void my_message_handler_t(void* client, message_data_t* msg)
    {
    }
    
    int main(void)
    {
        int err;
        mqtt_client_t *client = NULL;
    
        err = mqtt_connect(client);
    
        err = mqtt_subscribe(client, "100ask-topic", QOS0, my_message_handler_t);
        
        while (1);
    }
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PviGpGwI-1679668212169)(image/MQTT源码分析/12_use_mqtt.png)]

    从上述代码中,提2个问题:

    答案:

    2.mqtt内部实现

    数据储存

    mqtt_client结构体里有2个链表,放着处理消息的函数和ack包的处理函数。

    mqtt_read_bufmqtt_write_buf两个buf放着需要处理的mqtt_massage_t数据

    包的解析、发送ack回应

  • 在mqtt_connect中创建了一个线程mqtt_yield_thread,它会使用mqtt_get_client_state时刻检测客户端的状态,如果客户端没连接上服务器,则暂停本线程并打印错误信息。如果连接上了,那就使用mqtt_yield来处理。

  • mqtt_yield中不断确定有没有连接上服务器,用mqtt_packet_handle来处理数据包信息。 启动定时器mqtt_ack_list_scan扫描ack列表,销毁已经超时的ack处理程序或重新发送它们 (如果超时没回应则订阅失败,在mqtt_subscribe中构造的消息处理函数在这销毁)

  • mqtt_packet_handle会辨识别包头的信息判断出是那种类型的包。调用各自包的处理函数。最后发送心跳包。

  • 发布消息包的解析

  • 构造设置mqtt_message_t结构体。

  • 发送回应包给服务器,使用将数据传递给用户定义的函数

  • 调用过程

  • mqtt_deliver_message
    	mqtt_get_msg_handler
    		/* 遍历msg_handler_list以找到匹配的消息处理程序 */
    		LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
            msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
    	mqtt_new_message_data(&md, topic_name, message);    /* make a message data */
        msg_handler->handler(c, &md);       /* deliver the message,调用用户定义的处理函数 */
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0xoj01Zm-1679668212181)(image/MQTT源码分析/1679631141965.png)]
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PeVOneXj-1679668212181)(image/MQTT源码分析/1679631283266.png)]

    订阅主题消息发送

  • 使用mqtt_subscribe订阅主题
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gsLI21ks-1679668212182)(image/MQTT源码分析/1679629895900.png)]

  • 收到回应(ACK)包的处理
  • MQTT Client向Broker发出某些数据包时,期待得到回应(ACK):会启动一个定时器。如果定时器超时表示没有收到ACK:

  • 要么重发
  • 要么出错
  • 对于ACK包,一般无需提供处理函数
  • 要订阅某个主题时,MQTT Client会发出SUBCRIBE包,期待得到回应的数据包:SUBACK包。代码如下:

    mqtt_subscribe
        msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
    	rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler);
                /* create a ack handler node */
                ack_handler = mqtt_ack_handler_create(c, type, packet_id, payload_len, handler);
    				platform_timer_cutdown(&ack_handler->timer, c->mqtt_cmd_timeout); 
    			mqtt_list_add_tail(&ack_handler->list, &c->mqtt_ack_handler_list);
    

    如果在指定时间里没有收到SUBACK包,那么就会在mqtt_ack_handler_list中删除该handler。

    如果收到队列SUBACK包,那么要做两件事:

  • 在mqtt_ack_handler_list中删除该handler
  • 把该handler放到mqtt_msg_handler_list中:以后收到PUBLISH数据包时这个handler被调用
  • 发布消息

  • 在主函数里创建发布线程。发布线程构造消息调用mqtt_publish发布。
  • mqtt_publish根据MQTT协议构造数据包 根据平台相关的函数发送数据包。
  • 调用过程:

    main
        res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
    				mqtt_publish_thread
                        mqtt_publish(client, "topic1", &msg);
    
    // 1. 构造消息
    mqtt_message_t msg;
    
    memset(&msg, 0, sizeof(msg));
    msg.payload = (void *) buf;
    msg.payloadlen = xxx;
    
    mqtt_publish(client, "topic1", &msg);
    	// 1.1 根据MQTT协议构造数据包
    	MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (uint8_t*)msg->payload, msg->payloadlen);
        // 1.2 根据平台相关的函数发送数据包
        mqtt_send_packet
            network_write
            	nettype_tcp_write
            		platform_net_socket_write_timeout
    

    发送接收心跳包

    函数调用流程:

    mqtt_yield_thread
    	mqtt_yield
    		mqtt_packet_handle(c, &timer);
    			 mqtt_keep_alive(c);
    
  • mqtt_keep_alive构造MQTT心跳包消息,并发送,client中的mqtt_ping_outstanding计时器加一;
  • 判断mqtt_ping_outstanding是否等于0,如果不是那就是断开连接了。
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-af34Jd1K-1679668212186)(image/MQTT源码分析/1679636436693.png)]

  • 当收到服务器PINGRESP推送包时(表示成功保持心跳),mqtt_ping_outstanding计时器清零。
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UZLHKyiC-1679668212187)(image/MQTT源码分析/1679636714121.png)]

    3.流程图

    结语

    如果有出错,恳请网友指出,多谢!

    物联沃分享整理
    物联沃-IOTWORD物联网 » MQTT底层实现解析 | MQTT源码详细分析【大厂必问】

    发表评论