Golang使用MQTT

最近在接触一些物联网的知识,了解到了物联网中常用的一种消息协议——MQTT,下面就展示如何使用"github.com/eclipse/paho.mqtt.golang"包去连接MQTT服务器(Broker)

话不多说,直接上代码,里面有注释,不懂的可以在评论区问我

package mqttclient

import (
	"crypto/tls"
	"errors"
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"log"
	"time"
)

// MqttConnectConfig holds the settings NewMqttClient uses to build and connect
// an MQTT client.
//
// Broker is the broker address handed to the paho client options and ClientId
// is the MQTT client identifier. User and Password are applied only when
// Certificate is empty; when Certificate is set, Certificate and PrivateKey are
// loaded as a TLS client key pair instead. OnConnect and OnConnectionLost are
// optional callbacks; NewMqttClient substitutes no-op functions for nil values.
type MqttConnectConfig struct {
	Broker           string
	User             string
	Password         string
	Certificate      string // certificate file
	PrivateKey       string // private key file
	ClientId         string
	WillEnabled      bool   // whether to register a will (last-will) message
	WillTopic        string // will topic
	WillPayload      string // will payload
	WillQos          byte   // will QoS level
	Qos              byte   // QoS level used for publishing and subscribing
	Retained         bool   // retained flag for published messages (also passed as the will's retained flag)
	OnConnect        mqtt.OnConnectHandler
	OnConnectionLost mqtt.ConnectionLostHandler
}

// MqttClient wraps a paho mqtt.Client together with the publish/subscribe
// defaults it was configured with.
//
// qos and retained are applied to every Publish call (qos also to every
// Subscribe call). topics records the handler registered for each subscribed
// topic so the on-connect handler can subscribe to them again.
//
// NOTE(review): topics is read by the on-connect handler and written by
// Subscribe/Unsubscribe with no synchronization visible in this file — confirm
// whether these can run concurrently.
type MqttClient struct {
	qos      byte
	retained bool
	Client   mqtt.Client
	topics   map[string]mqtt.MessageHandler
}

// newTLSConfig loads a PEM-encoded client certificate and private key from the
// files certFile and privateKey and returns a TLS configuration that presents
// that certificate to the server. Using TLS is optional for the client.
//
// It returns a nil config and a wrapped error (usable with errors.Is/As) when
// the key pair cannot be loaded.
//
// SECURITY: the returned configuration sets InsecureSkipVerify, so the server
// certificate chain and host name are NOT verified and the connection is open
// to man-in-the-middle attacks. This is kept for backward compatibility with
// brokers using self-signed certificates; production code should supply RootCAs
// and leave verification enabled.
func newTLSConfig(certFile string, privateKey string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, privateKey)
	if err != nil {
		return nil, fmt.Errorf("loading client key pair (cert %q, key %q): %w", certFile, privateKey, err)
	}
	// ClientAuth and ClientCAs are intentionally left at their zero values:
	// they only configure the server side of a TLS connection.
	return &tls.Config{
		InsecureSkipVerify: true, // accepts any server certificate and host name; see SECURITY note above
		Certificates:       []tls.Certificate{cert},
	}, nil
}

// NewMqttClient builds a paho MQTT client from config, connects it to the
// broker and waits for the connection attempt to complete.
//
// When config.Certificate is non-empty the certificate/key pair is loaded and
// installed as the TLS configuration, and User/Password are not applied;
// otherwise User/Password are set on the client options. When
// config.WillEnabled is true a will message is registered using
// config.Retained as its retained flag. Nil OnConnect/OnConnectionLost
// callbacks are replaced with no-ops.
//
// NewMqttClient panics (via log.Panic) if the key pair cannot be loaded or the
// connection attempt fails, so the returned pointer is never nil.
func NewMqttClient(config MqttConnectConfig) *MqttClient {
	opts := mqtt.NewClientOptions().
		AddBroker(config.Broker).
		SetClientID(config.ClientId).
		SetMaxReconnectInterval(5 * time.Second)
	if config.WillEnabled {
		opts.SetWill(config.WillTopic, config.WillPayload, config.WillQos, config.Retained)
	}

	// Certificate-based authentication takes precedence over user/password.
	if config.Certificate != "" {
		tlsConfig, err := newTLSConfig(config.Certificate, config.PrivateKey)
		if err != nil {
			log.Panic(err)
		}
		opts.SetTLSConfig(tlsConfig)
	} else {
		opts.SetUsername(config.User).SetPassword(config.Password)
	}

	// Default the optional callbacks so the wrappers can call them unconditionally.
	onConnect := config.OnConnect
	if onConnect == nil {
		onConnect = func(mqtt.Client) {}
	}
	onConnectionLost := config.OnConnectionLost
	if onConnectionLost == nil {
		onConnectionLost = func(mqtt.Client, error) {}
	}

	// The wrapper handlers close over mc, so every field they read must be
	// populated before Connect is called.
	mc := &MqttClient{
		qos:      config.Qos,
		retained: config.Retained,
		topics:   make(map[string]mqtt.MessageHandler),
	}
	opts.SetOnConnectHandler(mc.connectHandler(onConnect)).
		SetConnectionLostHandler(mc.onConnectionLostHandler(onConnectionLost))
	mc.Client = mqtt.NewClient(opts)

	// The token reports the outcome of the connection attempt.
	if tc := mc.Client.Connect(); tc.Wait() && tc.Error() != nil {
		log.Panic(tc.Error())
	}
	return mc
}

