RK3588智慧农场系统开发实战:RS485总线集成华为云IOT与node-red及MQTT技术实践

一、硬件连接流程

本次采用的是

  • 总线型拓扑:所有设备并联到两根 RS485 总线上(A + 和 B-)
  • 二、通信协议配置

    1. 主从通信模式
  • RS485 是半双工:同一时间只能有一个设备发送数据
  • 主从架构:通常一个主设备(MCU / 电脑)轮询多个从设备(传感器)
  • 2. 通信参数配置
  • 波特率:常用 9600、115200 等
  • (需与传感器一致,查看对应手册以及使用RS485转usb工具连接电脑使用串口助手发送对应指令修改设备设备号,波特率等)
  • 数据位:通常 8 位
  • 停止位:通常 1 位
  • 校验位:无校验或 Modbus 常用的 CRC 校验
  • 3. 传感器寻址
  • 每个传感器需设置唯一 ID(如 Modbus 地址 1~247)
  • 通过地址区分不同传感器
  • 三、软件实现流程

    1. 安装必要库

    2.代码

    代码见文章末尾

    注意事项:

    1. 电气安全

    2. RS485 总线上所有设备共地(或使用隔离型转换器)
    3. 线缆选择屏蔽双绞线,长度不超过 1200 米(与波特率有关)
    4. 通信稳定性

    5. 发送数据前后需适当延时(RS485 收发转换需要时间)
    6. 添加重试机制,处理偶尔的通信失败
    7. 协议适配

    8. 不同传感器可能使用不同的 Modbus 寄存器映射,需参考传感器手册
    9. 非 Modbus 协议传感器需使用相应的通信库(如 Siemens S7、Profibus 等)
    10. 错误处理

    11. 添加 CRC 校验确保数据完整性
    12. 实现超时机制避免程序卡死

     

    四,传感器数据 MQTT 上报至 Node-RED 仪表盘

    配置node-red的过程省略。

    node-red &后台开启node-red,打开浏览器连接 地址, 可视化仪表盘则是 地址/ui

    环节 技术 / 工具 作用
    数据采集 RS485 总线、Modbus 协议、C++ 程序 通过 RS485 连接传感器,解析 Modbus 数据(如温湿度、CO 浓度等)。
    数据传输 MQTT 协议、Paho MQTT 库 将采集的数据通过 MQTT 协议发布到消息服务器(如 Node-RED 内置 MQTT 代理)。
    数据处理 Node-RED 流编辑器 接收 MQTT 消息,解析数据并路由至仪表盘节点。
    数据展示 Node-RED Dashboard 组件 以图表、仪表盘等形式实时显示数据

    五,上云流程

    无论连接哪类 MQTT 服务器(华为云、阿里云等),核心步骤基本一致:

    1. 安装依赖与工具
    sudo apt update          # 更新软件源
    sudo apt install git cmake build-essential libssl-dev  # 安装编译工具和SSL库
  • 关键依赖libssl-dev 提供 TLS 加密支持(若使用明文 MQTT 可省略,但不推荐)。
  • 2. 下载并编译 Paho 库
    git clone https://github.com/eclipse/paho.mqtt.c.git  # 克隆源码
    cd paho.mqtt.c
    mkdir build && cd build
    cmake ..  # 生成编译配置(默认开启SSL支持)
    make      # 编译库和示例程序
    sudo make install  # 安装到系统路径

    编译选项

  • 若需禁用 TLS,添加 -DPAHO_WITH_SSL=OFF 到cmake命令:
    cmake .. -DPAHO_WITH_SSL=OFF
  • 若为嵌入式系统,需指定交叉编译工具链(如arm-linux-gnueabihf-gcc)。
  • 3. 编写应用程序

    参考 Paho 库的示例代码(位于paho.mqtt.c/examples),核心逻辑包括:

  • 包含头文件:#include "MQTTClient.h"
  • 初始化 MQTT 客户端、设置连接参数(服务器地址、客户端 ID、用户名 / 密码等)
  • 实现消息回调函数(处理订阅到的消息)
  • 建立连接、发布 / 订阅消息
  • 示例:连接华为云 IoT 平台

    #include "MQTTClient.h"
    
    #define ADDRESS     "ssl://your-iot-server.com:8883"  // 华为云MQTTS地址
    #define CLIENTID    "your-device-id_your-product-id_0_0_20250514"  // 客户端ID
    #define USERNAME    "your-device-id"  // 用户名(设备ID)
    #define PASSWORD    "your-device-secret"  // 密码(设备密钥)
    #define TOPIC       "$oc/devices/your-device-id/sys/properties/report"  // 华为云属性上报Topic
    
    int main() {
        MQTTClient client;
        MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
        MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
        
        // 初始化SSL配置(若使用TLS)
        ssl_opts.trustStore = "/path/to/ca-cert.pem";  // 根证书路径
        opts.ssl = &ssl_opts;
        
        // 配置连接参数
        opts.username = USERNAME;
        opts.password = PASSWORD;
        opts.cleansession = true;
        
        // 创建客户端并连接
        MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connect(client, &opts);
        
        // 发布消息示例
        char payload[100] = "{\"services\":[{\"service_id\":\"SensorService\",\"properties\":{\"TEMP\":25.5}}]}";
        MQTTClient_publish(client, TOPIC, strlen(payload), payload, 0, false, NULL);
        
        // 断开连接
        MQTTClient_disconnect(client, 1000);
        MQTTClient_destroy(&client);
        return 0;
    }

    4. 编译与链接

    gcc your-program.c -o your-program -lpaho-mqtt3c -lssl -lcrypto -lpthread
  • 关键链接参数
  • -lpaho-mqtt3c:Paho C 库的核心库(必选)。
  • -lssl -lcrypto:OpenSSL 库(若使用 TLS/SSL)。
  • -lpthread:线程库(若程序使用多线程)。
  • 适配不同服务器的差异

    不同 MQTT 服务器(如华为云、阿里云)的主要差异在于:

    1. 连接参数

    2. 服务器地址:华为云为ssl://xxx.iotda-device.cn-north-4.myhuaweicloud.com:8883,阿里云为ssl://xxx.mqtt.aliyuncs.com:1883
    3. 客户端 ID / 用户名 / 密码:需根据各平台规则生成(如华为云需包含设备 ID 和产品 ID)。
    4. 证书要求

    5. 华为云:需下载其根证书(如GlobalSignRSAOVSSLCA2018.crt)。
    6. 其他平台:可能使用不同的根证书(如 Let’s Encrypt 或自签名证书)。
    7. Topic 格式

    8. 华为云属性上报 Topic:$oc/devices/{device_id}/sys/properties/report
    9. 阿里云 Topic:/sys/{productKey}/{deviceName}/thing/event/property/post

    常见问题与解决方案

    编译错误:未找到头文件或库文件

  • 确认头文件路径:#include "MQTTClient.h" 应指向/usr/local/include
  • 确认库文件路径:通过sudo ldconfig更新动态链接库缓存,或在编译时指定路径:
  • gcc your-program.c -o your-program -I/usr/local/include -L/usr/local/lib -lpaho-mqtt3c ...

     

    运行时错误:找不到 libpaho-mqtt3c.so

  • 使用sudo find / -name "libpaho-mqtt3c.so*" 确认库文件位置。
  • 创建软链接或修改/etc/ld.so.conf添加库路径:
  • sudo ln -s /usr/local/lib/libpaho-mqtt3c.so.1 /usr/lib/libpaho-mqtt3c.so.1
    sudo ldconfig

     

    连接失败(TLS 相关)

  • 确保证书路径正确且为 PEM 格式(以-----BEGIN CERTIFICATE-----开头)。
  • 临时禁用 TLS 验证(仅测试):
  • ssl_opts.verify = 0;  // 禁用证书验证(生产环境需开启)

    五、替代方案:使用其他 MQTT 库或语言

    如果 C 语言开发成本较高,可考虑:

  • Python:使用paho-mqtt Python 库(pip install paho-mqtt),代码更简洁。
  • Node.js:使用mqtt模块(npm install mqtt),配合 Node-RED 可视化流程(如之前的方案)。
  • 其他 C++ 库:如mqtt-cppemqttd等,但 Paho 库仍是最广泛使用的选择。
  • 总结

    上述步骤是使用 Paho MQTT C 库 实现 MQTT 通信的标准流程,适用于连接各类支持 MQTT 协议的服务器。核心要点是:

    1. 正确编译和安装库文件,确保依赖齐全;
    2. 根据目标服务器的规则配置连接参数(地址、认证信息、Topic 等);
    3. 处理好 TLS 证书和动态链接库路径问题。

    通过这种方式,可以在 C/C++ 项目中高效实现 MQTT 协议通信,满足工业物联网、智能家居等场景的需求。

    关于连接华为云的问题

    这些配置参数非常关键:

  • CLIENT_ID:格式为{设备ID}_{接入协议类型}_{设备是否加密接入}_{时间戳}
  • USERNAME:通常是设备 ID
  • PASSWORD:设备密钥或根据特定算法生成的密码
  • PUB_TOPIC:华为云定义的属性上报主题格式
  • 连接初始化

    int init_huawei_mqtt() {
        // 检查华为云参数有效性
        if (HUAWEI_CLIENT_ID == NULL || HUAWEI_USERNAME == NULL || HUAWEI_PASSWORD == NULL) {
            printf("错误: 华为云参数为空\n");
            return -1;
        }
    
        int rc;
        MQTTClient_create(&huawei_client, HUAWEI_CLOUD_ADDR, HUAWEI_CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        conn_opts.username = HUAWEI_USERNAME;
        conn_opts.password = HUAWEI_PASSWORD;
    
        if ((rc = MQTTClient_connect(huawei_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
            printf("华为云MQTT连接失败,错误码:%d,错误信息:%s\n", rc, MQTTClient_strerror(rc));
            return -1;
        }
        printf("华为云MQTT连接成功: %s\n", HUAWEI_CLOUD_ADDR);
    
        return 0;
    }

    数据格式

    华为云 IoT 平台要求特定的 JSON 格式来上报设备属性:

    void huawei_publish_data(float temperature, float humidity, int co, int lux) {
        char json_str[256];
        snprintf(json_str, sizeof(json_str),
            "{"
            "\"services\":[{"
            "\"service_id\":\"Sensor\","
            "\"properties\":{"
            "\"CO浓度\":%d,"
            "\"光照强度\":%d,"
            "\"湿度\":%.1f,"
            "\"温度\":%.1f"
            "}"
            "}]"
            "}", co, lux, humidity, temperature);
    
        // ... 发布消息代码 ...
    }

    这个 JSON 结构包含:

  • services:服务数组,每个服务代表设备的一类功能
  • service_id:服务 ID,需要在华为云平台上预先定义
  • properties:具体的属性数据,键名需要与平台上定义的模型匹配
  • 华为云默认要求 TLS 连接,之所以在代码中注释掉了 SSL 配置部分:

    // 配置SSL/TLS
    // MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
    // ssl_opts.trustStore = "/etc/ssl/certs/ca-certificates.crt";
    // ssl_opts.verify = 1;
    // ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;  // 指定TLS版本
    // conn_opts.ssl = &ssl_opts;

    华为云 IoT 平台对非加密连接(TCP/1883)和加密连接(MQTTS/8883)的校验机制不同

    第一版代码,目前还存在一点问题(不严谨)大体上所有功能均能实现

    一、旧代码(能连接)的特点:未启用加密(TCP/1883)

    1. 未使用 TLS 加密
  • 协议与端口:旧代码使用 tcp://1883,未强制验证证书和设备身份,属于非安全连接(仅适用于测试环境,生产环境会被禁止)。
  • 华为云策略:部分旧版测试环境可能允许非加密连接,但新版环境通常强制要求 TLS 加密(MQTTS/8883),且会校验证书和设备签名。
  • 2. 绕过证书校验的可能性
  • 旧代码中注释了 SSL 配置,且未启用 verify(证书验证),因此:
  • 华为云可能未严格校验设备证书(仅校验 ClientID、Username、Password)。
  • 若 PASSWORD 直接使用设备密钥(而非签名后的密码),可能在测试环境中被临时允许连接(但不符合官方规范)。
  • 二、新代码(不能连接)的问题:启用加密但配置错误

    1. 加密连接的强制校验点

    当使用 mqtts://8883 时,华为云会严格校验:

  • TLS 证书有效性:必须提供正确的 CA 根证书(如 ca-certificates.crt),否则会因证书验证失败断开连接。
  • 设备签名合法性PASSWORD 必须是通过设备密钥和时间戳生成的签名(而非明文密钥),否则认证失败。
  • 2. 新代码的潜在错误
    (1) PASSWORD 错误使用明文密钥
  • 华为云生产环境要求 PASSWORD 是 HMAC-SHA256 签名结果,而非设备密钥明文。旧代码可能因测试环境允许明文密钥而侥幸连接,但新代码的加密连接会拒绝明文。
  • 正确做法:使用设备密钥、ClientID 中的时间戳按华为云规范生成签名(参考 华为云签名算法文档)。
  • (2) CA 证书路径或内容错误
  • 新代码指定 trustStore = "/etc/ssl/certs/ca-certificates.crt",但:
  • 该路径在某些系统(如嵌入式 Linux)中可能不存在,或证书未包含华为云 IoT 的根证书。
  • 解决方案:手动下载华为云 IoT 根证书(证书下载地址),并确保路径正确。
  • (3) ClientID 时间戳过期
  • CLIENT_ID 中的时间戳(如 2025051413)需与服务器时间相差不超过1 小时,否则签名失效。新代码若使用固定时间戳,可能因超时被拒绝。
  • 三、验证思路:测试非加密连接是否被禁止

    1. 确认华为云环境类型

    2. 若旧代码连接的是 旧版测试环境(允许 TCP/1883 + 明文密码),而新代码尝试连接 生产环境(强制 MQTTS/8883 + 签名认证),则必然失败。
    3. 可通过华为云控制台查看 “设备接入协议” 是否允许非加密连接。
    4. 临时禁用加密验证

    5. 在新代码中暂时改回 tcp://1883,并移除 SSL 配置,观察是否能连接。若能连接,说明问题出在加密配置或签名算法。

    四、解决方案:适配华为云加密连接要求

    1. 生成正确的 PASSWORD(签名)
    // 示例:使用HMAC-SHA256生成签名(需引入加密库,如OpenSSL)
    #include <openssl/hmac.h>
    
    char* generate_signature(const char* device_secret, const char* client_id) {
        // 提取ClientID中的时间戳(假设格式为 deviceId_0_0_timestamp)
        char* timestamp = strrchr(client_id, '_') + 1;
        // 拼接签名原文:clientId + timestamp
        char sign_str[128];
        snprintf(sign_str, sizeof(sign_str), "%s%s", client_id, timestamp);
        // 计算HMAC-SHA256
        unsigned char hash[EVP_MAX_MD_SIZE];
        unsigned int hash_len;
        HMAC(EVP_sha256(), device_secret, strlen(device_secret), 
             (unsigned char*)sign_str, strlen(sign_str), hash, &hash_len);
        // 转换为十六进制字符串
        char* signature = (char*)malloc(2 * hash_len + 1);
        for (int i = 0; i < hash_len; i++) {
            sprintf(signature + 2*i, "%02x", hash[i]);
        }
        return signature;
    }

  • 在代码中调用此函数,将结果赋值给 HUAWEI_PASSWORD
  • 2. 确保 CA 证书正确
  • 下载华为云 IoT 根证书(iot_ca.crt),保存到设备路径(如 /usr/local/etc/ca.pem),并修改代码:
    ssl_opts.trustStore = "/usr/local/etc/ca.pem";
    3. 更新 ClientID 时间戳
  • 使用当前时间生成时间戳(如 2025051514),确保与服务器时间同步。
  •  五、总结:新旧代码差异的核心原因

    场景 旧代码(能连接) 新代码(不能连接)
    连接方式 非加密(TCP/1883),绕过严格校验 加密(MQTTS/8883),强制证书和签名
    PASSWORD 可能使用明文密钥(测试环境允许) 需使用 HMAC 签名(生产环境强制)
    证书配置 未启用,不校验证书 启用但路径 / 内容错误
    适用环境 旧版测试环境或非安全环境 新版生产环境(需严格遵循规范)

    建议:优先使用加密连接(MQTTS/8883),并按照华为云官方文档配置签名和证书。若需临时调试,可先在测试环境中使用非加密连接,但生产环境必须启用安全机制。 

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <termios.h>
    #include <sys/select.h>
    #include <time.h>
    #include <pthread.h>
    #include <stdint.h>
    #include <queue>
    #include "MQTTClient.h"
    #include <cjson/cJSON.h>
    
    // MQTT配置
    #define MQTT_ADDRESS     "tcp://192.168.1.212:1883"
    #define MQTT_USERNAME    " "
    #define MQTT_PASSWORD    " "
    #define MQTT_CLIENTID    "smart_farm_rs485"
    #define MQTT_QOS         2
    #define MQTT_SENSOR_TOPIC "/Yuei/sensor/"
    #define MQTT_COMMAND_TOPIC "/Yuei/command/"
    #define MQTT_TIMEOUT     10000L
    
    // 华为云IoT配置(与文档完全一致)
    #define HUAWEI_CLOUD_ADDR "mqtts://21369d9ff7.st1.iotda-device.cn-north-4.myhuaweicloud.com:8883"
    #define HUAWEI_CLIENT_ID  "6823048384adf27cda58465e_Yiji1_0_0_2025051413"
    #define HUAWEI_USERNAME   "6823048384adf27cda58465e_Yiji1"
    #define HUAWEI_PASSWORD   "e554e71de483b60085373595c34051d0b37f9dbc680257234bb6233e9187704b"
    #define HUAWEI_PUB_TOPIC  "$oc/devices/6823048384adf27cda58465e_Yiji1/sys/properties/report"
    
    // RS485配置
    #define RS485_DEV "/dev/ttyS1"
    #define BAUDRATE B4800
    #define RECV_TIMEOUT_MS 2000
    
    // 继电器配置
    #define RELAY_ADDRESS 4
    #define RELAY1_ADDR 0x0010
    #define RELAY2_ADDR 0x0011
    
    // 全局变量
    int fd;
    MQTTClient mqtt_client;
    MQTTClient huawei_client;  
    pthread_mutex_t serial_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mqtt_mutex = PTHREAD_MUTEX_INITIALIZER;  
    int running = 1;
    
    // 命令队列结构
    struct RelayCommand {
        uint16_t address;
        uint8_t value;
        time_t timestamp;
    };
    
    typedef struct {
        float temperature;
        float humidity;
        int co;
        int lux;
        time_t timestamp;
    } SensorData;
    
    std::queue<RelayCommand> commandQueue;
    pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
    
    // 计算Modbus CRC16校验
    unsigned short crc16(unsigned char *data, int len) {
        unsigned short crc = 0xFFFF;
        for (int i = 0; i < len; i++) {
            crc ^= data[i];
            for (int j = 0; j < 8; j++) {
                if (crc & 0x0001) {
                    crc = (crc >> 1) ^ 0xA001;
                } else {
                    crc >>= 1;
                }
            }
        }
        return crc;
    }
    
    // 发送Modbus RTU指令并接收响应
    int send_modbus_command(int fd, unsigned char *command, int cmd_len, unsigned char *response, int resp_len) {
        fd_set readfds;
        struct timeval timeout;
    
        printf("发送指令: ");
        for (int i = 0; i < cmd_len; i++) {
            printf("%02X ", command[i]);
        }
        printf("\n");
    
        tcflush(fd, TCIFLUSH);
        usleep(50000); 
    
        int written = write(fd, command, cmd_len);
        if (written != cmd_len) {
            printf("发送失败: 只发送了 %d 字节,应该发送 %d 字节\n", written, cmd_len);
            return -1;
        }
    
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
    
        while (1) {
            FD_ZERO(&readfds);
            FD_SET(fd, &readfds);
    
            int sel_result = select(fd + 1, &readfds, NULL, NULL, &timeout);
            if (sel_result < 0) {
                perror("select error");
                return -1;
            } else if (sel_result == 0) {
                printf("响应超时\n");
                return 0;
            }
    
            if (FD_ISSET(fd, &readfds)) {
                int bytes = read(fd, response, resp_len);
                if (bytes > 0) {
                    printf("接收响应: ");
                    for (int i = 0; i < bytes; i++) {
                        printf("%02X ", response[i]);
                    }
                    printf("\n");
                    return bytes;
                }
            }
        }
    
        return 0;
    }
    
    // 重试读取传感器数据
    int read_sensor_data(int fd, unsigned char *command, int cmd_len, unsigned char *response, int resp_len, int max_retries) {
        int response_len = 0;
    
        for (int i = 0; i < max_retries; i++) {
            struct timespec ts;
            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_sec += 1; 
    
            if (pthread_mutex_timedlock(&serial_mutex, &ts) != 0) {
                printf("获取串口锁超时(尝试 %d/%d)\n", i + 1, max_retries);
                continue;
            }
            
            response_len = send_modbus_command(fd, command, cmd_len, response, resp_len);
            pthread_mutex_unlock(&serial_mutex);
    
            if (response_len >= 5) {
                unsigned short resp_crc = (response[response_len - 2] | (response[response_len - 1] << 8));
                unsigned short calc_crc = crc16(response, response_len - 2);
                if (resp_crc == calc_crc) return response_len;
                else printf("CRC校验失败(尝试 %d/%d)\n", i + 1, max_retries);
            } else {
                if (response_len == 0) printf("响应超时(尝试 %d/%d)\n", i + 1, max_retries);
                else printf("响应长度不足(尝试 %d/%d)\n", i + 1, max_retries);
            }
    
            usleep(100000); 
        }
    
        return -1;
    }
    
    // 记录日志
    void log_message(const char* message) {
        FILE* log_file = fopen("sensor_log.txt", "a");
        if (!log_file) {
            perror("无法打开日志文件");
            return;
        }
        time_t rawtime;
        struct tm* timeinfo;
        time(&rawtime);
        timeinfo = localtime(&rawtime);
        char time_str[26];
        strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", timeinfo);
        fprintf(log_file, "%s - %s\n", time_str, message);
        fclose(log_file);
    }
    
    // MQTT消息发布函数 - 本地MQTT
    void mqtt_publish(const char *topic, const char *payload) {
        MQTTClient_message message = MQTTClient_message_initializer;
        message.payload = (void*)payload;
        message.payloadlen = strlen(payload);
        message.qos = MQTT_QOS;
        message.retained = 0;
    
        MQTTClient_deliveryToken token;
        int rc;
        
        pthread_mutex_lock(&mqtt_mutex);
        if ((rc = MQTTClient_publishMessage(mqtt_client, topic, &message, &token)) != MQTTCLIENT_SUCCESS) {
            printf("MQTT发布失败,错误码:%d\n", rc);
        } else {
            MQTTClient_waitForCompletion(mqtt_client, token, MQTT_TIMEOUT);
            printf("已发布到主题[%s]: %s\n", topic, payload);
        }
        pthread_mutex_unlock(&mqtt_mutex);
    }
    
    // 华为云MQTT数据发布函数(修正JSON结构)
    void huawei_publish_data(float temperature, float humidity, int co, int lux) {
        char json_str[256];
        snprintf(json_str, sizeof(json_str),
            "{"
            "\"services\":[{"
                "\"service_id\":\"default_service\"," 
                "\"properties\":{"
                    "\"CO\":%d,"
                    "\"HID\":%d,"
                    "\"humi\":%.1f,"
                    "\"temp\":%.1f"
                "}"
            "}]"
            "}", co, lux, humidity, temperature);
    
        pthread_mutex_lock(&mqtt_mutex);
    
        MQTTClient_message pubmsg = MQTTClient_message_initializer;
        pubmsg.payload = json_str;
        pubmsg.payloadlen = strlen(json_str);
        pubmsg.qos = MQTT_QOS;
        pubmsg.retained = 0;
    
        MQTTClient_deliveryToken token;
        int rc = MQTTClient_publishMessage(huawei_client, HUAWEI_PUB_TOPIC, &pubmsg, &token);
        if (rc != MQTTCLIENT_SUCCESS) {
            fprintf(stderr, "[华为云] 发布失败: %d\n", rc);
        } else {
            if (MQTTClient_waitForCompletion(huawei_client, token, MQTT_TIMEOUT) != MQTTCLIENT_SUCCESS) {
                fprintf(stderr, "[华为云] 等待发布完成失败\n");
            }
            printf("[华为云] 数据已发布: %s\n", json_str);
        }
    
        pthread_mutex_unlock(&mqtt_mutex);
    }
    
    // 控制继电器
    int control_relay(int fd, uint16_t address, uint8_t value) {
        uint8_t command[8];
        command[0] = (uint8_t)RELAY_ADDRESS;  // 显式类型转换
        command[1] = 0x05;
        command[2] = (uint8_t)((address >> 8) & 0xFF);  // 显式类型转换
        command[3] = (uint8_t)(address & 0xFF);         // 显式类型转换
        command[4] = value ? 0xFF : 0x00;
        command[5] = 0x00;
    
        uint16_t crc = crc16(command, 6);
        command[6] = (uint8_t)(crc & 0xFF);             // 显式类型转换
        command[7] = (uint8_t)(crc >> 8);               // 显式类型转换
    
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_sec += 2;
    
        int lock_result = pthread_mutex_timedlock(&serial_mutex, &ts);
        if (lock_result != 0) {
            printf("获取串口锁超时,继电器控制被延迟\n");
            return -1;
        }
    
        tcflush(fd, TCIFLUSH);
        int bytes_written = write(fd, command, 8);
        if (bytes_written != 8) {
            perror("发送命令失败");
            pthread_mutex_unlock(&serial_mutex);
            return -1;
        }
    
        usleep(100000);
        uint8_t response[8];
        int bytes_read = read(fd, response, sizeof(response));
        pthread_mutex_unlock(&serial_mutex);
        
        if (bytes_read < 5) {
            printf("响应数据过短: %d字节\n", bytes_read);
            return -1;
        }
    
        uint16_t received_crc = (response[bytes_read - 2] | (response[bytes_read - 1] << 8));
        uint16_t calculated_crc = crc16(response, bytes_read - 2);
        if (received_crc != calculated_crc || response[1] != 0x05) {
            printf("CRC校验或功能码错误\n");
            return -1;
        }
    
        return 0;
    }
    
    // MQTT消息到达回调函数
    int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
        char* payload = (char*)message->payload;
        printf("收到消息: 主题[%s], 内容: %s\n", topicName, payload);
    
        if (strncmp(topicName, MQTT_COMMAND_TOPIC, strlen(MQTT_COMMAND_TOPIC)) == 0) {
            int relay_num, action;
            if (sscanf(payload, "relay%d=%d", &relay_num, &action) == 2) {
                RelayCommand cmd;
                cmd.address = (relay_num == 1) ? RELAY1_ADDR : RELAY2_ADDR;
                cmd.value = (uint8_t)action;  // 显式类型转换
                cmd.timestamp = time(NULL);
                
                pthread_mutex_lock(&queue_mutex);
                commandQueue.push(cmd);
                pthread_cond_signal(&queue_cond);
                pthread_mutex_unlock(&queue_mutex);
                
                printf("已将继电器命令加入队列: 继电器%d=%d\n", relay_num, action);
            } else {
                printf("无法解析的命令格式: %s\n", payload);
            }
        }
    
        MQTTClient_freeMessage(&message);
        MQTTClient_free(topicName);
        return 1;
    }
    
    // 命令处理线程
    void* command_thread(void* arg) {
        while (running) {
            pthread_mutex_lock(&queue_mutex);
            while (commandQueue.empty() && running) {
                pthread_cond_wait(&queue_cond, &queue_mutex);
            }
            if (!running) {
                pthread_mutex_unlock(&queue_mutex);
                break;
            }
            RelayCommand cmd = commandQueue.front();
            commandQueue.pop();
            pthread_mutex_unlock(&queue_mutex);
            
            printf("处理继电器命令: 地址 0x%04X, 值 %d\n", cmd.address, cmd.value);
            int result = control_relay(fd, cmd.address, cmd.value);
            
            char status_msg[50];
            snprintf(status_msg, sizeof(status_msg), "relay%d_status=%d", (cmd.address==RELAY1_ADDR?1:2), cmd.value);
            mqtt_publish(MQTT_SENSOR_TOPIC, status_msg);
            
            printf(result==0?"继电器控制成功\n":"继电器控制失败\n");
        }
        return NULL;
    }
    
    // MQTT连接初始化
    int init_mqtt() {
        int rc;
        MQTTClient_create(&mqtt_client, MQTT_ADDRESS, MQTT_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        conn_opts.username = MQTT_USERNAME;
        conn_opts.password = MQTT_PASSWORD;
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        MQTTClient_setCallbacks(mqtt_client, NULL, NULL, msgarrvd, NULL);
    
        if ((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
            printf("MQTT连接失败,错误码:%d\n", rc);
            return -1;
        }
        printf("MQTT连接成功: %s\n", MQTT_ADDRESS);
    
        if ((rc = MQTTClient_subscribe(mqtt_client, MQTT_COMMAND_TOPIC, MQTT_QOS)) != MQTTCLIENT_SUCCESS) {
            printf("订阅主题失败,错误码:%d\n", rc);
            return -1;
        }
        printf("已订阅主题: %s\n", MQTT_COMMAND_TOPIC);
    
        return 0;
    }
    
    // 华为云MQTT连接初始化(增强安全性)
    int init_huawei_mqtt() {
        // 检查华为云参数有效性
        if (HUAWEI_CLIENT_ID == NULL || HUAWEI_USERNAME == NULL || HUAWEI_PASSWORD == NULL) {
            printf("错误: 华为云参数为空\n");
            return -1;
        }
    
        int rc;
        MQTTClient_create(&huawei_client, HUAWEI_CLOUD_ADDR, HUAWEI_CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        conn_opts.username = HUAWEI_USERNAME;
        conn_opts.password = HUAWEI_PASSWORD;
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        conn_opts.connectTimeout = 10;
    
        // 配置SSL/TLS
        MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
        ssl_opts.trustStore = "/etc/ssl/certs/ca-certificates.crt";
        ssl_opts.verify = 1;
        ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;  // 指定TLS版本
        conn_opts.ssl = &ssl_opts;
    
        // 验证证书文件是否存在且可读
        if (access(ssl_opts.trustStore, R_OK) != 0) {
            printf("错误: 证书文件不可读: %s\n", ssl_opts.trustStore);
            return -1;
        }
    
        if ((rc = MQTTClient_connect(huawei_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
            printf("华为云MQTT连接失败,错误码:%d,错误信息:%s\n", rc, MQTTClient_strerror(rc));
            return -1;
        }
        printf("华为云MQTT连接成功: %s\n", HUAWEI_CLOUD_ADDR);
    
        return 0;
    }
    
    // 串口初始化
    int init_serial() {
        fd = open(RS485_DEV, O_RDWR | O_NOCTTY | O_NDELAY);
        if (fd < 0) {
            perror("无法打开串口");
            log_message("无法打开串口");
            return -1;
        }
    
        struct termios oldtio, newtio;
        tcgetattr(fd, &oldtio);
        bzero(&newtio, sizeof(newtio));
        newtio.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
        newtio.c_cflag &= ~CSTOPB;
        newtio.c_iflag = IGNPAR;
        newtio.c_oflag = 0;
        tcflush(fd, TCIFLUSH);
        tcsetattr(fd, TCSANOW, &newtio);
    
        return 0;
    }
    
    // 修改后的传感器线程函数
    void* sensor_thread(void* arg) {
        int max_retries = 3;
        unsigned char response[256];
        SensorData sensor_data = {0};
    
        while (running) {
            memset(&sensor_data, 0, sizeof(sensor_data));
            int data_valid = 1;
    
            // 读取温湿度
            printf("\n读取温湿度传感器数据...\n");
            unsigned char temp_cmd[] = {0x01, 0x03, 0x00, 0x00, 0x00, 0x02};
            temp_cmd[6] = crc16(temp_cmd, 6) & 0xFF;
            temp_cmd[7] = crc16(temp_cmd, 6) >> 8;
            int resp_len = read_sensor_data(fd, temp_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 7 && response[0] == 0x01 && response[1] == 0x03 && response[2] == 0x04) {
                sensor_data.humidity = ((response[3]<<8)|response[4])/10.0f;
                sensor_data.temperature = ((response[5]<<8)|response[6])/10.0f;
            } else {
                data_valid = 0;
            }
    
            // 读取CO浓度
            printf("\n读取CO浓度检测数据...\n");
            unsigned char co_cmd[] = {0x02, 0x03, 0x00, 0x00, 0x00, 0x01};
            co_cmd[6] = crc16(co_cmd, 6) & 0xFF;
            co_cmd[7] = crc16(co_cmd, 6) >> 8;
            resp_len = read_sensor_data(fd, co_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 5 && response[0] == 0x02 && response[1] == 0x03 && response[2] == 0x02) {
                sensor_data.co = (response[3]<<8)|response[4];
            } else {
                data_valid = 0;
            }
    
            // 读取光照值
            printf("\n读取光照值检测数据...\n");
            unsigned char light_cmd[] = {0x03, 0x03, 0x00, 0x00, 0x00, 0x01};
            light_cmd[6] = crc16(light_cmd, 6) & 0xFF;
            light_cmd[7] = crc16(light_cmd, 6) >> 8;
            resp_len = read_sensor_data(fd, light_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 5 && response[0] == 0x03 && response[1] == 0x03 && response[2] == 0x02) {
                sensor_data.lux = (response[3]<<8)|response[4];
            } else {
                data_valid = 0;
            }
    
            if (data_valid) {
                char payload[256];
                snprintf(payload, sizeof(payload), 
                    "{\"TEMP\":%.1f,\"HUMI\":%.1f,\"CO\":%d,\"LUX\":%d}",
                    sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
                mqtt_publish(MQTT_SENSOR_TOPIC, payload);
                huawei_publish_data(sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
                
                char log_msg[256];
                sprintf(log_msg, "传感器数据 - 温度:%.1f 湿度:%.1f CO:%d LUX:%d",
                        sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
                log_message(log_msg);
            } else {
                log_message("传感器数据不完整,本次未发送");
            }
    
            sleep(2);
        }
    
        return NULL;
    }
    
    // 华为云MQTT网络线程函数
    void *huawei_mqtt_thread(void *arg) {
        while(running) {
            MQTTClient_yield();  // 修正函数调用
            usleep(100000);
        }
        return NULL;
    }
    
    int main() {
        int huawei_initialized = 0;  // 标记华为云是否初始化成功
        
        if (init_serial() < 0) return -1;
        if (init_mqtt() < 0) { close(fd); return -1; }
    
        pthread_t command_tid, sensor_tid, huawei_tid;
    
        if (pthread_create(&command_tid, NULL, command_thread, NULL) != 0 ||
            pthread_create(&sensor_tid, NULL, sensor_thread, NULL) != 0) {
            perror("线程创建失败");
            running = 0;
            close(fd);
            MQTTClient_disconnect(mqtt_client, MQTT_TIMEOUT);
            MQTTClient_destroy(&mqtt_client);
            return -1;
        }
    
        // 初始化华为云MQTT
        if (init_huawei_mqtt() == 0) {
            huawei_initialized = 1;
            if (pthread_create(&huawei_tid, NULL, huawei_mqtt_thread, NULL) != 0) {
                fprintf(stderr, "创建华为云MQTT线程失败\n");
                huawei_initialized = 0;
            }
        } else {
            fprintf(stderr, "华为云MQTT初始化失败,继续运行但不会发送数据到云端\n");
        }
    
        printf("程序运行中,按回车键退出...\n");
        getchar(); 
    
        running = 0;
        pthread_cond_signal(&queue_cond);
        pthread_join(command_tid, NULL);
        pthread_join(sensor_tid, NULL);
        
        // 正确释放华为云MQTT资源
        if (huawei_initialized) {
            pthread_join(huawei_tid, NULL);
            MQTTClient_disconnect(huawei_client, MQTT_TIMEOUT);
            MQTTClient_destroy(&huawei_client);
        }
    
        MQTTClient_unsubscribe(mqtt_client, MQTT_COMMAND_TOPIC);
        MQTTClient_disconnect(mqtt_client, MQTT_TIMEOUT);
        MQTTClient_destroy(&mqtt_client);
        close(fd);
        pthread_mutex_destroy(&serial_mutex);
        pthread_mutex_destroy(&mqtt_mutex);
        pthread_mutex_destroy(&queue_mutex);
        pthread_cond_destroy(&queue_cond);
    
        return 0;
    }
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <termios.h>
    #include <sys/select.h>
    #include <time.h>
    #include <pthread.h>
    #include <stdint.h>
    #include <queue>
    #include "MQTTClient.h"
    #include <cjson/cJSON.h>
    
    // MQTT配置
    #define MQTT_ADDRESS     "tcp://192.168.1.212:1883"
    #define MQTT_USERNAME    " "
    #define MQTT_PASSWORD    " "
    #define MQTT_CLIENTID    "smart_farm_rs485"
    #define MQTT_QOS         2
    #define MQTT_SENSOR_TOPIC "/Yuei/sensor/"
    #define MQTT_COMMAND_TOPIC "/Yuei/command/"
    #define MQTT_TIMEOUT     10000L
    
    // 华为云IoT配置(与文档完全一致)
    #define HUAWEI_CLOUD_ADDR "tcp://21369d9ff7.st1.iotda-device.cn-north-4.myhuaweicloud.com:1883"
    #define HUAWEI_CLIENT_ID  "6823048384adf27cda58465e_Yiji1_0_0_2025051413"
    #define HUAWEI_USERNAME   "6823048384adf27cda58465e_Yiji1"
    #define HUAWEI_PASSWORD   "e554e71de483b60085373595c34051d0b37f9dbc680257234bb6233e9187704b"
    #define HUAWEI_PUB_TOPIC  "$oc/devices/6823048384adf27cda58465e_Yiji1/sys/properties/report"
    
    // RS485配置
    #define RS485_DEV "/dev/ttyS1"
    #define BAUDRATE B4800
    #define RECV_TIMEOUT_MS 2000
    
    // 继电器配置
    #define RELAY_ADDRESS 4
    #define RELAY1_ADDR 0x0010
    #define RELAY2_ADDR 0x0011
    
    // 全局变量
    int fd;
    MQTTClient mqtt_client;
    MQTTClient huawei_client;
    pthread_mutex_t serial_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mqtt_mutex = PTHREAD_MUTEX_INITIALIZER;
    int running = 1;
    
    // 命令队列结构
    struct RelayCommand {
        uint16_t address;
        uint8_t value;
        time_t timestamp;
    };
    
    typedef struct {
        float temperature;
        float humidity;
        int co;
        int lux;
        time_t timestamp;
    } SensorData;
    
    std::queue<RelayCommand> commandQueue;
    pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
    
    // 计算Modbus CRC16校验
    unsigned short crc16(unsigned char* data, int len) {
        unsigned short crc = 0xFFFF;
        for (int i = 0; i < len; i++) {
            crc ^= data[i];
            for (int j = 0; j < 8; j++) {
                if (crc & 0x0001) {
                    crc = (crc >> 1) ^ 0xA001;
                }
                else {
                    crc >>= 1;
                }
            }
        }
        return crc;
    }
    
    // 发送Modbus RTU指令并接收响应
    int send_modbus_command(int fd, unsigned char* command, int cmd_len, unsigned char* response, int resp_len) {
        fd_set readfds;
        struct timeval timeout;
    
        printf("发送指令: ");
        for (int i = 0; i < cmd_len; i++) {
            printf("%02X ", command[i]);
        }
        printf("\n");
    
        tcflush(fd, TCIFLUSH);
        usleep(50000);
    
        int written = write(fd, command, cmd_len);
        if (written != cmd_len) {
            printf("发送失败: 只发送了 %d 字节,应该发送 %d 字节\n", written, cmd_len);
            return -1;
        }
    
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
    
        while (1) {
            FD_ZERO(&readfds);
            FD_SET(fd, &readfds);
    
            int sel_result = select(fd + 1, &readfds, NULL, NULL, &timeout);
            if (sel_result < 0) {
                perror("select error");
                return -1;
            }
            else if (sel_result == 0) {
                printf("响应超时\n");
                return 0;
            }
    
            if (FD_ISSET(fd, &readfds)) {
                int bytes = read(fd, response, resp_len);
                if (bytes > 0) {
                    printf("接收响应: ");
                    for (int i = 0; i < bytes; i++) {
                        printf("%02X ", response[i]);
                    }
                    printf("\n");
                    return bytes;
                }
            }
        }
    
        return 0;
    }
    
    // 重试读取传感器数据
    int read_sensor_data(int fd, unsigned char* command, int cmd_len, unsigned char* response, int resp_len, int max_retries) {
        int response_len = 0;
    
        for (int i = 0; i < max_retries; i++) {
            struct timespec ts;
            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_sec += 1;
    
            if (pthread_mutex_timedlock(&serial_mutex, &ts) != 0) {
                printf("获取串口锁超时(尝试 %d/%d)\n", i + 1, max_retries);
                continue;
            }
    
            response_len = send_modbus_command(fd, command, cmd_len, response, resp_len);
            pthread_mutex_unlock(&serial_mutex);
    
            if (response_len >= 5) {
                unsigned short resp_crc = (response[response_len - 2] | (response[response_len - 1] << 8));
                unsigned short calc_crc = crc16(response, response_len - 2);
                if (resp_crc == calc_crc) return response_len;
                else printf("CRC校验失败(尝试 %d/%d)\n", i + 1, max_retries);
            }
            else {
                if (response_len == 0) printf("响应超时(尝试 %d/%d)\n", i + 1, max_retries);
                else printf("响应长度不足(尝试 %d/%d)\n", i + 1, max_retries);
            }
    
            usleep(100000);
        }
    
        return -1;
    }
    
    // 记录日志
    void log_message(const char* message) {
        FILE* log_file = fopen("sensor_log.txt", "a");
        if (!log_file) {
            perror("无法打开日志文件");
            return;
        }
        time_t rawtime;
        struct tm* timeinfo;
        time(&rawtime);
        timeinfo = localtime(&rawtime);
        char time_str[26];
        strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", timeinfo);
        fprintf(log_file, "%s - %s\n", time_str, message);
        fclose(log_file);
    }
    
    // MQTT消息发布函数 - 本地MQTT
    void mqtt_publish(const char* topic, const char* payload) {
        MQTTClient_message message = MQTTClient_message_initializer;
        message.payload = (void*)payload;
        message.payloadlen = strlen(payload);
        message.qos = MQTT_QOS;
        message.retained = 0;
    
        MQTTClient_deliveryToken token;
        int rc;
    
        pthread_mutex_lock(&mqtt_mutex);
        if ((rc = MQTTClient_publishMessage(mqtt_client, topic, &message, &token)) != MQTTCLIENT_SUCCESS) {
            printf("MQTT发布失败,错误码:%d\n", rc);
        }
        else {
            MQTTClient_waitForCompletion(mqtt_client, token, MQTT_TIMEOUT);
            printf("已发布到主题[%s]: %s\n", topic, payload);
        }
        pthread_mutex_unlock(&mqtt_mutex);
    }
    
    // 华为云MQTT数据发布函数(修正JSON结构)
    void huawei_publish_data(float temperature, float humidity, int co, int lux) {
        char json_str[256];
        snprintf(json_str, sizeof(json_str),
            "{"
            "\"services\":[{"
            "\"service_id\":\"Sensor\","
            "\"properties\":{"
            "\"CO浓度\":%d,"
            "\"光照强度\":%d,"
            "\"湿度\":%.1f,"
            "\"温度\":%.1f"
            "}"
            "}]"
            "}", co, lux, humidity, temperature);
    
        pthread_mutex_lock(&mqtt_mutex);
    
        MQTTClient_message pubmsg = MQTTClient_message_initializer;
        pubmsg.payload = json_str;
        pubmsg.payloadlen = strlen(json_str);
        pubmsg.qos = MQTT_QOS;
        pubmsg.retained = 0;
    
        MQTTClient_deliveryToken token;
        int rc = MQTTClient_publishMessage(huawei_client, HUAWEI_PUB_TOPIC, &pubmsg, &token);
        if (rc != MQTTCLIENT_SUCCESS) {
            fprintf(stderr, "[华为云] 发布失败: %d\n", rc);
        }
        else {
            if (MQTTClient_waitForCompletion(huawei_client, token, MQTT_TIMEOUT) != MQTTCLIENT_SUCCESS) {
                fprintf(stderr, "[华为云] 等待发布完成失败\n");
            }
            printf("[华为云] 数据已发布: %s\n", json_str);
        }
    
        pthread_mutex_unlock(&mqtt_mutex);
    }
    
    // 控制继电器
    int control_relay(int fd, uint16_t address, uint8_t value) {
        uint8_t command[8];
        command[0] = (uint8_t)RELAY_ADDRESS;  // 显式类型转换
        command[1] = 0x05;
        command[2] = (uint8_t)((address >> 8) & 0xFF);  // 显式类型转换
        command[3] = (uint8_t)(address & 0xFF);         // 显式类型转换
        command[4] = value ? 0xFF : 0x00;
        command[5] = 0x00;
    
        uint16_t crc = crc16(command, 6);
        command[6] = (uint8_t)(crc & 0xFF);             // 显式类型转换
        command[7] = (uint8_t)(crc >> 8);               // 显式类型转换
    
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_sec += 2;
    
        int lock_result = pthread_mutex_timedlock(&serial_mutex, &ts);
        if (lock_result != 0) {
            printf("获取串口锁超时,继电器控制被延迟\n");
            return -1;
        }
    
        tcflush(fd, TCIFLUSH);
        int bytes_written = write(fd, command, 8);
        if (bytes_written != 8) {
            perror("发送命令失败");
            pthread_mutex_unlock(&serial_mutex);
            return -1;
        }
    
        usleep(100000);
        uint8_t response[8];
        int bytes_read = read(fd, response, sizeof(response));
        pthread_mutex_unlock(&serial_mutex);
    
        if (bytes_read < 5) {
            printf("响应数据过短: %d字节\n", bytes_read);
            return -1;
        }
    
        uint16_t received_crc = (response[bytes_read - 2] | (response[bytes_read - 1] << 8));
        uint16_t calculated_crc = crc16(response, bytes_read - 2);
        if (received_crc != calculated_crc || response[1] != 0x05) {
            printf("CRC校验或功能码错误\n");
            return -1;
        }
    
        return 0;
    }
    
    // MQTT消息到达回调函数
    int msgarrvd(void* context, char* topicName, int topicLen, MQTTClient_message* message) {
        char* payload = (char*)message->payload;
        printf("收到消息: 主题[%s], 内容: %s\n", topicName, payload);
    
        if (strncmp(topicName, MQTT_COMMAND_TOPIC, strlen(MQTT_COMMAND_TOPIC)) == 0) {
            int relay_num, action;
            if (sscanf(payload, "relay%d=%d", &relay_num, &action) == 2) {
                RelayCommand cmd;
                cmd.address = (relay_num == 1) ? RELAY1_ADDR : RELAY2_ADDR;
                cmd.value = (uint8_t)action;  // 显式类型转换
                cmd.timestamp = time(NULL);
    
                pthread_mutex_lock(&queue_mutex);
                commandQueue.push(cmd);
                pthread_cond_signal(&queue_cond);
                pthread_mutex_unlock(&queue_mutex);
    
                printf("已将继电器命令加入队列: 继电器%d=%d\n", relay_num, action);
            }
            else {
                printf("无法解析的命令格式: %s\n", payload);
            }
        }
    
        MQTTClient_freeMessage(&message);
        MQTTClient_free(topicName);
        return 1;
    }
    
    // 命令处理线程
    void* command_thread(void* arg) {
        while (running) {
            pthread_mutex_lock(&queue_mutex);
            while (commandQueue.empty() && running) {
                pthread_cond_wait(&queue_cond, &queue_mutex);
            }
            if (!running) {
                pthread_mutex_unlock(&queue_mutex);
                break;
            }
            RelayCommand cmd = commandQueue.front();
            commandQueue.pop();
            pthread_mutex_unlock(&queue_mutex);
    
            printf("处理继电器命令: 地址 0x%04X, 值 %d\n", cmd.address, cmd.value);
            int result = control_relay(fd, cmd.address, cmd.value);
    
            char status_msg[50];
            snprintf(status_msg, sizeof(status_msg), "relay%d_status=%d", (cmd.address == RELAY1_ADDR ? 1 : 2), cmd.value);
            mqtt_publish(MQTT_SENSOR_TOPIC, status_msg);
    
            printf(result == 0 ? "继电器控制成功\n" : "继电器控制失败\n");
        }
        return NULL;
    }
    
    // MQTT连接初始化
    int init_mqtt() {
        int rc;
        MQTTClient_create(&mqtt_client, MQTT_ADDRESS, MQTT_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        conn_opts.username = MQTT_USERNAME;
        conn_opts.password = MQTT_PASSWORD;
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        MQTTClient_setCallbacks(mqtt_client, NULL, NULL, msgarrvd, NULL);
    
        if ((rc = MQTTClient_connect(mqtt_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
            printf("MQTT连接失败,错误码:%d\n", rc);
            return -1;
        }
        printf("MQTT连接成功: %s\n", MQTT_ADDRESS);
    
        if ((rc = MQTTClient_subscribe(mqtt_client, MQTT_COMMAND_TOPIC, MQTT_QOS)) != MQTTCLIENT_SUCCESS) {
            printf("订阅主题失败,错误码:%d\n", rc);
            return -1;
        }
        printf("已订阅主题: %s\n", MQTT_COMMAND_TOPIC);
    
        return 0;
    }
    
    // 华为云MQTT连接初始化(增强安全性)
    int init_huawei_mqtt() {
        // 检查华为云参数有效性
        if (HUAWEI_CLIENT_ID == NULL || HUAWEI_USERNAME == NULL || HUAWEI_PASSWORD == NULL) {
            printf("错误: 华为云参数为空\n");
            return -1;
        }
    
        int rc;
        MQTTClient_create(&huawei_client, HUAWEI_CLOUD_ADDR, HUAWEI_CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        conn_opts.username = HUAWEI_USERNAME;
        conn_opts.password = HUAWEI_PASSWORD;
        // conn_opts.keepAliveInterval = 20;
        // conn_opts.cleansession = 1;
        // conn_opts.connectTimeout = 10;
    
        // 配置SSL/TLS
        // MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
        // ssl_opts.trustStore = "/etc/ssl/certs/ca-certificates.crt";
        // ssl_opts.verify = 1;
        // ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;  // 指定TLS版本
        // conn_opts.ssl = &ssl_opts;
    
        // 验证证书文件是否存在且可读
        // if (access(ssl_opts.trustStore, R_OK) != 0) {
        //     printf("错误: 证书文件不可读: %s\n", ssl_opts.trustStore);
        //     return -1;
        // }
    
        if ((rc = MQTTClient_connect(huawei_client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
            printf("华为云MQTT连接失败,错误码:%d,错误信息:%s\n", rc, MQTTClient_strerror(rc));
            return -1;
        }
        printf("华为云MQTT连接成功: %s\n", HUAWEI_CLOUD_ADDR);
    
        return 0;
    }
    
    // 串口初始化
    int init_serial() {
        fd = open(RS485_DEV, O_RDWR | O_NOCTTY | O_NDELAY);
        if (fd < 0) {
            perror("无法打开串口");
            log_message("无法打开串口");
            return -1;
        }
    
        struct termios oldtio, newtio;
        tcgetattr(fd, &oldtio);
        bzero(&newtio, sizeof(newtio));
        newtio.c_cflag = BAUDRATE | CS8 | CLOCAL | CREAD;
        newtio.c_cflag &= ~CSTOPB;
        newtio.c_iflag = IGNPAR;
        newtio.c_oflag = 0;
        tcflush(fd, TCIFLUSH);
        tcsetattr(fd, TCSANOW, &newtio);
    
        return 0;
    }
    
    // 修改后的传感器线程函数
    void* sensor_thread(void* arg) {
        int max_retries = 3;
        unsigned char response[256];
        SensorData sensor_data = { 0 };
    
        while (running) {
            memset(&sensor_data, 0, sizeof(sensor_data));
            int data_valid = 1;
    
            // 读取温湿度
            printf("\n读取温湿度传感器数据...\n");
            unsigned char temp_cmd[] = { 0x01, 0x03, 0x00, 0x00, 0x00, 0x02 };
            temp_cmd[6] = crc16(temp_cmd, 6) & 0xFF;
            temp_cmd[7] = crc16(temp_cmd, 6) >> 8;
            int resp_len = read_sensor_data(fd, temp_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 7 && response[0] == 0x01 && response[1] == 0x03 && response[2] == 0x04) {
                sensor_data.humidity = ((response[3] << 8) | response[4]) / 10.0f;
                sensor_data.temperature = ((response[5] << 8) | response[6]) / 10.0f;
            }
            else {
                data_valid = 0;
            }
    
            // 读取CO浓度
            printf("\n读取CO浓度检测数据...\n");
            unsigned char co_cmd[] = { 0x02, 0x03, 0x00, 0x00, 0x00, 0x01 };
            co_cmd[6] = crc16(co_cmd, 6) & 0xFF;
            co_cmd[7] = crc16(co_cmd, 6) >> 8;
            resp_len = read_sensor_data(fd, co_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 5 && response[0] == 0x02 && response[1] == 0x03 && response[2] == 0x02) {
                sensor_data.co = (response[3] << 8) | response[4];
            }
            else {
                data_valid = 0;
            }
    
            // 读取光照值
            printf("\n读取光照值检测数据...\n");
            unsigned char light_cmd[] = { 0x03, 0x03, 0x00, 0x00, 0x00, 0x01 };
            light_cmd[6] = crc16(light_cmd, 6) & 0xFF;
            light_cmd[7] = crc16(light_cmd, 6) >> 8;
            resp_len = read_sensor_data(fd, light_cmd, 8, response, sizeof(response), max_retries);
            if (resp_len >= 5 && response[0] == 0x03 && response[1] == 0x03 && response[2] == 0x02) {
                sensor_data.lux = (response[3] << 8) | response[4];
            }
            else {
                data_valid = 0;
            }
    
            if (data_valid) {
                char payload[256];
                snprintf(payload, sizeof(payload),
                    "{\"TEMP\":%.1f,\"HUMI\":%.1f,\"CO\":%d,\"LUX\":%d}",
                    sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
                mqtt_publish(MQTT_SENSOR_TOPIC, payload);
                huawei_publish_data(sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
    
                char log_msg[256];
                sprintf(log_msg, "传感器数据 - 温度:%.1f 湿度:%.1f CO:%d LUX:%d",
                    sensor_data.temperature, sensor_data.humidity, sensor_data.co, sensor_data.lux);
                log_message(log_msg);
            }
            else {
                log_message("传感器数据不完整,本次未发送");
            }
    
            sleep(2);
        }
    
        return NULL;
    }
    
    // 华为云MQTT网络线程函数
    void* huawei_mqtt_thread(void* arg) {
        while (running) {
            MQTTClient_yield();  // 修正函数调用
            usleep(100000);
        }
        return NULL;
    }
    
    int main() {
        int huawei_initialized = 0;  // 标记华为云是否初始化成功
    
        if (init_serial() < 0) return -1;
        if (init_mqtt() < 0) { close(fd); return -1; }
    
        pthread_t command_tid, sensor_tid, huawei_tid;
    
        if (pthread_create(&command_tid, NULL, command_thread, NULL) != 0 ||
            pthread_create(&sensor_tid, NULL, sensor_thread, NULL) != 0) {
            perror("线程创建失败");
            running = 0;
            close(fd);
            MQTTClient_disconnect(mqtt_client, MQTT_TIMEOUT);
            MQTTClient_destroy(&mqtt_client);
            return -1;
        }
    
        // 初始化华为云MQTT
        if (init_huawei_mqtt() == 0) {
            huawei_initialized = 1;
            if (pthread_create(&huawei_tid, NULL, huawei_mqtt_thread, NULL) != 0) {
                fprintf(stderr, "创建华为云MQTT线程失败\n");
                huawei_initialized = 0;
            }
        }
        else {
            fprintf(stderr, "华为云MQTT初始化失败,继续运行但不会发送数据到云端\n");
        }
    
        printf("程序运行中,按回车键退出...\n");
        getchar();
    
        running = 0;
        pthread_cond_signal(&queue_cond);
        pthread_join(command_tid, NULL);
        pthread_join(sensor_tid, NULL);
    
        // 正确释放华为云MQTT资源
        if (huawei_initialized) {
            pthread_join(huawei_tid, NULL);
            MQTTClient_disconnect(huawei_client, MQTT_TIMEOUT);
            MQTTClient_destroy(&huawei_client);
        }
    
        MQTTClient_unsubscribe(mqtt_client, MQTT_COMMAND_TOPIC);
        MQTTClient_disconnect(mqtt_client, MQTT_TIMEOUT);
        MQTTClient_destroy(&mqtt_client);
        close(fd);
        pthread_mutex_destroy(&serial_mutex);
        pthread_mutex_destroy(&mqtt_mutex);
        pthread_mutex_destroy(&queue_mutex);
        pthread_cond_destroy(&queue_cond);
    
        return 0;
    }

    作者:YueiL

    物联沃分享整理
    物联沃-IOTWORD物联网 » RK3588智慧农场系统开发实战:RS485总线集成华为云IOT与node-red及MQTT技术实践

    发表回复