Python websocket之 websocket-client 库的使用

____tz_zs
本次,我将从主流的三方框架使用出发,带大家熟悉和使用 Python 中常见的 websocket 库。

一、websocket-client 库

websocket-client 库是一个简单好用的同步的 websocket 的客户端的库,基于回调的方式使用。

pypi地址: https://pypi.org/project/websocket-client/
GitHub地址: https://github.com/websocket-client/websocket-client
文档地址: https://websocket-client.readthedocs.io/en/latest/

websocket-client 库也是我们诸多项目中正在使用的 websocket 库,这个库开箱即用,非常的方便,其 WebSocketApp 适合于建立长期连接。

我们使用对象的函数作为 WebSocketApp 的回调函数,可以在全局变量中缓存和共享数据,持有 websocket 引用以便使用和关闭,维护长链接和心跳等,这种方式在构建应用程序时具有更大的灵活性。

下方为 WebSocketApp 的一个使用例子,创建 WebSocketApp 对象,传入 url 地址,指定 on_open 等回调函数,调用 run_forever 运行。

tips:
1、 websocket.enableTrace(True) 可以开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
2、也可以先创建对象,之后再使用如 self.ws.on_open = self.on_open 来指定回调函数。只需在 run_forever() 之前指定回调函数即可。
3、控制台输出的 send: b'\x8a\x80\xf4-\xd9\x8b' 等形式的信息,是框架的开启运行状态追踪的输出信息(框架的输出log)

(一)简单的使用demo

# -*- coding:utf-8 -*-

"""
@author:    tz_zs
"""

import websocket
from websocket import WebSocketApp

try:
    import thread
except ImportError:
    import _thread as thread
import time


class Test(object):
    def __init__(self):
        super(Test, self).__init__()
        self.url = "ws://echo.websocket.org/"
        self.ws = None

    def on_message(self, message):
        print("####### on_message #######")
        print("message:%s" % message)

    def on_error(self, error):
        print("####### on_error #######")
        print("error:%s" % error)

    def on_close(self):
        print("####### on_close #######")

    def on_ping(self, message):
        print("####### on_ping #######")
        print("ping message:%s" % message)

    def on_pong(self, message):
        print("####### on_pong #######")
        print("pong message:%s" % message)

    def on_open(self):
        print("####### on_open #######")

        thread.start_new_thread(self.run, ())

    def run(self, *args):
        while True:
            time.sleep(1)
            input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
            if input_msg == "close":
                self.ws.close()  # 关闭
                print("thread terminating...")
                break
            else:
                self.ws.send(input_msg)

    def start(self):
        websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。

        self.ws = WebSocketApp(self.url,
                               on_open=self.on_open,
                               on_message=self.on_message,
                               on_error=self.on_error,
                               on_close=self.on_close)
        # self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。

        self.ws.run_forever()


if __name__ == '__main__':
    Test().start()

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: AXR9yvs3Ucn9LE35KkhXfw==
Sec-WebSocket-Version: 13
Connection: upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 06:29:05 GMT
Sec-WebSocket-Accept: WoOPLeAQpWaV2Bqd4sDOFkSpUuw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
aaadbbbbb
send: b'\x81\x89\x82-\xdfj\xe3L\xbe\x0e\xe0O\xbd\x08\xe0'
####### on_message #######
message:aaadbbbbb
输入要发送的消息(ps:输入关键词 close 结束程序):
sakdnakjf
send: b'\x81\x89\xa8\xe0g\x8b\xdb\x81\x0c\xef\xc6\x81\x0c\xe1\xce'
####### on_message #######
message:sakdnakjf
输入要发送的消息(ps:输入关键词 close 结束程序):
123456
send: b'\x81\x86(\x84>\xb7\x19\xb6\r\x83\x1d\xb2'
####### on_message #######
message:123456
输入要发送的消息(ps:输入关键词 close 结束程序):
send: b'\x8a\x80.\xf3`+'
send: b'\x8a\x80P\x0c\xc6W'
send: b'\x8a\x807j\x03l'
send: b'\x8a\x80\xd0\xac%v'
send: b'\x8a\x80\xb9\x9do\x08'
send: b'\x8a\x80s\xbb\xad\x8f'
send: b'\x8a\x80\xf4-\xd9\x8b'
close
send: b'\x88\x82\xf5L>\xc4\xf6\xa4'
####### on_close #######

Process finished with exit code 0

