C# Mqtt物联网通讯,使用MqttNet库实现

引入MqttNet包,在Nuget中搜索mqttnet

服务端

创建 MQTT 服务端(mqttServer)。
初始化Mqtt:

 /// <summary>
 /// Creates the MQTT server for the given endpoint and attaches every server event handler.
 /// </summary>
 /// <param name="ip">Text form of the address the default endpoint binds to; parsed with <c>IPAddress.Parse</c>.</param>
 /// <param name="port">Port of the default endpoint.</param>
 public virtual void InitMqttServer(string ip, int port)
 {
     // Address the default endpoint is bound to.
     IPAddress boundAddress = IPAddress.Parse(ip);

     var serverOptions = new MqttServerOptionsBuilder()
         .WithDefaultEndpoint()
         .WithDefaultEndpointBoundIPAddress(boundAddress)
         .WithDefaultEndpointPort(port)
         .Build();

     // Create the MQTT server object from the options built above.
     var factory = new MqttFactory();
     mqttServer = factory.CreateMqttServer(serverOptions);

     // Connection lifecycle.
     mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
     mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
     mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;

     // Subscription changes.
     mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
     mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;

     // Message flow.
     mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
     mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
     mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
     mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;
 }
名称 描述
ValidatingConnectionAsync 对客户端的连接进行验证
ClientConnectedAsync 客户端连接成功
ClientDisconnectedAsync 客户端断开连接
ClientSubscribedTopicAsync 客户端订阅主题
ClientUnsubscribedTopicAsync 客户端取消订阅
InterceptingPublishAsync 拦截客户端的消息
ClientAcknowledgedPublishPacketAsync 已确认发布数据包
InterceptingClientEnqueueAsync 拦截客户端排队
ApplicationMessageNotConsumedAsync 应用程序消息未使用

对客户端的连接进行验证

 /// <summary>
 /// Validates an incoming client connection and records the outcome in
 /// <c>arg.ReasonCode</c>.
 /// </summary>
 /// <param name="arg">Connection validation arguments supplied by the server.</param>
 /// <returns>An already completed task.</returns>
 public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
 {
     try
     {
         // A missing client ID is an identifier problem, not a credential problem,
         // so report it with the dedicated reason code.
         if (string.IsNullOrWhiteSpace(arg.ClientId))
         {
             arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
             return Task.CompletedTask;
         }

         // Only the presence of a user name and password is checked here;
         // real credential verification has to be added by the application.
         if (string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password))
         {
             arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
             return Task.CompletedTask;
         }

         arg.ReasonCode = MqttConnectReasonCode.Success;
     }
     catch (Exception)
     {
         // Fail closed: if validation itself blows up, the connection must not be
         // left with whatever reason code was set before (presumably Success by
         // default in MQTTnet - confirm for the version in use).
         if (arg != null)
         {
             arg.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
         }
     }
     return Task.CompletedTask;
 }

客户端连接成功

  /// <summary>
  /// Handles a successful client connection by adding the client to <c>MqttUsers</c>.
  /// </summary>
  /// <param name="arg">Event data carrying the client ID and user name of the new connection.</param>
  /// <returns>An already completed task.</returns>
  public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
  {
      try
      {
          var connectedUser = new MqttUser
          {
              ClientId = arg.ClientId,
              UserName = arg.UserName,
          };
          MqttUsers.Add(connectedUser);
      }
      catch (Exception)
      {
          // Any failure while recording the user is ignored; the handler always completes.
      }
      return Task.CompletedTask;
  }

