Python中Websockets服务端实现多线程消息传输

目录

一、消息队列

二、服务端

三、设备功能

四、主线程

五、客户端

六、更新


思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息队列

message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queue


class MessageBase:
    def __init__(self):
        self.data = dict()

    def add(self, device, data):
        if device in self.data:
            self.data[device].put(data)
        else:
            self.data[device] = Queue()
            self.data[device].put(data)

    def get(self, device):
        data_queue: Queue = self.data.get(device)
        if not data_queue or data_queue.empty():
            return None
        data = data_queue.get()
        return data


if __name__ == '__main__':
    mb = MessageBase()
    mb.add("a", "asdasd")
    mb.add("a", "11111111")
    print(mb.data)
    data = mb.get("a")
    print(data)

二、服务端

websocket_server.py

从客户端接收消息,并存到消息队列

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBase


class WebServer:
    def __init__(self, host, port, message_base: MessageBase):
        self.host = host
        self.port = port
        self.clients = []
        self.message_base = message_base

    async def echo(self, websocket, path):
        self.clients.append(websocket)
        client_ip, client_port = websocket.remote_address
        print(f"连接到:{client_ip}:{client_port}")
        while True:
            try:
                recv_text = await websocket.recv()
                data = json.loads(recv_text)
                device = data.get("device")
                if device:
                    self.message_base.add(device, data)
                else:
                    continue
            except websockets.ConnectionClosed:
                print("ConnectionClosed...")  # 链接断开
                self.clients.remove(websocket)
                break
            except websockets.InvalidState:
                print("InvalidState...")  # 无效状态
                self.clients.remove(websocket)
                break
            except Exception as e:
                print(e)

    def connect(self):
        print("连接成功!")
        asyncio.set_event_loop(asyncio.new_event_loop())
        start_server = websockets.serve(self.echo, self.host, self.port)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()

    def run(self):
        t = threading.Thread(target=self.connect)
        t.start()
        print("已启动!")


if __name__ == '__main__':
    mb = MessageBase()
    ws = WebServer("192.168.6.28", 8001, mb)
    ws.run()

三、设备功能

device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:
    def __init__(self, device_name, data):
        self.device_name = device_name
        self.data = data

    def show_data(self):
        if self.data:
            print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程

main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFunc


class MainThread:
    def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):
        self.message_base = message_base
        self.websocket_server = websocket_server
        self.device_list = device_list

    def run_server(self):
        self.websocket_server.run()

    def run(self):
        self.run_server()
        while True:
            for device in self.device_list:
                try:
                    # 开始根据设备即功能处理消息
                    data = self.message_base.get(device)
                    if not data:
                        continue
                    df = DeviceFunc(device, data)
                    df.show_data()
                except Exception as err:
                    pass


if __name__ == '__main__':
    mb = MessageBase()
    ws = WebServer("192.168.6.28", 8000, mb)
    device_list = ["aa", "bb", "cc"]

    mt = MainThread(mb, ws, device_list)
    mt.run()

五、客户端

webscoket_client.py

给服务端发送消息,测试用

import json

import websocket


class WebClient:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.conn = None
        self.flag = False

    def connect(self):
        try:
            url = f"ws://{self.host}:{self.port}"
            self.conn = websocket.create_connection(url)
            self.flag = True
            print("连接成功")
        except Exception as err:
            self.flag = False
            print("连接失败", err)

    def close(self):
        self.conn.close()

    def recv(self):
        data = self.conn.recv(1024)
        print(data)

    def send(self, data):
        self.conn.send(data)
        print("发送成功")


if __name__ == '__main__':
    host = "192.168.6.28"
    port = 8000
    ws = WebClient(host, port)
    if not ws.flag:
        ws.connect()
    devices = ["aa", "bb", "cc"]
    while True:
        device = random.choice(devices)
        s = ""
        for i in range(random.randint(0, 100)):
            s += chr(random.randint(65, 122))
        data = {"device": device, "value": s}
        data = json.dumps(data)
        ws.send(data)
        time.sleep(1)

六、更新

队列在获取消息时有阻塞的现象,我们上面避免阻塞用了下面注释的内容,

   

def get(self, device):
    data_queue: Queue = self.data.get(device)
    if not data_queue:  # 阻塞
        return None
    # if not data_queue or data_queue.empty():
    #     return None
    data = data_queue.get()
    return data

我们如果不用注释的内容,让获取消息直接进入阻塞,但是阻塞是在设备线程里面,不影响主线程和其他线程。这样,我们需要把设备功能封装到线程里面。这样设备工作不会相互影响

DeviceFunc对象改为DeviceThread

import threading


class DeviceThread(threading.Thread):
    def __init__(self, device_name, message_base):
        super().__init__(target=self.process)
        self.device_name = device_name
        self.message_base = message_base

    def get_data(self):
        data = self.message_base.get(self.device_name)
        return data

    def process(self):
        while True:
            data = self.get_data()
            if data:
                print(self.device_name, "收到消息:", data.get("value"))

主线程的run方法修改为:

def run(self):
    self.run_server()
    # while True:
    #     for device in self.device_list:
    #         try:
    #             data = self.message_base.get(device)
    #             if not data:
    #                 continue
    #             df = DeviceFunc(device, data)
    #             df.show_data()
    #         except Exception as err:
    #             pass
    for device in self.device_list:
        t = DeviceThread(device, self.message_base)
        t.start()
物联沃分享整理
物联沃-IOTWORD物联网 » Python中Websockets服务端实现多线程消息传输

发表评论