crypto_quant/core/wechat.py

141 lines
4.6 KiB
Python
Raw Normal View History

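"""
WeChat group-messaging helpers.
Enterprise WeChat (WeCom) is the recommended channel,
but an administrator has to provide the corp ID and secret.
Implemented with the wechatpy library.
NOTE(review): the code in this file posts to a group-robot webhook through
``requests`` and does not import wechatpy -- confirm and update this header.
"""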
import core.logger as logging
import requests
import base64
import hashlib
import json
2025-10-27 07:07:12 +00:00
import os
import time
2025-11-03 05:20:57 +00:00
import io
from PIL import Image, ImageChops
logger = logging.logger
class Wechat:
2025-09-15 07:02:49 +00:00
def __init__(self, key: str):
# 虽然config在根目录但是取决于调用代码在哪
# 只要启动代码文件在根目录config就能找到
2025-09-15 07:02:49 +00:00
self.key = key
self.url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
2025-10-27 07:07:12 +00:00
self.image_path = r"./output/wechat/image"
os.makedirs(self.image_path, exist_ok=True)
def send_text(self, text: str):
"""
发送文本消息
"""
data = {
"msgtype": "text",
"text": {"content": text}
}
response = requests.post(self.url, json=data)
response.raise_for_status()
return response.json()
def send_markdown(self, text: str):
"""
发送markdown消息
"""
data = {
"msgtype": "markdown_v2",
"markdown_v2": {"content": text}
}
response = requests.post(self.url, json=data)
response.raise_for_status()
return response.json()
def send_image(self, image_url: str):
"""
发送图片消息
"""
image_bytes = self.download_image(image_url)
2025-11-03 05:20:57 +00:00
base64_str, md5_str = self.get_base64_and_md5(image_bytes)
data = {
"msgtype": "image",
"image": {
"base64": base64_str,
"md5": md5_str,
}
}
2025-10-27 07:07:12 +00:00
# 获取url中图片的名称
image_name = image_url.split("/")[-1].split(".")[0]
# 获取当前时间格式为YYYYMMDDHHMMSS
now_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
image_name = f"{image_name}_{now_time}.jpg"
image_path = os.path.join(self.image_path, image_name)
with open(image_path, "wb") as f:
f.write(image_bytes)
2025-10-28 08:56:47 +00:00
response = requests.post(self.url, json=data)
response.raise_for_status()
2025-10-27 07:07:12 +00:00
return response.json(), image_path, base64_str, md5_str
def download_image(self, image_url):
"""下载图片并返回 bytes"""
2025-11-03 05:20:57 +00:00
response = requests.get(image_url, timeout=300)
response.raise_for_status() # 抛出 HTTP 错误
2025-11-03 05:20:57 +00:00
image_bytes = response.content
# 得到图片的长与宽
image_width, image_height = self.get_image_width_and_height(image_bytes)
if image_width > 720 or image_height > 720:
# 如果图片长宽大于720则缩小图片
# 计算缩放比例
scale = 720 / max(image_width, image_height)
new_width = int(image_width * scale)
new_height = int(image_height * scale)
# 缩放图片
image = Image.open(io.BytesIO(image_bytes))
image.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
# 将缩放后的图片保存为bytes
img_byte_arr = io.BytesIO()
# 保持原始格式如果是RGBA格式则保存为PNG否则保存为JPEG
if image.mode in ('RGBA', 'LA', 'P'):
image.save(img_byte_arr, format='PNG')
else:
image.save(img_byte_arr, format='JPEG', quality=95)
image_bytes = img_byte_arr.getvalue()
return image_bytes
def get_image_width_and_height(self, image_bytes):
"""得到图片的长与宽"""
image = Image.open(io.BytesIO(image_bytes))
width, height = image.size
return width, height
def get_base64_and_md5(self,image_bytes):
"""计算 Base64不带 data: 前缀)和 MD5"""
b64 = base64.b64encode(image_bytes).decode('utf-8')
md5 = hashlib.md5(image_bytes).hexdigest()
return b64, md5
def send_file(self, file_url: str):
"""
发送文件消息
"""
data = {
"msgtype": "file",
"file": {"url": file_url}
}
response = requests.post(self.url, json=data)
response.raise_for_status()
return response.json()
2025-11-03 05:20:57 +00:00
def send_news(self, news: list):
"""
发送图文消息
"""
data = {
"msgtype": "news",
"news": {"articles": news}
}
response = requests.post(self.url, json=data)
response.raise_for_status()
return response.json()