客户端断开连接

 /// <summary>
 /// Handles a client disconnect by removing the first entry in <c>MqttUsers</c>
 /// whose client ID matches the disconnected client.
 /// </summary>
 /// <param name="arg">Event data identifying the disconnected client.</param>
 /// <returns>An already completed task.</returns>
 public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
 {
     try
     {
         MqttUser? departed = MqttUsers.FirstOrDefault(user => user.ClientId == arg.ClientId);
         if (departed == null)
         {
             // Unknown client: nothing to remove.
             return Task.CompletedTask;
         }
         MqttUsers.Remove(departed);
     }
     catch (Exception)
     {
         // Any failure while updating the user list is ignored; the handler always completes.
     }
     return Task.CompletedTask;
 }

客户端订阅主题

 /// <summary>
 /// Handles a client subscription by appending the subscribed topic to the
 /// matching user's <c>MqttSubedTopics</c> collection.
 /// </summary>
 /// <param name="arg">Event data with the client ID and the topic filter; may be null.</param>
 /// <returns>An already completed task.</returns>
 public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
 {
     try
     {
         if (arg == null)
         {
             return Task.CompletedTask;
         }

         MqttUser? subscriber = MqttUsers.FirstOrDefault(user => user.ClientId == arg.ClientId);
         if (subscriber == null)
         {
             // The client is not tracked, so there is nowhere to record the topic.
             return Task.CompletedTask;
         }

         var subscription = new MqttSubedTopic
         {
             Parent = subscriber,
             Topic = arg.TopicFilter.Topic,
         };
         subscriber.MqttSubedTopics.Add(subscription);
     }
     catch (Exception)
     {
         // Any failure while recording the subscription is ignored.
     }
     return Task.CompletedTask;
 }

客户端取消订阅

  /// <summary>
  /// Handles a client unsubscribe by removing the first matching topic from the
  /// matching user's <c>MqttSubedTopics</c> collection.
  /// </summary>
  /// <param name="arg">Event data with the client ID and the unsubscribed topic filter; may be null.</param>
  /// <returns>An already completed task.</returns>
  public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
  {
      try
      {
          if (arg == null)
          {
              return Task.CompletedTask;
          }

          MqttUser? subscriber = MqttUsers.FirstOrDefault(user => user.ClientId == arg.ClientId);
          if (subscriber == null)
          {
              return Task.CompletedTask;
          }

          MqttSubedTopic? stale = subscriber.MqttSubedTopics.FirstOrDefault(entry => entry.Topic == arg.TopicFilter);
          if (stale == null)
          {
              // The topic was never recorded for this user.
              return Task.CompletedTask;
          }

          subscriber.MqttSubedTopics.Remove(stale);
      }
      catch (Exception)
      {
          // Any failure while updating the subscription list is ignored.
      }
      return Task.CompletedTask;
  }

客户端

初始化:

 /// <summary>
 /// Creates the MQTT client, assigns it to <c>Client</c> and attaches the client event handlers.
 /// </summary>
 /// <param name="serverIp">Broker host passed to <c>WithTcpServer</c>.</param>
 /// <param name="serverPort">Broker port passed to <c>WithTcpServer</c>.</param>
 /// <param name="userName">User name passed to <c>WithCredentials</c>.</param>
 /// <param name="password">Password passed to <c>WithCredentials</c>.</param>
 public virtual void InitMqttClient(string serverIp, int serverPort, string userName = "", string password = "")
 {
     try
     {
         // Every call generates a fresh random client identifier.
         string generatedId = Guid.NewGuid().ToString();

         var optionsBuilder = new MqttClientOptionsBuilder()
             .WithTcpServer(serverIp, serverPort)   // broker address and port
             .WithClientId(generatedId)             // client identifier
             .WithCredentials(userName, password);  // account

         // NOTE(review): the built options are not stored or returned by this method,
         // so callers must build their own options for StartMqttClient - confirm intent.
         var options = optionsBuilder.Build();

         var factory = new MqttFactory();
         Client = factory.CreateMqttClient();

         Client.ConnectingAsync += Client_ConnectingAsync;
         Client.ConnectedAsync += Client_ConnectedAsync;
         Client.DisconnectedAsync += Client_DisconnectedAsync;
         Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
         Client.InspectPacketAsync += Client_InspectPacketAsync;
     }
     catch (Exception)
     {
         // Initialisation failures are ignored.
     }
 }