"""

(二)通过 HTTP 或 SOCKS 代理进行连接

很多时候,特别是在线下测试环境中时,我们常常需要代理去访问某些服务器。如下 demo 中分别使用 http 和 socks5。
http_proxy_host 参数传入代理的 host 地址。
http_proxy_port 参数传入代理的端口号。
proxy_type 参数如果不填,默认为 "http",可选参数值有 'http', 'socks4', 'socks5', 'socks5h'

使用 socks5 可能会少包 PySocks module not found,注意不要下错包了 pip install PySocksNo module named ‘socks’

# -*- coding:utf-8 -*-

"""
@author:    tz_zs
"""

import websocket
from websocket import WebSocketApp

try:
    import thread
except ImportError:
    import _thread as thread
import time


class Test(object):
    def __init__(self):
        super(Test, self).__init__()
        self.url = "ws://echo.websocket.org/"
        self.ws = None

    def on_message(self, message):
        print("####### on_message #######")
        print("message:%s" % message)

    def on_error(self, error):
        print("####### on_error #######")
        print("error:%s" % error)

    def on_close(self):
        print("####### on_close #######")

    def on_ping(self, message):
        print("####### on_ping #######")
        print("ping message:%s" % message)

    def on_pong(self, message):
        print("####### on_pong #######")
        print("pong message:%s" % message)

    def on_open(self):
        print("####### on_open #######")

        thread.start_new_thread(self.run, ())

    def run(self, *args):
        # for i in range(3):
        #     time.sleep(1)
        #     self.ws.send("Hello %d" % i)

        while True:
            time.sleep(1)
            input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
            if input_msg == "close":
                self.ws.close()  # 关闭
                print("thread terminating...")
                break
            else:
                self.ws.send(input_msg)

    def start(self):
        websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。

        self.ws = WebSocketApp(self.url,
                               on_open=self.on_open,
                               on_message=self.on_message,
                               on_error=self.on_error,
                               on_close=self.on_close)
        # self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。

        self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=8123, proxy_type='http')
        # self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=1080, proxy_type='socks5')

if __name__ == '__main__':
    Test().start()

"""
Connecting proxy...
--- request header ---
CONNECT echo.websocket.org:80 HTTP/1.0


-----------------------
--- response header ---
HTTP/1.1 200 Connection established
-----------------------
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: JvORoQC9F6tb639eFo5s+Q==
Sec-WebSocket-Version: 13
Connection: upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 07:23:32 GMT
Sec-WebSocket-Accept: LyeriRX6MoTWDOUIwu3T1AhurSQ=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
124
send: b'\x81\x83b\x83c\xd5S\xb1W'
####### on_message #######
message:124
输入要发送的消息(ps:输入关键词 close 结束程序):
743
send: b'\x81\x83*\\\xe8d\x1dh\xdb'
####### on_message #######
message:743
输入要发送的消息(ps:输入关键词 close 结束程序):
close
send: b'\x88\x82\xaeS\xd6\x94\xad\xbb'
####### on_close #######thread terminating...


