diff --git a/config.py b/config.py index a541f04..4f4c160 100644 --- a/config.py +++ b/config.py @@ -150,7 +150,7 @@ A_STOCK_MONITOR_CONFIG = { "000333.SZ", "002230.SZ", "300308.SZ", - "002475.SZ" + "002475.SZ", ], "bars": ["1D", "1W", "1M"], "initial_date": "2015-01-01 00:00:00", @@ -236,7 +236,7 @@ ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2 TWITTER_CONFIG = { "keys": { "api_key": "FRfhlDHnmYc1PCCrVHZdWtqDENr2", - }, + }, "base_url": "https://api.scrapecreators.com/v1/twitter/user-tweets?handle={0}&trim=false", "monitor_accounts": [ {"name": "realDonaldTrump"}, @@ -247,7 +247,18 @@ TWITTER_CONFIG = { ], } -TRUTH_SOCIAL_API = {"api_key": "FRfhlDHnmYc1PCCrVHZdWtqDENr2", -"user_id": {"realDonaldTrump": "107780257626128497"}} +TRUTH_SOCIAL_API = { + "api_key": "FRfhlDHnmYc1PCCrVHZdWtqDENr2", + "media_config": [ + { + "media_name": "Truth Social", + "base_url": "https://api.scrapecreators.com/v1/truthsocial/user/posts", + "user_info": { + "WhiteHouse": {"id": "", "full_name": "白宫"}, + "realDonaldTrump": {"id": "107780257626128497", "full_name": "川普"}, + }, + } + ], +} -ALI_API_KEY = "sk-216039fdd9ee4bc48667418b23e648d0" \ No newline at end of file +ALI_API_KEY = "sk-216039fdd9ee4bc48667418b23e648d0" diff --git a/core/media/__pycache__/truth_social_retriever.cpython-312.pyc b/core/media/__pycache__/truth_social_retriever.cpython-312.pyc index 150311b..963507a 100644 Binary files a/core/media/__pycache__/truth_social_retriever.cpython-312.pyc and b/core/media/__pycache__/truth_social_retriever.cpython-312.pyc differ diff --git a/core/media/truth_social_retriever.py b/core/media/truth_social_retriever.py index 5b4a6d3..e797897 100644 --- a/core/media/truth_social_retriever.py +++ b/core/media/truth_social_retriever.py @@ -19,7 +19,8 @@ logger = logging.logger class TruthSocialRetriever: def __init__(self) -> None: self.api_key = TRUTH_SOCIAL_API.get("api_key", "") - self.user_info = TRUTH_SOCIAL_API.get("user_id", {}) + self.media_config_list = TRUTH_SOCIAL_API.get("media_config", []) + # self.user_info = TRUTH_SOCIAL_API.get("user_id", {}) mysql_user = COIN_MYSQL_CONFIG.get("user", "xch") mysql_password = COIN_MYSQL_CONFIG.get("password", "") if not mysql_password: @@ -52,6 +53,10 @@ class TruthSocialRetriever: image_post_instruction_file = r"./instructions/media_image_post_instructions.json" with open(image_post_instruction_file, "r", encoding="utf-8") as f: self.image_post_instruction = json.load(f) + + text_image_post_instruction_file = r"./instructions/media_article_image_post_instructions.json" + with open(text_image_post_instruction_file, "r", encoding="utf-8") as f: + self.text_image_post_instruction = json.load(f) def get_user_id_from_page(self, handle="realDonaldTrump"): url = f"https://truthsocial.com/@{handle}" @@ -90,89 +95,96 @@ class TruthSocialRetriever: """ headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} - for user_name, user_id in self.user_info.items(): - params = { - "handle": user_name, # 用户名 - "user_id": user_id, # 可选,用户 ID - "next_max_id": None, # 分页时设置为上一次响应的 max_id - "trim": "false", # 保留完整内容 - } + for media_config in self.media_config_list: + media_name = media_config.get("media_name", "") + logger.info(f"开始获取{media_name}的帖子") + base_url = media_config.get("base_url", "") + user_info = media_config.get("user_info", {}) + for user_name, user_details in user_info.items(): + user_id = user_details.get("id", "") + user_full_name = user_details.get("full_name", "") - url = "https://api.scrapecreators.com/v1/truthsocial/user/posts" - logger.info(f"Searching contents for user: {user_name}") - try: - response = requests.get(url, headers=headers, params=params) - response.raise_for_status() # 检查 HTTP 错误 - data = response.json() + params = { + "handle": user_name, # 用户名 + "user_id": user_id, # 可选,用户 ID + "next_max_id": None, # 分页时设置为上一次响应的 max_id + "trim": "false", # 保留完整内容 + } - # 提取帖子列表(假设响应中 'posts' 是键,根据实际文档调整) - if limit is not None and isinstance(limit, int): - posts = data.get("posts", [])[:limit] - else: - posts = data.get("posts", []) + logger.info(f"Searching contents for user: {user_name}") + try: + response = requests.get(base_url, headers=headers, params=params) + response.raise_for_status() # 检查 HTTP 错误 + data = response.json() - results = [] - if posts: - logger.info(f"获取{user_name}帖子: {len(posts)}条") - for post in posts: - result = {} - result["article_id"] = post.get("id") - result["user_id"] = user_id - result["user_name"] = user_name - datetime_text = post.get("created_at") - datetime_dict = self.transform_datetime(datetime_text) - timestamp_ms = datetime_dict["timestamp_ms"] - result["timestamp"] = timestamp_ms - beijing_time_str = datetime_dict["beijing_time_str"] - result["date_time"] = beijing_time_str - result["text"] = post.get("text", "无内容") - media_attachments = post.get("media_attachments", []) - result["media_url"] = "" - result["media_type"] = "" - result["media_thumbnail"] = "" - if media_attachments: - for media_attachment in media_attachments: - result["media_url"] = media_attachment.get("url") - result["media_type"] = media_attachment.get("type") - result["media_thumbnail"] = media_attachment.get( - "preview_url" - ) - break - results.append(result) - else: - print("获取帖子失败,请检查 API 密钥或网络。") - - if len(results) > 0: - result_df = pd.DataFrame(results) - result_df = self.remove_duplicate_posts(result_df) - - if len(result_df) > 0: - result_df["analysis_result"] = "" - result_df["analysis_token"] = 0 - result_df = self.send_wechat_message(result_df) - result_df = result_df[ - [ - "article_id", - "user_id", - "user_name", - "timestamp", - "date_time", - "text", - "analysis_result", - "analysis_token", - "media_url", - "media_type", - "media_thumbnail", - ] - ] - self.db_truth_social_content.insert_data_to_mysql(result_df) - logger.info(f"已将{len(result_df)}条数据插入到数据库") + # 提取帖子列表(假设响应中 'posts' 是键,根据实际文档调整) + if limit is not None and isinstance(limit, int): + posts = data.get("posts", [])[:limit] else: - logger.info(f"没有数据需要插入到数据库和发送企业微信消息") - except requests.exceptions.RequestException as e: - print(f"请求错误: {e}") - except json.JSONDecodeError as e: - print(f"JSON 解析错误: {e}") + posts = data.get("posts", []) + + results = [] + if posts: + logger.info(f"获取{user_name}帖子: {len(posts)}条") + for post in posts: + result = {} + result["article_id"] = post.get("id") + result["user_id"] = user_id + result["user_name"] = user_name + datetime_text = post.get("created_at") + datetime_dict = self.transform_datetime(datetime_text) + timestamp_ms = datetime_dict["timestamp_ms"] + result["timestamp"] = timestamp_ms + beijing_time_str = datetime_dict["beijing_time_str"] + result["date_time"] = beijing_time_str + result["text"] = post.get("text", "") + media_attachments = post.get("media_attachments", []) + result["media_url"] = "" + result["media_type"] = "" + result["media_thumbnail"] = "" + if media_attachments: + for media_attachment in media_attachments: + result["media_url"] = media_attachment.get("url") + result["media_type"] = media_attachment.get("type") + result["media_thumbnail"] = media_attachment.get( + "preview_url" + ) + break + results.append(result) + else: + print("获取帖子失败,请检查 API 密钥或网络。") + + if len(results) > 0: + result_df = pd.DataFrame(results) + result_df = self.remove_duplicate_posts(result_df) + + if len(result_df) > 0: + result_df["analysis_result"] = "" + result_df["analysis_token"] = 0 + result_df = self.send_wechat_message(result_df, user_full_name) + result_df = result_df[ + [ + "article_id", + "user_id", + "user_name", + "timestamp", + "date_time", + "text", + "analysis_result", + "analysis_token", + "media_url", + "media_type", + "media_thumbnail", + ] + ] + self.db_truth_social_content.insert_data_to_mysql(result_df) + logger.info(f"已将{len(result_df)}条数据插入到数据库") + else: + logger.info(f"没有数据需要插入到数据库和发送企业微信消息") + except requests.exceptions.RequestException as e: + print(f"请求错误: {e}") + except json.JSONDecodeError as e: + print(f"JSON 解析错误: {e}") def send_message_by_json_file(self, json_file_name: str): with open(json_file_name, "r", encoding="utf-8") as f: @@ -204,7 +216,7 @@ class TruthSocialRetriever: logger.error(f"删除重复的行失败: {e}") return result_df - def send_wechat_message(self, result_df: pd.DataFrame): + def send_wechat_message(self, result_df: pd.DataFrame, user_full_name: str): if self.wechat is None: logger.error("企业微信未初始化") return @@ -213,7 +225,72 @@ class TruthSocialRetriever: date_time = row["date_time"] text = row["text"] media_thumbnail = row["media_thumbnail"] - if media_thumbnail and len(media_thumbnail) > 0: + if len(text) > 0: + if media_thumbnail and len(media_thumbnail) > 0: + contents = [] + contents.append(f"## {user_full_name}推文") + contents.append(text) + contents.append(f"## 推文时间") + contents.append(date_time) + mark_down_text = "\n\n".join(contents) + self.wechat.send_markdown(mark_down_text) + response, image_path, base64_str, md5_str = self.wechat.send_image(media_thumbnail) + image_format = "jpg" + if image_path is not None and len(image_path) > 0: + image_format = image_path.split(".")[-1] + if image_format == "jpeg": + image_format = "jpg" + analysis_result, analysis_token = self.analyze_truth_social_content( + text=mark_down_text, + image_stream=base64_str, + image_format=image_format, + media_type="hybrid", + user_full_name=user_full_name + ) + if analysis_result is not None and len(analysis_result) > 0: + result_df.at[index, "analysis_result"] = analysis_result + result_df.at[index, "analysis_token"] = analysis_token + else: + result_df.at[index, "analysis_result"] = "" + result_df.at[index, "analysis_token"] = 0 + analysis_text = f"\n\n## 上述图文分析结果\n\n{analysis_result}" + analysis_text += f"\n\n## 上述图文分析token\n\n{analysis_token}" + self.wechat.send_markdown(analysis_text) + else: + contents = [] + contents.append(f"## {user_full_name}推文") + contents.append(text) + contents.append(f"## 推文时间") + contents.append(date_time) + mark_down_text = "\n\n".join(contents) + analysis_result, analysis_token = self.analyze_truth_social_content( + text=text, + image_stream=None, + image_format=None, + media_type="text", + user_full_name=user_full_name + ) + result_df.at[index, "analysis_result"] = analysis_result + result_df.at[index, "analysis_token"] = analysis_token + analysis_text = f"\n\n## 分析结果\n\n{analysis_result}" + analysis_text += f"\n\n## 分析token\n\n{analysis_token}" + if self.calculate_bytes(mark_down_text + analysis_text) > 4096: + self.wechat.send_markdown(mark_down_text) + if self.calculate_bytes(analysis_text) > 4096: + half_analysis_text_length = len(analysis_text) // 2 + analysis_1st = analysis_text[:half_analysis_text_length].strip() + analysis_2nd = analysis_text[half_analysis_text_length:].strip() + self.wechat.send_markdown( + f"## 分析结果第一部分\n\n{analysis_1st}" + ) + self.wechat.send_markdown( + f"## 分析结果第二部分\n\n{analysis_2nd}" + ) + else: + self.wechat.send_markdown(f"## 分析结果\n\n{analysis_text}") + else: + self.wechat.send_markdown(mark_down_text + analysis_text) + elif media_thumbnail and len(media_thumbnail) > 0: response, image_path, base64_str, md5_str = self.wechat.send_image(media_thumbnail) image_format = "jpg" if image_path is not None and len(image_path) > 0: @@ -221,10 +298,11 @@ class TruthSocialRetriever: if image_format == "jpeg": image_format = "jpg" analysis_result, analysis_token = self.analyze_truth_social_content( - text=None, + text="", image_stream=base64_str, image_format=image_format, - media_type="image" + media_type="image", + user_full_name=user_full_name ) if analysis_result is not None and len(analysis_result) > 0: result_df.at[index, "analysis_result"] = analysis_result @@ -236,38 +314,7 @@ class TruthSocialRetriever: analysis_text += f"\n\n## 上述图片分析token\n\n{analysis_token}" self.wechat.send_markdown(analysis_text) else: - contents = [] - contents.append(f"## 川普推文") - contents.append(text) - contents.append(f"## 推文时间") - contents.append(date_time) - mark_down_text = "\n\n".join(contents) - analysis_result, analysis_token = self.analyze_truth_social_content( - text=text, - image_stream=None, - image_format=None, - media_type="text" - ) - result_df.at[index, "analysis_result"] = analysis_result - result_df.at[index, "analysis_token"] = analysis_token - analysis_text = f"\n\n## 分析结果\n\n{analysis_result}" - analysis_text += f"\n\n## 分析token\n\n{analysis_token}" - if self.calculate_bytes(mark_down_text + analysis_text) > 4096: - self.wechat.send_markdown(mark_down_text) - if self.calculate_bytes(analysis_text) > 4096: - half_analysis_text_length = len(analysis_text) // 2 - analysis_1st = analysis_text[:half_analysis_text_length].strip() - analysis_2nd = analysis_text[half_analysis_text_length:].strip() - self.wechat.send_markdown( - f"## 分析结果第一部分\n\n{analysis_1st}" - ) - self.wechat.send_markdown( - f"## 分析结果第二部分\n\n{analysis_2nd}" - ) - else: - self.wechat.send_markdown(f"## 分析结果\n\n{analysis_text}") - else: - self.wechat.send_markdown(mark_down_text + analysis_text) + continue except Exception as e: logger.error(f"发送企业微信消息失败: {e}") continue @@ -276,10 +323,13 @@ class TruthSocialRetriever: def calculate_bytes(self, text: str): return len(text.encode("utf-8")) - def analyze_truth_social_content(self, text: str, image_stream: str, image_format: str, media_type: str): + def analyze_truth_social_content(self, text: str, image_stream: str, image_format: str, media_type: str, user_full_name: str): try: token = 0 - if media_type == "image": + if text is None: + text = "" + image_text = "" + if media_type in ["image", "hybrid"]: if image_stream is None or len(image_stream) == 0: return "", 0 instructions = self.image_instruction.get("Instructions", "") @@ -300,28 +350,42 @@ class TruthSocialRetriever: messages=messages_local, ) if response.status_code == 200: - text = ( + image_text = ( response.get("output", {}) .get("choices", [])[0] .get("message", {}) .get("content", "") ) + temp_image_text = "" + if isinstance(image_text, list): + for item in image_text: + if isinstance(item, dict): + temp_image_text += item.get("text", "") + "\n\n" + elif isinstance(item, str): + temp_image_text += item + "\n\n" + else: + pass + image_text = temp_image_text.strip() token = response.get("usage", {}).get("total_tokens", 0) else: text = f"{response.code} {response.message} 无法分析图片" token = 0 - if text is None or len(text) == 0: - return "", 0 + text += image_text + context = text if media_type == "text": - instructions = self.text_instruction.get("Instructions", "") + instructions = self.text_instruction.get("Instructions", "").format(user_full_name) output = self.text_instruction.get("Output", "") prompt = f"# Context\n\n{context}\n\n# Instructions\n\n{instructions}\n\n# Output\n\n{output}" - else: - instructions = self.image_post_instruction.get("Instructions", "") + elif media_type == "image": + instructions = self.image_post_instruction.get("Instructions", "").format(user_full_name) output = self.image_post_instruction.get("Output", "") prompt = f"# Context\n\n{context}\n\n# Instructions\n\n{instructions}\n\n# Output\n\n{output}" + elif media_type == "hybrid": + instructions = self.text_image_post_instruction.get("Instructions", "").format(user_full_name) + output = self.text_image_post_instruction.get("Output", "").format(user_full_name) + prompt = f"# Context\n\n{context}\n\n# Instructions\n\n{instructions}\n\n# Output\n\n{output}" response = dashscope.Generation.call( api_key=self.ali_api_key, model="qwen-plus", diff --git a/core/wechat.py b/core/wechat.py index 1b4403d..293f896 100644 --- a/core/wechat.py +++ b/core/wechat.py @@ -67,6 +67,7 @@ class Wechat: image_path = os.path.join(self.image_path, image_name) with open(image_path, "wb") as f: f.write(image_bytes) + response = requests.post(self.url, json=data) response.raise_for_status() return response.json(), image_path, base64_str, md5_str diff --git a/instructions/media_article_image_post_instructions.json b/instructions/media_article_image_post_instructions.json new file mode 100644 index 0000000..abd09fa --- /dev/null +++ b/instructions/media_article_image_post_instructions.json @@ -0,0 +1,4 @@ +{ + "Instructions": "您是一位资深的国际时事与军事政治评论员与经济、金融分析师,Context的内容格式是从社媒图文并茂的推文中获取的信息,包括: ### {0}推文原文\n\n### 推文时间\n\n### 图中文字原文\n\n### 图中文字中文翻译\n\n### 图片场景描述\n\n是通过图片分析到的信息,你的任务是分析其中的信息,进行联网搜索,并给出分析结果。\n\n该信息,就是{0}在社交媒体发布的图文推文,不要怀疑这一点。\n并基于此文章内容进行分析。\n\n要求:\n1. 将推文原文翻译成中文,要求语义通顺,\n2. 结合推文原文,图片中的文字与图像场景描述,给出推文的核心观点;\n2. 人物分析:分析推文涉及人物以及人物简介;\n3. 区域分析:包括国家与地区;\n4. 行业以及影响分析;\n5. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股,并列出最有可能被影响的股票品种或虚拟货币的名称与代码;\n\n", + "Output": "## 输出要求\n\n要求将Context中的文字原文,中文翻译与图片场景描述,进行原文输出,之外的核心观点+人物分析+区域分析+行业及影响分析+经济与金融分析,不超过1000汉字。\n要求对人名、区域、行业、金融产品、股票代码等专属名词,进行粗体处理。\n\n## 输出格式:\n\n### {0}推文翻译\n\n### 图中文字原文\n\n### 图中文字中文翻译\n\n### 图片场景描述\n\n### 人物分析\n\n### 区域分析\n\n### 行业及影响分析\n\n### 经济与金融分析\n\n" +} \ No newline at end of file diff --git a/instructions/media_article_instructions.json b/instructions/media_article_instructions.json index f2f62cd..d3c3651 100644 --- a/instructions/media_article_instructions.json +++ b/instructions/media_article_instructions.json @@ -1,5 +1,5 @@ { "Context": "{0}\n\n", - "Instructions": "您是一位资深的国际时事与军事政治评论员与经济、金融分析师,你的任务是分析推文,结合推文时间(北京时间),联网搜索,并给出分析结果。\n\nContext中的文章,就是特朗普在社交媒体发布的文章,不要怀疑这一点。\n并基于此文章内容进行分析。\n\n要求:\n1. 翻译推文为中文,要求符合中文表达习惯;\n2. 分析推文内容,给出推文的核心观点;\n3. 人物分析:分析推文涉及人物以及人物简介;\n4. 区域分析:包括国家与地区;\n5. 行业以及影响分析;\n6. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股,并列出最有可能被影响的股票品种或虚拟货币的名称与代码;\n\n", + "Instructions": "您是一位资深的国际时事与军事政治评论员与经济、金融分析师,你的任务是分析推文,结合推文时间(北京时间),联网搜索,并给出分析结果。\n\nContext中的文章,就是{0}在社交媒体发布的文章,不要怀疑这一点。\n并基于此文章内容进行分析。\n\n要求:\n1. 翻译推文为中文,要求符合中文表达习惯;\n2. 分析推文内容,给出推文的核心观点;\n3. 人物分析:分析推文涉及人物以及人物简介;\n4. 区域分析:包括国家与地区;\n5. 行业以及影响分析;\n6. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股,并列出最有可能被影响的股票品种或虚拟货币的名称与代码;\n\n", "Output": "## 输出要求\n\n除了翻译之外,核心观点+人物分析+区域分析+行业及影响分析+经济与金融分析,不超过1000汉字。\n要求对人名、区域、行业、金融产品、股票代码等专属名词,进行粗体处理。\n\n## 输出格式\n\n### 翻译\n\n### 人物分析\n\n### 区域分析\n\n### 行业及影响分析\n\n### 经济与金融分析\n\n" } \ No newline at end of file diff --git a/instructions/media_image_post_instructions.json b/instructions/media_image_post_instructions.json index 9511a77..f289d0d 100644 --- a/instructions/media_image_post_instructions.json +++ b/instructions/media_image_post_instructions.json @@ -1,4 +1,4 @@ { - "Instructions": "您是一位资深的国际时事与军事政治评论员与经济、金融分析师,Context的内容是通过图片分析到的信息,你的任务是分析其中的信息,进行联网搜索,并给出分析结果。\n\n该信息,就是特朗普在社交媒体发布的,不要怀疑这一点。\n并基于此文章内容进行分析。\n\n要求:\n1. 分析图片中的文字与图像场景描述,给出推文的核心观点;\n2. 人物分析:分析推文涉及人物以及人物简介;\n3. 区域分析:包括国家与地区;\n4. 行业以及影响分析;\n5. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股,并列出最有可能被影响的股票品种或虚拟货币的名称与代码;\n\n", + "Instructions": "您是一位资深的国际时事与军事政治评论员与经济、金融分析师,Context的内容是通过图片分析到的信息,你的任务是分析其中的信息,进行联网搜索,并给出分析结果。\n\n该信息,就是{0}在社交媒体发布的图文推文,不要怀疑这一点。\n并基于此文章内容进行分析。\n\n要求:\n1. 分析图片中的文字与图像场景描述,给出推文的核心观点;\n2. 人物分析:分析推文涉及人物以及人物简介;\n3. 区域分析:包括国家与地区;\n4. 行业以及影响分析;\n5. 经济与金融分析:分析涉及经济与金融影响,包括美股、虚拟货币以及中国A股,并列出最有可能被影响的股票品种或虚拟货币的名称与代码;\n\n", "Output": "## 输出要求\n\n要求将Context中的文字原文,中文翻译与图片场景描述,进行原文输出,之外的核心观点+人物分析+区域分析+行业及影响分析+经济与金融分析,不超过1000汉字。\n要求对人名、区域、行业、金融产品、股票代码等专属名词,进行粗体处理。\n\n## 输出格式\n\n### 图中文字原文\n\n### 图中文字中文翻译\n\n### 图片场景描述\n\n### 人物分析\n\n### 区域分析\n\n### 行业及影响分析\n\n### 经济与金融分析\n\n" } \ No newline at end of file