企业微信机器人消息发送(文本、图片、文件)
文章目录
一、企业微信机器人用途
本文记录将企业微信机器人应用在接口自动化测试过程中,对项目初始化异常报警以及执行结束将报告输出。
企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455
二、前置条件
获取webhookurl
具有后台权限的管理员添加机器人后,可读取到机器人特有的webhookurl。将机器人关联到某一群组中,请求webhookurl时,机器人将携带的消息发送至该群。
将机器人信息写入配置文件***.ini中:
[ALARM_ROBOT]
# 存放webhookurl
robot_curl=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=****
# 存放需要@的成员的手机号码
mentioned_mobile_list=18*********
三、方法实现
1.发送文本消息方法
构造请求体,txt为待发送文本;mentioned_mobile_list为需要@的成员的手机号码,通过配置文件读取。未添加对返回报文的校验,只写入日志。
代码如下(示例):
def send_txt(self, txt: str):
"""
企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
:param txt: 待发送文本内容
:return:
"""
self.req_json["msgtype"] = "text"
self.req_json["text"] = {}
self.req_json["text"]["content"] = txt
self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in self.robot_info['mentioned_mobile_list'].split(',')]
try:
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")
2.发送图片方法
需要对图片进行base64和md5转换,附转换方法。
代码如下(示例):
def send_pic(self, pic_path: str) -> bool:
"""
企业微信机器人发送图片
:param pic_path: 图片路径
:return:
"""
self.req_json["msgtype"] = "image"
self.req_json["image"] = {}
self.req_json["image"]["base64"] = path2base64(pic_path)
self.req_json["image"]["md5"] = path2md5(pic_path)
try:
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")
base64与md5转换方法:
import base64
import hashlib
def path2base64(path: str) -> str:
"""
文件转换为base64
:param path: 文件路径
:return:
"""
with open(path, "rb") as f:
byte_data = f.read()
base64_str = base64.b64encode(byte_data).decode("ascii") # 二进制转base64
return base64_str
def path2md5(path: str) -> str:
"""
文件转换为md5
:param path: 文件路径
:return:
"""
with open(path, "rb") as f:
byte_data = f.read()
md5_str = md5(byte_data)
return md5_str
def md5(text: all) -> str:
"""
md5加密
:param text:
:return:
"""
m = hashlib.md5()
m.update(text)
return m.hexdigest()
3.发送文件方法
支持所有文件类型,excel/word/ppt/html/zip等等,发送文件前需要先上传文件。
上传文件方法:
def upload_file(self, file_path: str) -> str:
"""
企业微信机器人上传文件,发送文件前需要现上传
:param file_path: 文件路径
:return:
"""
try:
data = {'file': open(file_path, 'rb')}
resp = requests.post(self.upload_url, files=data)
json_res = resp.json()
if json_res.get('media_id'):
mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
return json_res.get('media_id')
except Exception:
mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
return ""
发送文件方法:
def send_file(self, file_path: str) -> bool:
"""
企业微信机器人发送文件,例如:report.html
:param file_path: 文件路径
:return:
"""
try:
self.req_json["msgtype"] = "file"
self.req_json["file"] = {}
self.req_json["file"]["media_id"] = self.upload_file(file_path)
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")
四、整体代码
import re
import requests
from base.init_data import InitData
from base.my_logger import MyLogger
from common.my_function import path2md5, path2base64
mylog = MyLogger()
class WechatRobot(InitData):
"""
企业微信机器人发送消息类,当前支持文本、图片、文件,根据腾讯提供的API文档还支持图文、markdown、模板卡片等,待有需求时添加。
每个机器人发送的消息不能超过20条/分钟。
企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455
"""
def __init__(self):
self.headers = {'Content-Type': 'application/json'}
self.robot_curl = self.robot_info['robot_curl']
self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={re.findall("key=(.*)&*", self.robot_curl)[0]}&type=file'
self.req_json = {}
def send_txt(self, txt: str) -> bool:
"""
企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
:param txt: 待发送文本内容
:return:
"""
self.req_json["msgtype"] = "text"
self.req_json["text"] = {}
self.req_json["text"]["content"] = txt
self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in
self.robot_info['mentioned_mobile_list'].split(',')]
try:
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")
def send_pic(self, pic_path: str) -> bool:
"""
企业微信机器人发送图片
:param pic_path: 图片路径
:return:
"""
self.req_json["msgtype"] = "image"
self.req_json["image"] = {}
self.req_json["image"]["base64"] = path2base64(pic_path)
self.req_json["image"]["md5"] = path2md5(pic_path)
try:
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")
def send_file(self, file_path: str) -> bool:
"""
企业微信机器人发送文件,例如:report.html
:param file_path: 文件路径
:return:
"""
try:
self.req_json["msgtype"] = "file"
self.req_json["file"] = {}
self.req_json["file"]["media_id"] = self.upload_file(file_path)
resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}\n"
f"response: {resp.text}")
except Exception:
mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")
def upload_file(self, file_path: str) -> str:
"""
企业微信机器人上传文件,发送文件前需要现上传
:param file_path: 文件路径
:return:
"""
try:
data = {'file': open(file_path, 'rb')}
resp = requests.post(self.upload_url, files=data)
json_res = resp.json()
if json_res.get('media_id'):
mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
return json_res.get('media_id')
except Exception:
mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
return ""
if __name__ == '__main__':
wechat_robot = WechatRobot()
file_path = './testdatas.zip'
wechat_robot.send_file(file_path)
来源:强小锅