【开源物联网】使用WebSocket实现MQTT通信

1、概述

MQTT是物联网主流通信协议,但是很多终端天然不具备Mqtt通信能力,比如Web H5、小程序等终端形式,这些终端提供更底层的WebSocket通信方式。因此,研究基于WebSocket进行Mqtt通信是非常普遍的需求。

2、基于WebSocket进行MQTT通信

2.1通信框架

基于WebSocket进行MQTT通信框架代码如下:

public void startup() {
		mainGroup = new NioEventLoopGroup();
		subGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap server = new ServerBootstrap();
			// 绑定两个线程组
			server.group(mainGroup, subGroup)
					// 指定NIO的模式
					.channel(NioServerSocketChannel.class)
					// 子处理器,用于处理workerGroup
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();

							ch.pipeline().addLast("logging", new LoggingHandler("DEBUG"));// 设置log监听器,并且日志级别为debug,方便观察运行流程

							// websocket 基于http协议,所以要有http编解码器 服务端用HttpServerCodec
							pipeline.addLast(new HttpServerCodec());
							// 对写大数据流的支持
							pipeline.addLast(new ChunkedWriteHandler());

							/**
							 * 我们通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,我们需要绑定HttpObjectAggregator,然后我们
							 * 就可以收到一个FullHttpRequest-是一个完整的请求信息。
							 * 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
							 * 几乎在netty中的编程,都会使用到此hanler
							 */
							pipeline.addLast(new HttpObjectAggregator(1024 * 64));

							// ====================== 以上是用于支持http协议 , 以下是支持httpWebsocket
							// ======================

							/**
							 * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws 本handler会帮你处理一些繁重的复杂的事 会帮你处理握手动作:
							 * handshaking(close, ping, pong) ping + pong = 心跳
							 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
							 */
							// pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

							// 自定义的handler
							pipeline.addLast(new WebsocketMqttHandler());
							pipeline.addLast(new MqttMessageWebSocketFrameEncoder());
							pipeline.addLast(MqttEncoder.INSTANCE);
							pipeline.addLast(new MqttDecoder());
							pipeline.addLast(new MqttHandler());
						}
					});

			// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
			ChannelFuture future = server.bind(port).sync();
			// 监听关闭的channel,设置位同步方式
			future.channel().closeFuture().sync();
		} catch (Exception e) {
			System.out.println("start exception" + e.toString());
		} finally {
			// 退出线程组
			mainGroup.shutdownGracefully();
			subGroup.shutdownGracefully();
		}
	}

框架基于Java netty库实现,本文关注基于WebSocket的MQTT通信,MQTT本身的Java实现不是本文分析重点, 详情请参考MQTT物联网网关Broker与Java开源实现 。第44-46行添加的MqttEncoder、MqttDecoder和MqttHandler和MQTT物联网网关Broker与Java开源实现 描述的功能相同,共同完成Mqtt协议的处理。第42行的WebsocketMqttHandler需要在Mqtt协议处理之前从WebSocket报文内容里面提取出Mqtt报文;第43行的MqttMessageWebSocketFrameEncoder用于将要发送出去的Mqtt报文编码成WebSocket报文。

WebsocketMqttHandler两大功能:建立连接、收发报文。

WebsocketMqttHandler的核心代码如下:

@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 获取客户端传输过来的消息
		logger.info("收到消息:" + msg);
		if (msg instanceof FullHttpRequest) {
			// 以http请求形式接入,但是走的是websocket
			handleHttpRequest(ctx, (FullHttpRequest) msg);
		} else if (msg instanceof WebSocketFrame) {
			// 处理websocket客户端的消息
			handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
		}

	}

其主体功能包括对Http报文的处理和对WebSocket帧的处理。

  • Http报文处理:http报文用于客户端和Broker之间建立连接;
  • WebSocket帧处理:从WebSocket报文帧里面提取(组合)Mqtt报文。 
  • 2.2建立WebSocket连接

    以微信小程序作为客户端,建立与服务端简的WebSocket连接,客户端操作详情请参考微信小程序MQTT通信及开源框架实现,本文关注基于WebSocket进行MQTT的流程。

    建立WebSocket连接由两个步骤完成:

  •  客户端首先通过http协议发送一条协议升级(升级到websocket)请求报文;
  • 服务端进行握手成功后返回一条101 http协议报文表示WebSocket连接已建立。
  • 其代码实现如下:

    /**
    	 * 唯一的一次http请求,用于创建websocket
    	 */
    	private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
    		// 要求Upgrade为websocket,过滤掉get/Post
    		if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
    			// 若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
    			sendHttpResponse(ctx, req,
    					new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
    			return;
    		}
    		logger.info(req.content());
    		// ctx.fireChannelRead(req.content());
    		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
    				"ws://localhost:1885/websocket",
    				"mqtt", false);
    		handshaker = wsFactory.newHandshaker(req);
    		if (handshaker == null) {
    			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    		} else {
    			req.headers().set("Sec-WebSocket-Protocol", "mqtt");
    			handshaker.handshake(ctx.channel(), req);
    		}
    	}

     识别出收到的报文是Http报文时在第22行进行握手完成协议升级成websocket协议建立连接。

    建立连接过程跟踪

    采用wireshark工具对连接过程进行跟踪可以看到如下报文信息:

    WebSocket连接建立报文跟踪 

    从服务器端返回的101报文表示建立连接成功。

     2.3基于WebSocket进行MQTT通信

    建立websocket连接后,Mqtt报文基于WebSocket进行传输,结束WebSocket报文后进行如下处理:

    接收报文

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    		// 判断是否关闭链路的指令
    		if (frame instanceof CloseWebSocketFrame) {
    			handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    			return;
    		}
    		// 判断是否ping消息
    		if (frame instanceof PingWebSocketFrame) {
    			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    			return;
    		}
    		ByteBuf echoMsg = frame.content();
    		frame.retain();
    		ctx.fireChannelRead(echoMsg);
    	}

     在第14行继续交给下一个handler(即Mqtt相关handler,包括MqttDecoder、MqttHandler)处理。

    发送报文

    在MqttMessageWebSocketFrameEncoder里对要发送的Mqtt报文进行WebSocket封装,如下第9行所示:

    @Override
    	protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    		if (msg == null)
    			return;
    
    		// byte[] data = ;
    
    		// out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
    		ctx.channel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
    	}

    3、更多

    开源项目:Open-Api

     更多信息:www.lokei.cn 

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【开源物联网】使用WebSocket实现MQTT通信

    发表评论