python监控数据处理应用服务Socket心跳解决方案

1. 概述

从网页、手机App上抓取数据应用服务,涉及到多个系统集成协同工作,依赖工具较多。例如,使用Frida进行代码注入和动态分析,以实现对网络通信的监控和数据捕获。在这样的集成环境中,手机模拟器、手机中应用、消息侦听、数据获取服务等各自独立运行,任何一个环节出现问题,整个流程势必中断。除了必要的数据检验处理外,还需要实时侦听各个独立服务是否存活,并确认流程是否畅通。常用的监控工具往往只能监控进程和端口,无法深入系统内部进行监控,因此,我们采用Socket通讯方式,自主开发监控机制。

具体方案如下:

  1. Pika侦听与心跳机制

  2. 手动启动Pika监听器,循环读取消息队列中的消息。
  3. 每次读取消息后,调用心跳函数向监控端发送心跳信息。
  4. 如果长时间未发送心跳信息(超过预设的时间阈值),则认为该服务已经死掉,此时重启数据应用服务进程和模拟器。
  5. 模拟器数据监控

  6. 监控从模拟器端获取的数据流。
  7. 如果长时间(超过预设的时间阈值)未成功获取到数据,则认为数据获取过程存在问题,此时同样重启数据应用服务进程和模拟器。

通过上述机制,确保在复杂的集成环境中,各个服务能够稳定运行,一旦出现问题能够及时发现并自动恢复,从而提高整体系统的可靠性和稳定性。

2. Socket通讯与心跳

2.1. 关于Socket

Python中的socket库是一种用于网络通信的标准库,它提供了丰富的函数和类来创建和管理网络连接