// Publish sends payload to topic using the QoS level and retained flag stored
// on the client, then waits for the publish token to complete.
//
// It returns the token's error if there is one, and a generic error when the
// receiver is nil or the underlying client reports it is not connected.
func (mc *MqttClient) Publish(topic string, payload []byte) error {
	if mc == nil || !mc.Client.IsConnected() {
		return errors.New("mqttClient is nil or disconnected")
	}
	token := mc.Client.Publish(topic, mc.qos, mc.retained, payload)
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}

// Subscribe registers onMessage for each topic in topics, one at a time, using
// the client's QoS level. Every successful subscription is recorded in the
// client's topic table and logged.
//
// It stops at the first failing topic and returns that error; topics
// subscribed before the failure stay subscribed.
func (mc *MqttClient) Subscribe(topics []string, onMessage mqtt.MessageHandler) error {
	for i := range topics {
		name := topics[i]
		token := mc.Client.Subscribe(name, mc.qos, onMessage)
		if token.Wait() && token.Error() != nil {
			return token.Error()
		}
		// Remember the handler so the on-connect handler can subscribe again.
		mc.topics[name] = onMessage
		msg := fmt.Sprintf("订阅主题[%s]成功", name)
		log.Println(msg)
	}
	return nil
}

// Unsubscribe cancels the subscriptions for the given topics and removes them
// from the client's topic table so they are not subscribed again by the
// on-connect handler.
//
// Calling it with no topics is a no-op that returns nil: an UNSUBSCRIBE packet
// without any topic filter is a protocol violation, so nothing is sent.
// If the unsubscribe token reports an error, that error is returned and the
// topic table is left unchanged.
func (mc *MqttClient) Unsubscribe(topics ...string) error {
	if len(topics) == 0 {
		return nil
	}
	if tc := mc.Client.Unsubscribe(topics...); tc.Wait() && tc.Error() != nil {
		return tc.Error()
	}
	for _, topic := range topics {
		delete(mc.topics, topic)
	}
	return nil
}

// Close disconnects the underlying client, passing a 250 millisecond quiesce
// period to Disconnect.
func (mc *MqttClient) Close() {
	const quiesceMillis = 250 // Disconnect takes its argument in milliseconds.
	mc.Client.Disconnect(quiesceMillis)
}

// connectHandler wraps handler in an on-connect callback that first subscribes
// again to every topic recorded in mc.topics (with the client's QoS level and
// the originally registered message handler) and then invokes handler.
//
// A failed re-subscription is logged rather than silently dropped. Each token
// is awaited in its own goroutine so the callback itself never blocks on the
// broker's acknowledgement.
func (mc *MqttClient) connectHandler(handler mqtt.OnConnectHandler) mqtt.OnConnectHandler {
	return func(c mqtt.Client) {
		for topic, onMessage := range mc.topics {
			token := mc.Client.Subscribe(topic, mc.qos, onMessage)
			// Pass the loop values as arguments so each goroutine sees its own copy.
			go func(topic string, token mqtt.Token) {
				if token.Wait() && token.Error() != nil {
					log.Printf("mqttclient: re-subscribing to %q failed: %v", topic, token.Error())
				}
			}(topic, token)
		}
		handler(c)
	}
}

// onConnectionLostHandler wraps handler in a connection-lost callback that
// forwards the client and the error to handler unchanged.
func (mc *MqttClient) onConnectionLostHandler(handler mqtt.ConnectionLostHandler) mqtt.ConnectionLostHandler {
	forward := func(client mqtt.Client, cause error) {
		handler(client, cause)
	}
	return forward
}

上面的代码已经封装好了初始化、连接、推送消息、订阅消息等功能,下面是一个简单的调用示例:

// main is a usage example: it connects with the given configuration, publishes
// ten single-byte messages (byte values 65 through 74) to topic "a/b/c" at
// one-second intervals, logging each payload, and disconnects on exit.
// It panics via log.Panic if a publish fails.
func main() {
	mqttConnectConfig := mqttclient.MqttConnectConfig{
		// Fill in your own connection parameters here.
	}
	mc := mqttclient.NewMqttClient(mqttConnectConfig)
	// Disconnect on the way out; deferred calls also run while a panic unwinds.
	defer mc.Close()

	for i := 65; i < 75; i++ {
		payload := []byte{byte(i)}
		if err := mc.Publish("a/b/c", payload); err != nil {
			log.Panic(err)
		}
		log.Println(payload)
		time.Sleep(1 * time.Second)
	}
}
物联沃分享整理
物联沃-IOTWORD物联网 » Golang实现MQTT通信

发表评论