Process finished with exit code 0
"""

源码 site-packages/websocket/_http.py

class proxy_info(object):

    def __init__(self, **options):
        self.type = options.get("proxy_type") or "http"
        if not(self.type in ['http', 'socks4', 'socks5', 'socks5h']):
            raise ValueError("proxy_type must be 'http', 'socks4', 'socks5' or 'socks5h'")
        self.host = options.get("http_proxy_host", None)
        if self.host:
            self.port = options.get("http_proxy_port", 0)
            self.auth = options.get("http_proxy_auth", None)
            self.no_proxy = options.get("http_no_proxy", None)
        else:
            self.port = 0
            self.auth = None
            self.no_proxy = None

(三)ping 和 pong

WebSocket 规范将 ping 和 pong 消息操作码定义为协议的一部分。使即使服务器和客户端之间没有传输数据,也可以保持长期连接处于活动状态。

1、自动响应

框架接收到服务器发来的 ping 帧时,会立刻自动调用下方的函数,将数据使用 pong 帧原样发送给服务器,然后才回调 on_ping 将数据给使用者。所以,服务器发送到 ping 帧,我们一般不需要处理,框架自动回应了。

源码 site-packages/websocket/_core.py 297行

def pong(self, payload):
    """
    send pong data.

    payload: data payload to send server.
    """
    if isinstance(payload, six.text_type):
        payload = payload.encode("utf-8")
    self.send(payload, ABNF.OPCODE_PONG)

2、ping_interval

设置参数 ping_interval 框架将每间隔时间后自动发送空内容的 ping 帧。如果不设置参数(默认参数为 0),则不自动发送。

源码 site-packages/websocket/_core.py 287行

def ping(self,payload=""):
    """
    send ping data.

    payload: data payload to send server.
    """
    if isinstance(payload, six.text_type):
        payload = payload.encode("utf-8")
    self.send(payload, ABNF.OPCODE_PING)

3、ping_timeout

但是,因为框架不是异步的,如果发生阻塞事件,ping/pong 可能会出现一些问题。所以一般需要设置 ping_timeout 超时时间。
注意,当 ping_interval 和 ping_timeout 参数都设置了的时候,框架要求参数 ping_interval 的值需大于参数 ping_timeout。

if ping_timeout and ping_interval and ping_interval <= ping_timeout:
    raise WebSocketException("Ensure ping_interval > ping_timeout")

源码中发送空 ping 帧时,会保存发送时间 self.last_ping_tm ,接收到服务器返回的 pong 帧时,会保存接收时间 self.last_pong_tm,如果同时满足(1)当前的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout,(2)上次发送 ping 之后没有收到 pong,或接收到 pong 的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout。

源码 site-packages/websocket/_app.py 294行

def check():
    if (ping_timeout):
        has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
        has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
        has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout

        if (self.last_ping_tm
                and has_timeout_expired
                and (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
            raise WebSocketTimeoutException("ping/pong timed out")
    return True

4、例子

如下 demo 中,设置 ping_interval 为 20, ping_timeout 为 10。

# -*- coding:utf-8 -*-

"""
@author:    tz_zs
"""

import websocket
from websocket import WebSocketApp, ABNF

try:
    import thread
except ImportError:
    import _thread as thread
import time


class Test(object):
    def __init__(self):
        super(Test, self).__init__()
        self.url = "ws://echo.websocket.org/"
        self.ws = None

    def on_message(self, message):
        print("####### on_message #######")
        print("message:%s" % message)

    def on_error(self, error):
        print("####### on_error #######")
        print("error:%s" % error)

    def on_close(self):
        print("####### on_close #######")

    def on_ping(self, message):
        print("####### on_ping #######")
        print("ping time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        print("ping message:%s" % message)

    def on_pong(self, message):
        print("####### on_pong #######")
        print("pong time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        print("pong message:%s" % message)

    def on_open(self):
        print("####### on_open #######")

        thread.start_new_thread(self.run, ())

    def run(self, *args):
        # for i in range(3):
        #     time.sleep(1)
        #     self.ws.send("Hello %d" % i)

        while True:
            time.sleep(1)
            input_msg = input("输入要发送的ping消息(ps:输入关键词 close 结束程序):\n")
            if input_msg == "close":
                self.ws.close()  # 关闭
                print("thread terminating...")
                break
            else:
                self.ws.send(input_msg, ABNF.OPCODE_PING)
                # self.ws.send(input_msg)

    def start(self):
        websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。

        self.ws = WebSocketApp(self.url,
                               on_open=self.on_open,
                               on_message=self.on_message,
                               on_error=self.on_error,
                               on_close=self.on_close,
                               on_ping=self.on_ping,
                               on_pong=self.on_pong)

        self.ws.run_forever(ping_interval=20, ping_timeout=10)


if __name__ == '__main__':
    Test().start()

框架每 20 秒发送空内容的 ping,控制台输出如下:

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: MDsoARDQEI8/4YILiLGwHw==
Sec-WebSocket-Version: 13
Connection: upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 08:20:42 GMT
Sec-WebSocket-Accept: QiNgZdxqfcpJIeKlV6Byezls2Gw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的ping消息(ps:输入关键词 close 结束程序):
send: b'\x89\x80J\x1cn\x8c'
####### on_pong #######
pong time:2021-08-04 16:21:12
pong message:b''
send: b'\x89\x80_\xad\xa1\xd9'
####### on_pong #######
pong time:2021-08-04 16:21:32
pong message:b''
send: b'\x89\x80\xb6$\xc9\x9d'
####### on_pong #######
pong time:2021-08-04 16:21:52
pong message:b''
send: b'\x89\x80)\xc3\x1f\xdc'
####### on_pong #######
pong time:2021-08-04 16:22:12
pong message:b''
send: b'\x89\x80\xcf|\xfa&'
####### on_pong #######
pong time:2021-08-04 16:22:32
pong message:b''
send: b'\x89\x80\xe5\x19/\xf9'
####### on_pong #######
pong time:2021-08-04 16:22:52
pong message:b''
send: b'\x89\x80\xfb\x9bA8'
####### on_pong #######
pong time:2021-08-04 16:23:12
pong message:b''
send: b'\x89\x80\xfc\xaa\xa6}'
####### on_pong #######
pong time:2021-08-04 16:23:32
pong message:b''
send: b'\x89\x80O\xa0\x0e\xb6'
####### on_pong #######
pong time:2021-08-04 16:23:52
pong message:b''

"""

手动发送有内容的 ping,控制台输出如下:

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: Ovizd9gdVze4BCLhymQ92Q==
Sec-WebSocket-Version: 13
Connection: upgrade


-----------------------
--- response header ---
####### on_open #######
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 08:26:07 GMT
Sec-WebSocket-Accept: vTtPyDWKViUA88UgBaLep+qi+CI=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
输入要发送的ping消息(ps:输入关键词 close 结束程序):
ping_test
send: b'\x89\x89+\x81\x19$[\xe8wCt\xf5|W_'
####### on_pong #######
pong time:2021-08-04 16:26:24
pong message:b'ping_test'
输入要发送的ping消息(ps:输入关键词 close 结束程序):
ping1111111111
send: b'\x89\x8e\xe3\x05\xbck\x93l\xd2\x0c\xd24\x8dZ\xd24\x8dZ\xd24'
####### on_pong #######
pong time:2021-08-04 16:26:32
pong message:b'ping1111111111'
输入要发送的ping消息(ps:输入关键词 close 结束程序):
send: b'\x89\x80#\x0e\xa8\xdd'
####### on_pong #######
pong time:2021-08-04 16:26:37
pong message:b''
close
send: b'\x88\x82-0\xd6\xbc.\xd8'
####### on_close #######

Process finished with exit code 0

"""

来源:tz_zs

物联沃分享整理
物联沃-IOTWORD物联网 » Python websocket之 websocket-client 库的使用

发表评论