crypto_quant/core/media/twitter_retriever.py

118 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import time
from datetime import datetime
from typing import Optional
from urllib.parse import quote

import pandas as pd
import pytz
import requests

import core.logger as logging
from config import TWITTER_CONFIG, COIN_MYSQL_CONFIG
from core.db.db_twitter_content import DBTwitterContent
# Module-level logger used throughout this file.
# NOTE: `logging` here is the project's `core.logger` module (see the import
# alias above), not the standard-library `logging` package.
logger = logging.logger
class TwitterRetriever:
    """Fetch tweets for the configured accounts and archive them as JSON files.

    Quota notes carried over from the original author: a free API account can
    only fetch 100 tweets per month. A paid account is needed for more: the
    basic tier allows 15,000 tweets per month (200 USD/month) and the premium
    tier allows 1,000,000 tweets per month (5,000 USD/month).
    """

    # Upper bound, in seconds, on how long one HTTP request may block. Without
    # a timeout `requests` waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        """Read API / MySQL settings from the config module and prepare storage.

        Raises:
            ValueError: If no MySQL password is configured.
        """
        self.api_key = TWITTER_CONFIG["keys"]["api_key"]
        # URL template: the account name is substituted with str.format().
        self.base_url = TWITTER_CONFIG["base_url"]
        self.monitor_account_list = TWITTER_CONFIG["monitor_accounts"]
        self.limit = 20  # Not read anywhere in this class; kept for callers.

        mysql_password = COIN_MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        self.db_url = self._build_db_url(
            COIN_MYSQL_CONFIG.get("user", "xch"),
            mysql_password,
            COIN_MYSQL_CONFIG.get("host", "localhost"),
            COIN_MYSQL_CONFIG.get("port", 3306),
            COIN_MYSQL_CONFIG.get("database", "okx"),
        )
        self.db_twitter_content = DBTwitterContent(self.db_url)

        self.save_path = r"./output/media/twitter/"
        os.makedirs(self.save_path, exist_ok=True)

    @staticmethod
    def _build_db_url(user, password, host, port, database) -> str:
        """Return the `mysql+pymysql://` connection URL for the given settings.

        The user name and password are percent-encoded so that characters such
        as `@`, `:` or `/` cannot be mistaken for URL delimiters. Credentials
        made only of unreserved characters are left unchanged.
        """
        safe_user = quote(str(user), safe="")
        safe_password = quote(str(password), safe="")
        return (
            f"mysql+pymysql://{safe_user}:{safe_password}"
            f"@{host}:{port}/{database}"
        )

    def search_contents(self, username: str) -> Optional[dict]:
        """Request the tweets of one account from the configured endpoint.

        Args:
            username: Account name substituted into the `base_url` template.

        Returns:
            The decoded JSON payload (a dict whose "tweets" entry is a list),
            or None when the request fails, the status code is not 200, or the
            payload does not have that shape.
        """
        logger.info(f"Searching contents for user: {username}")
        headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json",
        }
        url = self.base_url.format(username)

        try:
            response = requests.get(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS
            )
        except requests.RequestException as exc:
            # Network failures are reported the same way as HTTP errors (None)
            # so one unreachable account does not abort the monitoring loop.
            logger.error(f"Request error for user {username}: {exc}")
            return None

        if response.status_code != 200:
            logger.error(f"请求失败: {response.status_code} - {response.text}")
            return None

        try:
            payload = response.json()
        except ValueError as exc:
            logger.error(f"Invalid JSON in response for user {username}: {exc}")
            return None

        # Callers index payload["tweets"], so reject any other shape up front.
        if not isinstance(payload, dict) or not isinstance(payload.get("tweets"), list):
            logger.error(f"Unexpected response payload for user {username}: no 'tweets' list")
            return None

        # Single quotes inside the f-string keep this valid before Python 3.12.
        logger.info(f"获取到 {len(payload['tweets'])} 条推文")
        return payload

    def monitor_accounts(self):
        """Fetch every monitored account and store each tweet as a JSON file.

        Files are written to `<save_path>/<account name>/<rest_id>.json`.
        Accounts whose fetch fails are skipped.
        """
        for account_dict in self.monitor_account_list:
            user_name = account_dict["name"]
            logger.info(f"Monitoring account: {user_name}")
            contents = self.search_contents(user_name)
            if contents is None:
                continue
            self._save_tweets(user_name, contents["tweets"])
            # TODO: persisting to MySQL is currently disabled. The earlier
            # draft built one row per tweet (user_id, user_name, timestamp and
            # date_time from transform_datetime(created_at), text), collected
            # them in a pandas DataFrame and passed it to
            # self.db_twitter_content.insert_data_to_mysql().

    def _save_tweets(self, user_name: str, tweets: list) -> int:
        """Write each tweet to its own JSON file and return how many were saved.

        Tweets that are not dicts or that lack a "rest_id" are skipped with a
        warning, because the file name is derived from that field.
        """
        user_twitter_path = os.path.join(self.save_path, user_name)
        os.makedirs(user_twitter_path, exist_ok=True)

        saved = 0
        for content in tweets:
            rest_id = content.get("rest_id") if isinstance(content, dict) else None
            if rest_id is None:
                logger.warning(f"Skipping tweet without rest_id for account: {user_name}")
                continue
            content_path = os.path.join(user_twitter_path, f"{rest_id}.json")
            with open(content_path, "w", encoding="utf-8") as f:
                json.dump(content, f, ensure_ascii=False, indent=4)
            saved += 1
        return saved

    def transform_datetime(self, datetime_text: str) -> dict:
        """Convert a UTC timestamp string to epoch milliseconds and Beijing time.

        Args:
            datetime_text: UTC time formatted as `%Y-%m-%dT%H:%M:%S.%fZ`.

        Returns:
            A dict with "timestamp_ms" (int, milliseconds since the epoch) and
            "beijing_time_str" (ISO 8601 string with a `+08:00`-style offset).

        Raises:
            ValueError: If `datetime_text` does not match the expected format.
        """
        utc_time = datetime.strptime(
            datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=pytz.UTC)

        # 1. Convert to a Unix timestamp in milliseconds.
        timestamp_ms = int(utc_time.timestamp() * 1000)

        # 2. Convert to Beijing time in ISO 8601 format.
        beijing_tz = pytz.timezone("Asia/Shanghai")
        beijing_time = utc_time.astimezone(beijing_tz)
        beijing_time_str = beijing_time.strftime("%Y-%m-%dT%H:%M:%S%z")
        # strftime's %z yields "+0800"; insert the colon to get "+08:00".
        beijing_time_str = beijing_time_str[:-2] + ":" + beijing_time_str[-2:]

        return {
            "timestamp_ms": timestamp_ms,
            "beijing_time_str": beijing_time_str,
        }