socket库概述

  • 功能:Python的socket库允许开发者创建客户端和服务器端应用程序,实现网络通信。

  • 协议支持:它支持多种协议,包括TCP(面向连接、可靠传输)和UDP(无连接、快速传输)。

  • 操作方式:支持同步和异步通信,其中同步通信是一种阻塞式的方式,而异步通信则不会阻塞程序的其他操作。

  • 基本操作

  • 创建套接字:使用socket.socket()函数来创建一个套接字对象,这是进行网络通信的基础。例如,创建一个TCP套接字可以使用sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

  • 绑定地址:对于服务器而言,需要将套接字与特定的网络接口和端口绑定,使用bind()方法完成此操作。

  • 监听连接:服务器通过调用listen()方法开始监听来自客户端的连接请求。

  • 接受连接:使用accept()方法接受客户端的连接请求,并返回一个新的套接字对象和客户端地址信息。

  • 发送和接收数据:利用send()recv()方法在客户端和服务器之间发送和接收数据。

  • 关闭套接字:通信完成后,使用close()方法关闭套接字以释放资源。

  • 总的来说,Python的socket库为网络编程提供了强大的工具,使得开发者能够轻松地构建各种类型的网络应用程序。无论是简单的TCP或UDP客户端和服务器,还是复杂的网络服务,socket库都能提供必要的支持。

    2.2. 监控服务端Socket Server

    import socket
    from loguru import logger
    from time import sleep
    from datetime import datetime
    import time
    import json
    
    logger.add("monitor_{time}.log",
                rotation="1 weeks",  # 1周分隔转日志文件  
                retention="2 month"  # 保留2个月的日志文件 
                )
      
    def start_server(host='localhost', port=5005):
        # 启动应用程序
        # 应用函数()
        sleep(90)
        # 创建缓存    
        station_list = []  
        poi_list = []      
        heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]# 心跳缓存
        # 创建socket对象
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            # 绑定地址和端口号
            server_socket.bind((host, port))        
            # 开始监听传入的连接请求
            server_socket.listen()
            logger.info(f"monitor Server listening on {host}:{port}")
           
            server_socket.settimeout(3)   # 超时3秒
            timeout = 0        
            while True:
                try:
                    # 接受一个新的连接
                    client_socket, client_address = server_socket.accept()
                    logger.info(f"Connected by {client_address}")
                    
                    with client_socket:
                        while True:
                            # 接收数据
                            data = client_socket.recv(1024)
                            if not data:
                                sleep(1)
                                break
                            message = data.decode('utf-8')
                            print(f"Received from client: {message}")
                            sleep(1)
                            timeout = 0
                            # 监控处理函数
                            monitor(message, station_list, poi_list, heartbeat_list)          
                except socket.timeout:
                    timeout += 1
                    if timeout*5%10==0:
                        logger.info(f"app is starting, time out {timeout*5}")
                    if timeout*5 > 150:   # 超过2分钟30秒没有启动,重启启动
                        timeout = 0
                        # 应用函数()
                        sleep(60)
                        heartbeat_list = [time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())]                               
                except Exception as e:
                    logger.error(f"monitor Server error occurred: {e}")
                    break
    

    2.3. 建立心跳线,客户端Socket Client

    Socket Client

    import socket
    import time
    from time import sleep
    from loguru import logger
    
    def start_client(message, host='localhost', port=5005):
        try:
            # 创建socket对象
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            
            # 连接到服务器
            client_socket.connect((host, port))
            logger.info(f"Connected to server at {host}:{port}")
            
            # 发送数据
            client_socket.sendall(message.encode())
            
            # 接收响应
            #data = client_socket.recv(1024)
            #print(f"Received from server: {data.decode()}")
        except socket.error as e:
            logger.info(f"Socket error occurred: {e}")        
        finally:    
            # 关闭连接
            client_socket.close()
    

    建立心跳线,Socket Client应用

    手动侦听消息,使用Pika库监听RabbitMQ中的消息,并循环读取消息。每次读取到消息时,调用心跳函数向监控端发送消息。如果长时间未发送消息,则认为服务已经死掉,触发重启控制软件和手机的操作。

    # 消费消息
    def startRabbitMQ():
        # 1.连接rabbit
        try:    
            credentials = pika.PlainCredentials('rabbit', '*****')  # 用户名和密码
            # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.*.*.*',port = 55671,virtual_host = '/xxxxx-dev',credentials = credentials))
        except pika.exceptions.AMQPError as e:
            logger.error(f"Error connecting to RabbitMQ in main process: {e}")
            exit(1)   
    	# 建立心跳信息
        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = json.dumps({"type": "heartbeat", "time": current_time})            
        start_client(message)
        
        try:        
            channel = connection.channel()
            time.sleep(1)
             
            channel.queue_declare(queue='xxxxx_poi_queue', durable=True)
            channel.basic_qos(prefetch_count=1)
            while True:
                logger.info('取消息开始时间')  
                method_frame, header_frame, body = channel.basic_get(queue='xxxxx_poi_queue', auto_ack=False)  
                if method_frame:  
                    # 处理消息体 
                    print('header_frame:',header_frame) 
                    logger.info(f'body:,{body}')  
                    _poi = body.decode('utf-8')  # 将 bytes 转换为字符串 
                    
                    current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    
                    datas = {"type":"poiid","time":current_time,"dat":_poi}
                    message = json.dumps(datas)
                    start_client(message)                                                 
                    # 业务应用处理函数()      
                    # 如果你设置了auto_ack=False,则需要手动确认消息  
                    channel.basic_ack(delivery_tag=method_frame.delivery_tag)  
                else:  
                    logger.info("没有消息可以获取,")  
    
                current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
                message = json.dumps({"type": "heartbeat", "time": current_time})            
                start_client(message)
                time.sleep(10)
                      
        except pika.exceptions.AMQPError as e:
            logger.error(f"Error RabbitMQ in process: {e}")
            exit(1)    
    

    监控应用、工具进程,在此略过,相关技术参见《Python监控服务进程及自启动服务方法与实践》。

    3. 遇到的问题

    3.1. 阻塞模式

    在阻塞模式下,当调用某些socket API(如send、recv等)时,如果操作不能立即完成,调用线程会被挂起,直到操作完成或超时

    在Python的socket编程中,settimeout()方法用于设置套接字操作的超时时间。当调用这个方法后,如果在指定的时间内没有完成相应的网络操作(如连接、发送或接收数据),程序将抛出一个socket.timeout异常。

  • 阻塞模式:默认情况下,套接字是阻塞模式的,这意味着如果进行的操作(如accept()recv()等)不能立即完成,程序会一直等待直到操作完成。

  • 非阻塞模式:通过设置超时时间,可以将套接字设置为非阻塞模式。在这种模式下,如果操作不能在指定时间内完成,程序会立即返回并抛出一个socket.timeout异常。

  • 3.2. 数据传递编码与解码

    在Python的socket编程中,传递字典类型数据时,通常需要将字典序列化为字符串或字节流进行传输。这是因为socket通信只接收bytes类型数据,而实际传过去的可能是str类型或其他非bytes类型。其中如下是关于字典类型数据的编码与解码的详细解析:

  • 编码:将字典转换为JSON字符串,然后将其编码为字节流进行发送。
  • import json
    import socket
    
    ...
    current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    message = json.dumps({"type": "heartbeat", "time": current_time}) 
    ...
    # 发送数据
    client_socket.sendall(message.encode())
    
    
  • 解码:接收到字节流后,先将其解码为字符串,再从字符串中解析出字典。
  • # 接受一个新的连接
    client_socket, client_address = server_socket.accept()
    logger.info(f"Connected by {client_address}")
    
    with client_socket:
        while True:
            # 接收数据
            data = client_socket.recv(1024)
            if not data:
                sleep(1)
                break
            message = data.decode('utf-8')
            print(f"Received from client: {message}")
    

    其中,编码使用data.encode,解码使用data.decode

    作者:肖永威

    物联沃分享整理
    物联沃-IOTWORD物联网 » python监控数据处理应用服务Socket心跳解决方案

    发表回复