启动客户端

   /// <summary>
   /// Connects <c>Client</c> to the broker and reports whether the connection was established.
   /// </summary>
   /// <remarks>
   /// This method blocks until the connect attempt has finished, so that the
   /// returned value reflects the real outcome instead of the state before the
   /// attempt has completed. The attempt can be aborted through <c>connectCts</c>.
   /// </remarks>
   /// <param name="options">Client options used for the connection.</param>
   /// <returns><c>true</c> when the client is connected afterwards; otherwise <c>false</c>.</returns>
   public virtual bool StartMqttClient(MqttClientOptions options)
   {
       if (Client == null || options == null)
       {
           return false;
       }

       try
       {
           connectCts = new();
           connectCt = connectCts.Token;

           // Run the connect on the thread pool and wait for it: a captured
           // synchronization context then cannot deadlock the blocking wait, and
           // any connect failure surfaces here instead of in an unobserved task.
           Task.Run(() => Client.ConnectAsync(options, connectCt)).GetAwaiter().GetResult();
           return Client.IsConnected;
       }
       catch (Exception)
       {
           // Connect failed or was cancelled; reported to the caller as false.
       }
       return false;
   }

关闭客户端

  /// <summary>
  /// Stops the client by cancelling the pending connect and reconnect tokens.
  /// </summary>
  /// <remarks>
  /// NOTE(review): this only cancels the tokens handed to the connect/reconnect
  /// calls; an already established session presumably still needs an explicit
  /// disconnect - confirm against the MQTTnet client documentation.
  /// </remarks>
  public virtual void StopMqttClient()
  {
      // Each source is cancelled independently so that a missing or failing
      // connect source can no longer prevent the reconnect source from being cancelled.
      try
      {
          connectCts?.Cancel(); // cancel connecting
      }
      catch (Exception)
      {
          // Best effort: e.g. the source was already disposed.
      }

      try
      {
          reconnectCts?.Cancel(); // cancel reconnecting
      }
      catch (Exception)
      {
          // Best effort: e.g. the source was already disposed.
      }
  }

