crypto_quant/core/media/twitter_retriever.py

118 lines
5.2 KiB
Python
Raw Permalink Normal View History

2025-10-11 10:05:11 +00:00
import core.logger as logging
from core.db.db_twitter_content import DBTwitterContent
from config import TWITTER_CONFIG, COIN_MYSQL_CONFIG
import os
import json
import requests
import time
from datetime import datetime
import pytz
import pandas as pd
logger = logging.logger
class TwitterRetriever:
2025-10-11 10:27:54 +00:00
"""
免费版本的账号每个月只能获取100条推文
需要使用付费版本的账号基础版每个月可以获取15000条推文,200美元/
高级版每个月可以获取1000000条推文,5000美元/
"""
2025-10-11 10:05:11 +00:00
def __init__(self):
2025-10-27 09:29:30 +00:00
self.api_key = TWITTER_CONFIG["keys"]["api_key"]
self.base_url = TWITTER_CONFIG["base_url"]
2025-10-11 10:05:11 +00:00
self.monitor_account_list = TWITTER_CONFIG["monitor_accounts"]
2025-10-27 09:29:30 +00:00
self.limit = 20
2025-10-11 10:05:11 +00:00
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_twitter_content = DBTwitterContent(self.db_url)
2025-10-27 09:29:30 +00:00
self.save_path = r"./output/media/twitter/"
os.makedirs(self.save_path, exist_ok=True)
2025-10-11 10:05:11 +00:00
2025-10-27 09:29:30 +00:00
def search_contents(self, username: str):
logger.info(f"Searching contents for user: {username}")
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json"
}
url = self.base_url.format(username)
response = requests.get(url, headers=headers)
2025-10-11 10:05:11 +00:00
if response.status_code == 200:
2025-10-27 09:29:30 +00:00
tweets = response.json() # 假设响应是推文数组
logger.info(f"获取到 {len(tweets["tweets"])} 条推文")
# for tweet in tweets:
# created_at = self.transform_datetime(tweet.get('created_at', ''))
# logger.info(f"- ID: {tweet.get('id')}")
# logger.info(f" 文本: {tweet.get('text', '')[:100]}...") # 截取前100字符
# logger.info(f" 时间: {created_at}")
# logger.info(f" 点赞: {tweet.get('likes', 0)}, 转发: {tweet.get('retweets', 0)}")
# if tweet.get('media'):
# logger.info(f" 媒体: {tweet['media'][0].get('url', 'N/A')}")
# logger.info("---")
return tweets
2025-10-11 10:05:11 +00:00
else:
2025-10-27 09:29:30 +00:00
logger.error(f"请求失败: {response.status_code} - {response.text}")
2025-10-11 10:05:11 +00:00
return None
def monitor_accounts(self):
2025-10-11 10:24:29 +00:00
for account_dict in self.monitor_account_list:
user_name = account_dict["name"]
logger.info(f"Monitoring account: {user_name}")
2025-10-27 09:29:30 +00:00
2025-10-11 10:05:11 +00:00
result_list = []
2025-10-27 09:29:30 +00:00
contents = self.search_contents(user_name)
2025-10-11 10:05:11 +00:00
if contents is None:
continue
2025-10-27 09:29:30 +00:00
user_twitter_path = os.path.join(self.save_path, user_name)
os.makedirs(user_twitter_path, exist_ok=True)
for content in contents["tweets"]:
content_path = os.path.join(user_twitter_path, f"{content["rest_id"]}.json")
with open(content_path, "w", encoding="utf-8") as f:
json.dump(content, f, ensure_ascii=False, indent=4)
# for content in contents:
# datetime_text = content["created_at"]
# datetime_dict = self.transform_datetime(datetime_text)
# timestamp_ms = datetime_dict["timestamp_ms"]
# beijing_time_str = datetime_dict["beijing_time_str"]
# text = content["text"]
# result = {
# "user_id": content["id"],
# "user_name": user_name,
# "timestamp": timestamp_ms,
# "date_time": beijing_time_str,
# "text": text
# }
# result_list.append(result)
# if len(result_list) > 0:
# result_df = pd.DataFrame(result_list)
# self.db_twitter_content.insert_data_to_mysql(result_df)
# logger.info(f"Inserted {len(result_df)} rows into twitter_content")
# else:
# logger.warning(f"No data inserted for account: {user_name}")
2025-10-11 10:05:11 +00:00
def transform_datetime(self, datetime_text: str):
utc_time = datetime.strptime(datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=pytz.UTC)
# 1. 转换为时间戳(毫秒)
timestamp_ms = int(utc_time.timestamp() * 1000)
# 2. 转换为北京时间ISO 8601 格式,带 +08:00
beijing_tz = pytz.timezone("Asia/Shanghai")
beijing_time = utc_time.astimezone(beijing_tz)
beijing_time_str = beijing_time.strftime("%Y-%m-%dT%H:%M:%S%z")
# 插入冒号到时区偏移(如 +0800 -> +08:00
beijing_time_str = beijing_time_str[:-2] + ":" + beijing_time_str[-2:]
result = {
"timestamp_ms": timestamp_ms,
"beijing_time_str": beijing_time_str
}
return result