""" 微信群发功能 建议使用企业微信 但需要管理员提供企业id以及secret信息 通过wechatpy库实现 """ import core.logger as logging import requests import base64 import hashlib import json logger = logging.logger class Wechat: def __init__(self, key: str): # 虽然config在根目录,但是取决于调用代码在哪 # 只要启动代码文件在根目录,config就能找到 self.key = key self.url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}" def send_text(self, text: str): """ 发送文本消息 """ data = { "msgtype": "text", "text": {"content": text} } response = requests.post(self.url, json=data) response.raise_for_status() return response.json() def send_markdown(self, text: str): """ 发送markdown消息 """ data = { "msgtype": "markdown_v2", "markdown_v2": {"content": text} } response = requests.post(self.url, json=data) response.raise_for_status() return response.json() def send_image(self, image_url: str): """ 发送图片消息 """ # data = { # "msgtype": "image", # "image": {"url": image_url} # } # response = requests.post(self.url, json=data) # return response.json() image_bytes = self.download_image(image_url) base64_str, md5_str = self.get_base64_and_md5(image_bytes) data = { "msgtype": "image", "image": { "base64": base64_str, "md5": md5_str, } } response = requests.post(self.url, json=data) response.raise_for_status() return response.json() def download_image(self, image_url): """下载图片并返回 bytes""" response = requests.get(image_url, timeout=10) response.raise_for_status() # 抛出 HTTP 错误 return response.content def get_base64_and_md5(self,image_bytes): """计算 Base64(不带 data: 前缀)和 MD5""" b64 = base64.b64encode(image_bytes).decode('utf-8') md5 = hashlib.md5(image_bytes).hexdigest() return b64, md5 def send_file(self, file_url: str): """ 发送文件消息 """ data = { "msgtype": "file", "file": {"url": file_url} } response = requests.post(self.url, json=data) response.raise_for_status() return response.json() def send_news(self, news: list): """ 发送图文消息 """ data = { "msgtype": "news", "news": {"articles": news} } response = requests.post(self.url, json=data) response.raise_for_status() return response.json()