企业微信机器人消息发送(文本、图片、文件)

文章目录

  • 一、企业微信机器人用途
  • 二、前置条件
  • 获取webhookurl
  • 三、方法实现
  • 1.发送文本消息方法
  • 2.发送图片方法
  • 3.发送文件方法
  • 四、整体代码

  • 一、企业微信机器人用途

    本文记录将企业微信机器人应用在接口自动化测试过程中,对项目初始化异常报警以及执行结束将报告输出。
    企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455


    二、前置条件

    获取webhookurl

    具有后台权限的管理员添加机器人后,可读取到机器人特有的webhookurl。将机器人关联到某一群组中,请求webhookurl时,机器人将携带的消息发送至该群。
    将机器人信息写入配置文件***.ini中:

    [ALARM_ROBOT]
    # 存放webhookurl
    robot_curl=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=****
    # 存放需要@的成员的手机号码
    mentioned_mobile_list=18*********
    

    三、方法实现

    1.发送文本消息方法

    构造请求体,txt为待发送文本;mentioned_mobile_list为需要@的成员的手机号码,通过配置文件读取。未添加对返回报文的校验,只写入日志。

    代码如下(示例):

        def send_txt(self, txt: str):
            """
            企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
            :param txt: 待发送文本内容
            :return:
            """
            self.req_json["msgtype"] = "text"
            self.req_json["text"] = {}
            self.req_json["text"]["content"] = txt
            self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in self.robot_info['mentioned_mobile_list'].split(',')]
            try:
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")
    

    2.发送图片方法

    需要对图片进行base64和md5转换,附转换方法。

    代码如下(示例):

        def send_pic(self, pic_path: str) -> bool:
            """
            企业微信机器人发送图片
            :param pic_path: 图片路径
            :return:
            """
            self.req_json["msgtype"] = "image"
            self.req_json["image"] = {}
            self.req_json["image"]["base64"] = path2base64(pic_path)
            self.req_json["image"]["md5"] = path2md5(pic_path)
            try:
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")
    

    base64与md5转换方法:

    import base64
    import hashlib
    def path2base64(path: str) -> str:
        """
        文件转换为base64
        :param path: 文件路径
        :return:
        """
        with open(path, "rb") as f:
            byte_data = f.read()
        base64_str = base64.b64encode(byte_data).decode("ascii")    # 二进制转base64
        return base64_str
    
    
    def path2md5(path: str) -> str:
        """
        文件转换为md5
        :param path: 文件路径
        :return:
        """
        with open(path, "rb") as f:
            byte_data = f.read()
        md5_str = md5(byte_data)
        return md5_str
    
    
    def md5(text: all) -> str:
        """
        md5加密
        :param text:
        :return:
        """
        m = hashlib.md5()
        m.update(text)
        return m.hexdigest()
    

    3.发送文件方法

    支持所有文件类型,excel/word/ppt/html/zip等等,发送文件前需要先上传文件。

    上传文件方法:

        def upload_file(self, file_path: str) -> str:
            """
            企业微信机器人上传文件,发送文件前需要现上传
            :param file_path: 文件路径
            :return:
            """
            try:
                data = {'file': open(file_path, 'rb')}
                resp = requests.post(self.upload_url, files=data)
                json_res = resp.json()
                if json_res.get('media_id'):
                    mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
                    return json_res.get('media_id')
            except Exception:
                mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
                return ""
    

    发送文件方法:

        def send_file(self, file_path: str) -> bool:
            """
            企业微信机器人发送文件,例如:report.html
            :param file_path: 文件路径
            :return:
            """
            try:
                self.req_json["msgtype"] = "file"
                self.req_json["file"] = {}
                self.req_json["file"]["media_id"] = self.upload_file(file_path)
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")
    

    四、整体代码

    import re
    import requests
    from base.init_data import InitData
    from base.my_logger import MyLogger
    from common.my_function import path2md5, path2base64
    
    mylog = MyLogger()
    
    
    class WechatRobot(InitData):
        """
        企业微信机器人发送消息类,当前支持文本、图片、文件,根据腾讯提供的API文档还支持图文、markdown、模板卡片等,待有需求时添加。
        每个机器人发送的消息不能超过20条/分钟。
        企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455
        """
    
        def __init__(self):
            self.headers = {'Content-Type': 'application/json'}
            self.robot_curl = self.robot_info['robot_curl']
            self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={re.findall("key=(.*)&*", self.robot_curl)[0]}&type=file'
            self.req_json = {}
    
        def send_txt(self, txt: str) -> bool:
            """
            企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
            :param txt: 待发送文本内容
            :return:
            """
            self.req_json["msgtype"] = "text"
            self.req_json["text"] = {}
            self.req_json["text"]["content"] = txt
            self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in
                                                              self.robot_info['mentioned_mobile_list'].split(',')]
            try:
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")
    
        def send_pic(self, pic_path: str) -> bool:
            """
            企业微信机器人发送图片
            :param pic_path: 图片路径
            :return:
            """
            self.req_json["msgtype"] = "image"
            self.req_json["image"] = {}
            self.req_json["image"]["base64"] = path2base64(pic_path)
            self.req_json["image"]["md5"] = path2md5(pic_path)
            try:
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")
    
        def send_file(self, file_path: str) -> bool:
            """
            企业微信机器人发送文件,例如:report.html
            :param file_path: 文件路径
            :return:
            """
            try:
                self.req_json["msgtype"] = "file"
                self.req_json["file"] = {}
                self.req_json["file"]["media_id"] = self.upload_file(file_path)
                resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
                mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}\n"
                           f"response: {resp.text}")
            except Exception:
                mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")
    
        def upload_file(self, file_path: str) -> str:
            """
            企业微信机器人上传文件,发送文件前需要现上传
            :param file_path: 文件路径
            :return:
            """
            try:
                data = {'file': open(file_path, 'rb')}
                resp = requests.post(self.upload_url, files=data)
                json_res = resp.json()
                if json_res.get('media_id'):
                    mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
                    return json_res.get('media_id')
            except Exception:
                mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
                return ""
    
    
    if __name__ == '__main__':
        wechat_robot = WechatRobot()
        file_path = './testdatas.zip'
        wechat_robot.send_file(file_path)
    
    

    来源:强小锅

    物联沃分享整理
    物联沃-IOTWORD物联网 » 企业微信机器人消息发送(文本、图片、文件)

    发表评论