Golang实现MQTT通信
Golang使用MQTT
最近在接触一些物联网的知识,学到了一款产品,就是MQTT,下面就展示如何使用"github.com/eclipse/paho.mqtt.golang"包去连接MQTT
话不多说,直接上代码,里面有注释,不懂得可以评论区问我
package mqttclient
import (
"crypto/tls"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
)
// MqttConnectConfig 连接的相关配置
type MqttConnectConfig struct {
Broker string
User string
Password string
Certificate string //证书文件
PrivateKey string //秘钥
ClientId string
WillEnabled bool //遗愿
WillTopic string //遗愿主题
WillPayload string //遗愿消息
WillQos byte //遗愿服务质量
Qos byte //服务质量
Retained bool //保留消息
OnConnect mqtt.OnConnectHandler
OnConnectionLost mqtt.ConnectionLostHandler
}
// MqttClient MQTT客户端,此外也包含了几个参数
type MqttClient struct {
qos byte
retained bool
Client mqtt.Client
topics map[string]mqtt.MessageHandler
}
// 新建证书,也可以不用
func newTLSConfig(certFile string, privateKey string) (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(certFile, privateKey)
if err != nil {
return nil, err
}
return &tls.Config{
ClientAuth: tls.NoClientCert, //不需要证书
ClientCAs: nil, //不验证证书
InsecureSkipVerify: true, //接受服务器提供的任何证书和该证书中的任何主机名
Certificates: []tls.Certificate{cert},
}, nil
}
func NewMqttClient(config MqttConnectConfig) *MqttClient {
var c MqttClient
opts := mqtt.NewClientOptions().AddBroker(config.Broker).SetClientID(config.ClientId).SetMaxReconnectInterval(time.Second * 5)
if config.WillEnabled {
opts.SetWill(config.WillTopic, config.WillPayload, config.WillQos, config.Retained)
}
//判断是否设置证书
if config.Certificate != "" {
tlsConfig, err := newTLSConfig(config.Certificate, config.PrivateKey)
if err != nil {
log.Panic(err)
return nil
}
opts.SetTLSConfig(tlsConfig)
} else {
opts.SetUsername(config.User).SetPassword(config.Password)
}
//初始化
if config.OnConnect == nil {
config.OnConnect = func(c mqtt.Client) {}
}
if config.OnConnectionLost == nil {
config.OnConnectionLost = func(c mqtt.Client, err error) {}
}
opts.SetOnConnectHandler(c.connectHandler(config.OnConnect)).SetConnectionLostHandler(c.onConnectionLostHandler(config.OnConnectionLost))
c.Client = mqtt.NewClient(opts)
c.qos = config.Qos // qos的级别
c.retained = config.Retained // 保留消息
c.topics = make(map[string]mqtt.MessageHandler) //topic
// 用token的状态判断
if tc := c.Client.Connect(); tc.Wait() && tc.Error() != nil {
log.Panic(tc.Error())
return nil
}
return &c
}
// Publish Mqtt message.
func (mc *MqttClient) Publish(topic string, payload []byte) error {
if mc != nil && mc.Client.IsConnected() {
if tc := mc.Client.Publish(topic, mc.qos, mc.retained, payload); tc.Wait() && tc.Error() != nil {
return tc.Error()
}
return nil
}
return errors.New("mqttClient is nil or disconnected")
}
// Subscribe subscribe a Mqtt topic.
func (mc *MqttClient) Subscribe(topics []string, onMessage mqtt.MessageHandler) error {
for _, topic := range topics {
if tc := mc.Client.Subscribe(topic, mc.qos, onMessage); tc.Wait() && tc.Error() != nil {
return tc.Error()
}
mc.topics[topic] = onMessage
log.Println(fmt.Sprintf("订阅主题[%s]成功", topic))
}
return nil
}
// Unsubscribe unsubscribe a Mqtt topic.
func (mc *MqttClient) Unsubscribe(topics ...string) error {
if tc := mc.Client.Unsubscribe(topics...); tc.Wait() && tc.Error() != nil {
return tc.Error()
}
for _, topic := range topics {
delete(mc.topics, topic)
}
return nil
}
func (mc *MqttClient) Close() {
mc.Client.Disconnect(250) //Millisecond
}
func (mc *MqttClient) connectHandler(handler mqtt.OnConnectHandler) mqtt.OnConnectHandler {
return func(c mqtt.Client) {
for topic, onMessage := range mc.topics {
mc.Client.Subscribe(topic, mc.qos, onMessage)
}
handler(c)
}
}
func (mc *MqttClient) onConnectionLostHandler(handler mqtt.ConnectionLostHandler) mqtt.ConnectionLostHandler {
return func(c mqtt.Client, e error) {
handler(c, e)
}
}
上面代码已经封装好了初始化、连接、推送消息、订阅消息等功能
func main() {
mqttConnectConfig := mqttclient.MqttConnectConfig{
//自己设置参数
}
mc := mqttclient.NewMqttClient(mqttConnectConfig)
for i := 65; i < 75; i++ {
err := mc.Publish("a/b/c", []byte{byte(i)})
if err != nil {
log.Panic(err)
return
}
log.Println([]byte{byte(i)})
time.Sleep(1 * time.Second)
}
}