websockets库使用(基于Python)

主要参考资料:
【Python】websockets库的介绍及用法: https://blog.csdn.net/qq_53871375/article/details/135920231
python模块websockets,浏览器与服务器之间的双向通信: https://blog.csdn.net/randy521520/article/details/134752051

目录

  • websockets库
  • 创建 WebSocket 服务器
  • 创建 WebSocket 客户端
  • 广播消息
  • SSL/TLS 加密
  • websockets库

    websockets库 是一个基于 asyncio 的 Python 库,旨在提供简单易用的 WebSockets 服务器和客户端功能。有如下特性:

  • 简单易用:提供简洁的 API,方便快速上手。
  • 基于 asyncio:利用 Python 的 asyncio 库实现异步 I/O 操作,支持高并发。
  • 全双工通信:支持在单个连接上同时进行数据发送和接收。
  • 支持多种协议:兼容 WebSocket 协议,支持 SSL/TLS 加密。
  • 灵活扩展:支持自定义协议和中间件,方便扩展功能。
  • 创建 WebSocket 服务器

    可以使用 websockets.serve 创建一个简单的 WebSocket 服务器:
    websockets.serve(): 这个函数负责创建并启动一个WebSocket服务器。
    websocket.recv(): 这个函数是一个协程函数,用于从WebSocket接收消息。
    websocket.send(): 这个函数是一个协程函数,用于向WebSocket发送消息。

    import asyncio
    import websockets
     
    async def hello(websocket, path):
        name = await websocket.recv()
        print(f"< {name}")
        
        greeting = f"Hello {name}!"
        
        await websocket.send(greeting)
        print(f"> {greeting}")
     
    start_server = websockets.serve(hello, "localhost", 8765)
     
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    

    创建 WebSocket 客户端

    可以使用 websockets.connect 创建一个简单的 WebSocket 客户端:
    websockets.connect(): 这个函数负责创建一个 WebSocket 客户端。
    websocket.close(): 关闭 WebSocket 连接。
    websockets.exceptions: 提供了WebSocket特定的异常。如:ConnectionClosed、InvalidHandshake、InvalidOrigin等。

    import asyncio
    import websockets
     
    async def hello():
        uri = "ws://localhost:8765"
        async with websockets.connect(uri) as websocket:
            await websocket.send("Hello, World!")
            response = await websocket.recv()
            print(f"< {response}")
     
    asyncio.get_event_loop().run_until_complete(hello())
    

    广播消息

    可以实现消息广播功能,将消息发送给所有连接的客户端:

    import asyncio
    import websockets
     
    connected_clients = set()
     
    async def handler(websocket, path):
        connected_clients.add(websocket)
        try:
            async for message in websocket:
                await asyncio.wait([client.send(message) for client in connected_clients])
        finally:
            connected_clients.remove(websocket)
     
    start_server = websockets.serve(handler, "localhost", 8765)
     
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    

    SSL/TLS 加密

    可以为 WebSocket 服务器添加 SSL/TLS 加密,确保数据传输安全:

    import asyncio
    import ssl
    import websockets
     
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ssl_context.load_cert_chain(certfile="path/to/certfile", keyfile="path/to/keyfile")
     
    async def echo(websocket, path):
        async for message in websocket:
            await websocket.send(message)
     
    start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)
     
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    

    作者:弱冠少年

    物联沃分享整理
    物联沃-IOTWORD物联网 » websockets库使用(基于Python)

    发表回复