剩余代码

 /// <summary>
 /// Subscribes the client to a topic and remembers it in <c>TopicList</c>.
 /// </summary>
 /// <remarks>
 /// Nothing happens when the topic is blank, already listed in <c>TopicList</c>,
 /// or when the client is missing or not connected. The topic is only added to
 /// <c>TopicList</c> when the broker granted the subscription.
 /// </remarks>
 /// <param name="topic">Topic to subscribe to.</param>
 /// <returns>A task that completes when the subscribe attempt has finished.</returns>
 public virtual async Task SubscribeTopic(string topic)
 {
     try
     {
         // A blank topic can never be subscribed; already subscribed topics are skipped.
         if (string.IsNullOrWhiteSpace(topic) || TopicList.Contains(topic))
         {
             return;
         }

         // Not connected to the server.
         if (Client is null || Client.IsConnected == false)
         {
             return;
         }

         var filter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
         var result = await Client.SubscribeAsync(filter);

         // Result codes up to GrantedQoS2 mean the broker accepted the filter;
         // anything above is a rejection and must not be recorded as subscribed.
         bool granted = result.Items.All(item => item.ResultCode <= MqttClientSubscribeResultCode.GrantedQoS2);
         if (granted)
         {
             TopicList.Add(topic);
         }
     }
     catch (Exception)
     {
         // Subscribe failures are ignored; the topic is then not added to TopicList.
     }
 }
 /// <summary>
 /// Unsubscribes the client from a topic and removes it from <c>TopicList</c>.
 /// </summary>
 /// <param name="topic">Topic to unsubscribe from.</param>
 public virtual async void UnsubscribeTopic(string topic)
 {
     try
     {
         // Ask the broker first; the local list is only updated when that call did not throw.
         await Client.UnsubscribeAsync(topic);
         TopicList.Remove(topic);
     }
     catch (Exception)
     {
         // Deliberately swallowed: the caller of an async void method cannot
         // observe an exception, so nothing may escape from here.
     }
 }

 /// <summary>
 /// Builds an application message from the given text and publishes it.
 /// </summary>
 /// <remarks>
 /// The payload encoding is chosen by the <c>mqttPayloadType</c> field: plain text
 /// and JSON are sent as-is, Base64 sends the Base64 text of the UTF-8 bytes, and
 /// Hex converts the text (spaces removed) with <c>StringExtention.GetBytes</c>.
 /// </remarks>
 /// <param name="sendMessage">Message text to send.</param>
 /// <param name="isRetain">Whether the retain flag is set on the message.</param>
 /// <param name="publishTopic">Topic the message is published to.</param>
 /// <param name="QosLevel">Requested quality of service; undefined values keep the builder default.</param>
 /// <returns><c>true</c> when publishing completed without an exception; otherwise <c>false</c>.</returns>
 public virtual async Task<bool> PublishMessage(string sendMessage, bool isRetain,string publishTopic,MQTTnet.Protocol.MqttQualityOfServiceLevel QosLevel)
 {
     try
     {
         if (Client == null)
         {
             return false;
         }

         var messageBuilder = new MqttApplicationMessageBuilder()
             .WithTopic(publishTopic)      // topic to publish to
             .WithRetainFlag(isRetain);    // retained message or not

         // Only apply known QoS values; anything else keeps the builder default,
         // exactly as the previous switch statement did.
         if (Enum.IsDefined(typeof(MQTTnet.Protocol.MqttQualityOfServiceLevel), QosLevel))
         {
             messageBuilder.WithQualityOfServiceLevel(QosLevel);
         }

         switch (mqttPayloadType)
         {
             case MqttPayloadType.Json:
             case MqttPayloadType.Plaintext:
                 messageBuilder.WithPayload(sendMessage);
                 break;
             case MqttPayloadType.Base64:
                 // UTF-8 rather than the platform default encoding, so the bytes do
                 // not depend on the machine and match the UTF-8 decoding on receive.
                 messageBuilder.WithPayload(Convert.ToBase64String(Encoding.UTF8.GetBytes(sendMessage)));
                 break;
             case MqttPayloadType.Hex:
                 messageBuilder.WithPayload(StringExtention.GetBytes(sendMessage.Replace(" ", string.Empty)));
                 break;
         }

         // Publish.
         await Client.PublishAsync(messageBuilder.Build(), CancellationToken.None);
         return true;
     }
     catch (Exception)
     {
         // Any failure while building or publishing is reported as false.
     }
     return false;
 }
 /// <summary>
 /// Packet inspection hook; currently does nothing.
 /// </summary>
 /// <param name="arg">Event data of the inspected packet (unused).</param>
 /// <returns>An already completed task.</returns>
 public virtual Task Client_InspectPacketAsync(MQTTnet.Diagnostics.InspectMqttPacketEventArgs arg)
     => Task.CompletedTask;
 /// <summary>
 /// Handles a received application message by formatting its payload according
 /// to <c>mqttPayloadType</c>.
 /// </summary>
 /// <remarks>
 /// The formatted text is only built here; this method does not pass it on anywhere.
 /// </remarks>
 /// <param name="arg">Event data containing the received application message.</param>
 /// <returns>An already completed task.</returns>
 public virtual Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
 {
     try
     {
         var message = arg.ApplicationMessage;
         string text;

         // No backing array means the message carries no payload.
         if (message.PayloadSegment.Array == null)
         {
             text = $"主题{message.Topic}接收的消息为空";
             return Task.CompletedTask;
         }

         byte[] payload = message.PayloadSegment.ToArray();
         string topicSuffix = $"主题:{message.Topic}";

         if (mqttPayloadType == MqttPayloadType.Json || mqttPayloadType == MqttPayloadType.Plaintext)
         {
             // Text payloads are decoded as UTF-8.
             text = Encoding.UTF8.GetString(payload) + topicSuffix;
         }
         else if (mqttPayloadType == MqttPayloadType.Hex)
         {
             // Hex digits without dashes, regrouped by the InsertFormat helper.
             text = BitConverter.ToString(payload).Replace("-", "").InsertFormat(4, " ") + topicSuffix;
         }
         else if (mqttPayloadType == MqttPayloadType.Base64)
         {
             text = Convert.ToBase64String(payload) + topicSuffix;
         }
         else
         {
             text = string.Empty;
         }
     }
     catch (Exception)
     {
         // Formatting failures are ignored.
     }
     return Task.CompletedTask;
 }

 /// <summary>
 /// Reconnects <c>Client</c> to the broker and reports whether it is connected afterwards.
 /// </summary>
 /// <remarks>
 /// This method blocks until the reconnect attempt has finished, so that the
 /// returned value reflects the real outcome instead of the state before the
 /// attempt has completed. The attempt can be aborted through <c>reconnectCts</c>.
 /// </remarks>
 /// <returns><c>true</c> when the client is connected afterwards; otherwise <c>false</c>.</returns>
 public virtual bool ReConnect()
 {
     if (Client == null)
     {
         return false;
     }

     try
     {
         reconnectCts = new CancellationTokenSource();
         reconnectCt = reconnectCts.Token;

         // Run the reconnect on the thread pool and wait for it: a captured
         // synchronization context then cannot deadlock the blocking wait, and
         // any failure surfaces here instead of in an unobserved task.
         Task.Run(() => Client.ReconnectAsync(reconnectCt)).GetAwaiter().GetResult();
         return Client.IsConnected;
     }
     catch (Exception)
     {
         // Reconnect failed or was cancelled; reported to the caller as false.
     }
     return false;
 }
 /// <summary>
 /// Handles a client disconnect, distinguishing a clean disconnect, a cancelled
 /// connection, a communication error and any other error.
 /// </summary>
 /// <remarks>
 /// The branches only build a description text; nothing is reported from here yet.
 /// </remarks>
 /// <param name="arg">Disconnect event data; may be null.</param>
 /// <returns>An already completed task.</returns>
 public virtual Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
 {
     switch (arg?.Exception)
     {
         case null:
             // No event data, or a disconnect that was not caused by an exception.
             break;

         case OperationCanceledException _:
             // The connection was cancelled.
             break;

         case MqttCommunicationException communicationError:
         {
             string detail = $"{communicationError.Message} {communicationError.InnerException?.Message}";
             break;
         }

         default:
         {
             Exception otherError = arg.Exception;
             string detail = $"{otherError.Message} {otherError.InnerException?.Message}";
             break;
         }
     }
     return Task.CompletedTask;
 }

 /// <summary>
 /// Hook invoked after the client has connected; currently does nothing.
 /// </summary>
 /// <param name="arg">Connected event data (unused).</param>
 /// <returns>An already completed task.</returns>
 public virtual Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
     => Task.CompletedTask;

 /// <summary>
 /// Hook invoked while the client is connecting; currently does nothing.
 /// </summary>
 /// <param name="arg">Connecting event data (unused).</param>
 /// <returns>An already completed task.</returns>
 public virtual Task Client_ConnectingAsync(MqttClientConnectingEventArgs arg)
     => Task.CompletedTask;

这里代码不是很全,本文主要是告诉你怎么去使用Mqtt和MqttNet库,具体的方法里面需要根据自己的业务场景去实现。

C#技术交流QQ群:371769310

物联沃分享整理
物联沃-IOTWORD物联网 » C# Mqtt物联网通讯,使用MqttNet库实现

发表评论