diff --git a/config.py b/config.py index 4804750..53eb277 100644 --- a/config.py +++ b/config.py @@ -234,4 +234,6 @@ TWITTER_CONFIG = { } TRUTH_SOCIAL_API = {"api_key": "FRfhlDHnmYc1PCCrVHZdWtqDENr2", -"user_id": {"realDonaldTrump": "107780257626128497"}} \ No newline at end of file +"user_id": {"realDonaldTrump": "107780257626128497"}} + +ALI_API_KEY = "sk-216039fdd9ee4bc48667418b23e648d0" \ No newline at end of file diff --git a/core/db/db_truth_social_content.py b/core/db/db_truth_social_content.py index 0cf2c88..8aaf326 100644 --- a/core/db/db_truth_social_content.py +++ b/core/db/db_truth_social_content.py @@ -17,6 +17,8 @@ class DBTruthSocialContent: "timestamp", "date_time", "text", + "analysis_result", + "analysis_token", "media_url", "media_type", "media_thumbnail" diff --git a/core/media/__pycache__/truth_social_retriever.cpython-312.pyc b/core/media/__pycache__/truth_social_retriever.cpython-312.pyc index 9cc9fa1..d25757a 100644 Binary files a/core/media/__pycache__/truth_social_retriever.cpython-312.pyc and b/core/media/__pycache__/truth_social_retriever.cpython-312.pyc differ diff --git a/core/media/truth_social_retriever.py b/core/media/truth_social_retriever.py index d655989..e6f830f 100644 --- a/core/media/truth_social_retriever.py +++ b/core/media/truth_social_retriever.py @@ -1,6 +1,6 @@ import core.logger as logging from core.db.db_truth_social_content import DBTruthSocialContent -from config import TRUTH_SOCIAL_API, COIN_MYSQL_CONFIG, WECHAT_CONFIG +from config import TRUTH_SOCIAL_API, COIN_MYSQL_CONFIG, WECHAT_CONFIG, ALI_API_KEY from core.wechat import Wechat import requests @@ -11,9 +11,11 @@ import time from datetime import datetime import pytz import pandas as pd +import dashscope logger = logging.logger + class TruthSocialRetriever: def __init__(self) -> None: self.api_key = TRUTH_SOCIAL_API.get("api_key", "") @@ -38,25 +40,33 @@ class TruthSocialRetriever: self.save_path = r"./output/media/truth_social/" os.makedirs(self.save_path, exist_ok=True) - def get_user_id_from_page(self, handle='realDonaldTrump'): - url = f'https://truthsocial.com/@{handle}' - headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'} # 模拟浏览器 - + self.ali_api_key = ALI_API_KEY + instruction_file = r"./instructions/media_article_instructions.json" + with open(instruction_file, "r", encoding="utf-8") as f: + self.instruction = json.load(f) + + def get_user_id_from_page(self, handle="realDonaldTrump"): + url = f"https://truthsocial.com/@{handle}" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + } # 模拟浏览器 + response = requests.get(url, headers=headers) response.raise_for_status() - - soup = BeautifulSoup(response.text, 'html.parser') + + soup = BeautifulSoup(response.text, "html.parser") # 查找嵌入的 JSON(Truth Social 使用 data 属性或 script 标签) - scripts = soup.find_all('script') + scripts = soup.find_all("script") for script in scripts: - if script.string and 'id' in script.string and handle in script.string: + if script.string and "id" in script.string and handle in script.string: # 简单提取(实际可能需正则匹配 JSON) import re + match = re.search(r'"id"\s*:\s*"(\d+)"', script.string) if match: return match.group(1) return None - + def get_user_posts(self, limit: int = None): """ 获取用户在 Truth Social 的最新帖子。 @@ -66,65 +76,64 @@ class TruthSocialRetriever: 497美元:500,000次,如果5分钟跑一次,则可以跑1736天 参数: - limit: 最大帖子数(API 默认返回 20 条,可通过分页获取更多)。 - + 返回: - 帖子列表(JSON 格式)。 """ - headers = { - 'x-api-key': self.api_key, - 'Content-Type': 'application/json' - } - + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + for user_name, user_id in self.user_info.items(): params = { - 'handle': user_name, # 用户名 - 'user_id': user_id, # 可选,用户 ID - 'next_max_id': None, # 分页时设置为上一次响应的 max_id - 'trim': 'false' # 保留完整内容 + "handle": user_name, # 用户名 + "user_id": user_id, # 可选,用户 ID + "next_max_id": None, # 分页时设置为上一次响应的 max_id + "trim": "false", # 保留完整内容 } - - url = 'https://api.scrapecreators.com/v1/truthsocial/user/posts' + + url = "https://api.scrapecreators.com/v1/truthsocial/user/posts" logger.info(f"Searching contents for user: {user_name}") try: response = requests.get(url, headers=headers, params=params) response.raise_for_status() # 检查 HTTP 错误 data = response.json() - + # 提取帖子列表(假设响应中 'posts' 是键,根据实际文档调整) if limit is not None and isinstance(limit, int): - posts = data.get('posts', [])[:limit] + posts = data.get("posts", [])[:limit] else: - posts = data.get('posts', []) + posts = data.get("posts", []) results = [] if posts: logger.info(f"获取{user_name}帖子: {len(posts)}条") for post in posts: result = {} - result["article_id"] = post.get('id') + result["article_id"] = post.get("id") result["user_id"] = user_id result["user_name"] = user_name - datetime_text = post.get('created_at') + datetime_text = post.get("created_at") datetime_dict = self.transform_datetime(datetime_text) timestamp_ms = datetime_dict["timestamp_ms"] result["timestamp"] = timestamp_ms beijing_time_str = datetime_dict["beijing_time_str"] result["date_time"] = beijing_time_str - result["text"] = post.get('text', '无内容') - media_attachments = post.get('media_attachments', []) + result["text"] = post.get("text", "无内容") + media_attachments = post.get("media_attachments", []) result["media_url"] = "" result["media_type"] = "" result["media_thumbnail"] = "" if media_attachments: for media_attachment in media_attachments: - result["media_url"] = media_attachment.get('url') - result["media_type"] = media_attachment.get('type') - result["media_thumbnail"] = media_attachment.get('preview_url') + result["media_url"] = media_attachment.get("url") + result["media_type"] = media_attachment.get("type") + result["media_thumbnail"] = media_attachment.get( + "preview_url" + ) break results.append(result) else: print("获取帖子失败,请检查 API 密钥或网络。") - + if len(results) > 0: # user_path = os.path.join(self.save_path, user_name) # os.makedirs(user_path, exist_ok=True) @@ -136,19 +145,37 @@ class TruthSocialRetriever: # logger.info(f"已将{len(results)}条数据保存到: {json_file_name}") result_df = pd.DataFrame(results) result_df = self.remove_duplicate_posts(result_df) + result_df["analysis_result"] = "" + result_df["analysis_token"] = 0 if len(result_df) > 0: + result_df = self.send_wechat_message(result_df) + result_df = result_df[ + [ + "article_id", + "user_id", + "user_name", + "timestamp", + "date_time", + "text", + "analysis_result", + "analysis_token", + "media_url", + "media_type", + "media_thumbnail", + ] + ] self.db_truth_social_content.insert_data_to_mysql(result_df) logger.info(f"已将{len(result_df)}条数据插入到数据库") - self.send_wechat_message(result_df) + else: logger.info(f"没有数据需要插入到数据库和发送企业微信消息") except requests.exceptions.RequestException as e: print(f"请求错误: {e}") except json.JSONDecodeError as e: print(f"JSON 解析错误: {e}") - + def send_message_by_json_file(self, json_file_name: str): - with open(json_file_name, 'r', encoding='utf-8') as f: + with open(json_file_name, "r", encoding="utf-8") as f: results = json.load(f) result_df = pd.DataFrame(results) result_df = self.remove_duplicate_posts(result_df) @@ -156,13 +183,15 @@ class TruthSocialRetriever: self.send_wechat_message(result_df) else: logger.info(f"没有数据需要发送企业微信消息") - + def remove_duplicate_posts(self, result_df: pd.DataFrame): try: duplicate_index_list = [] for index, row in result_df.iterrows(): article_id = row["article_id"] - exist_data = self.db_truth_social_content.query_data_by_article_id(article_id) + exist_data = self.db_truth_social_content.query_data_by_article_id( + article_id + ) if exist_data: duplicate_index_list.append(index) # 删除重复的行 @@ -174,7 +203,7 @@ class TruthSocialRetriever: result_df = pd.DataFrame([]) logger.error(f"删除重复的行失败: {e}") return result_df - + def send_wechat_message(self, result_df: pd.DataFrame): if self.wechat is None: logger.error("企业微信未初始化") @@ -188,18 +217,73 @@ class TruthSocialRetriever: self.wechat.send_image(media_thumbnail) else: contents = [] - contents.append(f"### 川普推文") + contents.append(f"## 川普推文") contents.append(text) - contents.append(f"### 推文时间") + contents.append(f"## 推文时间") contents.append(date_time) mark_down_text = "\n\n".join(contents) - self.wechat.send_markdown(mark_down_text) + analysis_result, analysis_token = self.analyze_truth_social_content( + text + ) + result_df.at[index, "analysis_result"] = analysis_result + result_df.at[index, "analysis_token"] = analysis_token + analysis_text = f"\n\n## 分析结果\n\n{analysis_result}" + analysis_text += f"\n\n## 分析token\n\n{analysis_token}" + if self.calculate_bytes(mark_down_text + analysis_text) > 4096: + self.wechat.send_markdown(mark_down_text) + if self.calculate_bytes(analysis_text) > 4096: + half_analysis_text_length = len(analysis_text) // 2 + analysis_1st = analysis_text[:half_analysis_text_length].strip() + analysis_2nd = analysis_text[half_analysis_text_length:].strip() + self.wechat.send_markdown( + f"## 分析结果第一部分\n\n{analysis_1st}" + ) + self.wechat.send_markdown( + f"## 分析结果第二部分\n\n{analysis_2nd}" + ) + else: + self.wechat.send_markdown(f"## 分析结果\n\n{analysis_text}") + else: + self.wechat.send_markdown(mark_down_text + analysis_text) except Exception as e: logger.error(f"发送企业微信消息失败: {e}") continue - + return result_df + + def calculate_bytes(self, text: str): + return len(text.encode("utf-8")) + + def analyze_truth_social_content(self, text: str): + try: + context = text + instructions = self.instruction.get("Instructions", "") + output = self.instruction.get("Output", "") + prompt = f"# Context\n\n{context}\n\n# Instructions\n\n{instructions}\n\n# Output\n\n{output}" + response = dashscope.Generation.call( + api_key=self.ali_api_key, + model="qwen-plus", + messages=[{"role": "user", "content": prompt}], + enable_search=True, + search_options={"forced_search": True}, # 强制联网搜索 + result_format="message", + ) + response_contents = ( + response.get("output", {}) + .get("choices", [])[0] + .get("message", {}) + .get("content", "") + ) + # 获取response的token + token = response.get("usage", {}).get("total_tokens", 0) + return response_contents, token + except Exception as e: + logger.error(f"分析推文失败: {e}") + return None + def transform_datetime(self, datetime_text: str): - utc_time = datetime.strptime(datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=pytz.UTC) + utc_time = datetime.strptime(datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace( + tzinfo=pytz.UTC + ) # 1. 转换为时间戳(毫秒) timestamp_ms = int(utc_time.timestamp() * 1000) @@ -209,9 +293,5 @@ class TruthSocialRetriever: beijing_time_str = beijing_time.strftime("%Y-%m-%dT%H:%M:%S%z") # 插入冒号到时区偏移(如 +0800 -> +08:00) beijing_time_str = beijing_time_str[:-2] + ":" + beijing_time_str[-2:] - result = { - "timestamp_ms": timestamp_ms, - "beijing_time_str": beijing_time_str - } + result = {"timestamp_ms": timestamp_ms, "beijing_time_str": beijing_time_str} return result - diff --git a/instructions/media_article_instructions.json b/instructions/media_article_instructions.json new file mode 100644 index 0000000..49033bf --- /dev/null +++ b/instructions/media_article_instructions.json @@ -0,0 +1,5 @@ +{ + "Context": "{0}\n\n", + "Instructions": "你是一个专业的时政与金融分析师,你的任务是分析推文,结合推文时间(北京时间),联网搜索,并给出分析结果。\n要求:1. 翻译推文为中文,要求符合中文表达习惯;\n2. 分析推文内容,给出推文的核心观点;\n3. 人物分析:分析推文涉及人物以及人物简介;4. 区域分析:包括国家与地区;5. 行业以及影响分析;6. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股;\n\n", + "Output": "## 输出要求\n\n除了翻译之外,核心观点+人物分析+区域分析+行业及影响分析+经济与金融分析,不超过1000汉字。\n\n## 输出格式\n\n### 翻译\n\n### 人物分析\n\n### 区域分析\n\n### 行业及影响分析\n\n### 经济与金融分析\n\n" +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 01c840d..c97f5b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,5 @@ xlsxwriter >= 3.2.5 openpyxl >= 3.1.5 cryptography >= 3.4.8 mplfinance -schedule \ No newline at end of file +schedule +dashscope >= 1.24.7 \ No newline at end of file diff --git a/sql/table/truth_social_content.sql b/sql/table/truth_social_content.sql index aaaf4ba..c053a08 100644 --- a/sql/table/truth_social_content.sql +++ b/sql/table/truth_social_content.sql @@ -5,6 +5,8 @@ CREATE TABLE `truth_social_content` ( `timestamp` BIGINT NOT NULL, `date_time` VARCHAR(50) NOT NULL, `text` TEXT NOT NULL, + `analysis_result` TEXT NULL, + `analysis_token` INT NULL, `media_url` TEXT NULL, `media_type` VARCHAR(50) NULL, `media_thumbnail` TEXT NULL, @@ -14,6 +16,10 @@ CREATE TABLE `truth_social_content` ( -- 对于 MySQL 8.0.29 之前的版本不支持 "ADD COLUMN IF NOT EXISTS" -- 如需在已有表上添加列,请分别执行以下语句(每条仅需执行一次) +ALTER TABLE `truth_social_content` + ADD COLUMN `analysis_result` TEXT NULL DEFAULT NULL AFTER `text`; +ALTER TABLE `truth_social_content` + ADD COLUMN `analysis_token` INT NULL DEFAULT NULL AFTER `analysis_result`; ALTER TABLE `truth_social_content` ADD COLUMN `media_url` TEXT NULL DEFAULT NULL AFTER `text`; ALTER TABLE `truth_social_content`