crypto_quant/core/twitter/twitter_retriever.py

106 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import core.logger as logging
from core.db.db_twitter_content import DBTwitterContent
from config import TWITTER_CONFIG, COIN_MYSQL_CONFIG
import os
import json
import requests
import time
from datetime import datetime
import pytz
import pandas as pd
# Module-wide logger: `logging` here is the project's `core.logger` module
# (see the import above), not the standard-library `logging` package.
logger = logging.logger
class TwitterRetriever:
    """Fetch posts of the configured accounts and store them via DBTwitterContent.

    Endpoints, credentials and the list of monitored accounts are read from
    ``TWITTER_CONFIG``; the database connection settings are read from
    ``COIN_MYSQL_CONFIG``.
    """

    # Seconds to wait on an HTTP call before giving up. Without an explicit
    # timeout ``requests`` may block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Accepted ``created_at`` layouts: with and without fractional seconds.
    _CREATED_AT_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

    def __init__(self):
        """Load API and database settings and open the content table wrapper.

        Raises:
            ValueError: if no MySQL password is configured.
        """
        self.keys = TWITTER_CONFIG["keys"]
        self.headers = {
            "Authorization": f"Bearer {self.keys['bearer_token']}"
        }
        self.user_search_url = TWITTER_CONFIG["user_search_url"]
        self.contents_search_url = TWITTER_CONFIG["contents_search_url"]
        self.monitor_account_list = TWITTER_CONFIG["monitor_accounts"]

        mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
        mysql_password = COIN_MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
        mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
        mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
        # NOTE(review): the password is interpolated verbatim; characters such
        # as "@" or "/" would need URL-escaping unless the config already
        # stores an escaped value - confirm before changing.
        self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
        self.db_twitter_content = DBTwitterContent(self.db_url)

        # 15 minutes plus 10 seconds between accounts; presumably sized to an
        # API rate-limit window - TODO confirm.
        self.sleep_time = 15 * 60 + 10

    def _get_json(self, url: str):
        """GET *url* with the bearer-token headers; return decoded JSON or None.

        Transport errors, non-200 statuses and undecodable bodies are logged
        and reported as ``None`` so that one bad request cannot abort the
        monitoring loop.
        """
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException as exc:
            logger.error(f"Request to {url} failed: {exc}")
            return None

        if response.status_code != 200:
            logger.error(f"Request to {url} returned HTTP {response.status_code}")
            return None

        try:
            return response.json()
        except ValueError as exc:
            # The JSON decode errors raised by requests derive from ValueError.
            logger.error(f"Response from {url} is not valid JSON: {exc}")
            return None

    @staticmethod
    def _payload_data(payload, default):
        """Return ``payload["data"]`` if present and truthy, else *default*."""
        if isinstance(payload, dict):
            return payload.get("data") or default
        return default

    def search_user(self, username):
        """Look up an account by user name.

        Args:
            username: value substituted into ``user_search_url``.

        Returns:
            The decoded JSON response, or ``None`` if the request failed.
        """
        payload = self._get_json(self.user_search_url.format(username))
        if payload is None:
            logger.error(f"Failed to search user: {username}")
        return payload

    def search_contents(self, username: str, user_id: str):
        """Fetch the posts of one account.

        Args:
            username: accepted for interface compatibility; not used here.
            user_id: value substituted into ``contents_search_url``.

        Returns:
            The decoded JSON response, or ``None`` if the request failed.
        """
        logger.info(f"Searching contents for user: {user_id}")
        payload = self._get_json(self.contents_search_url.format(user_id))
        if payload is None:
            logger.error(f"Failed to search contents for user: {user_id}")
        return payload

    def _build_rows(self, user_id: str, username: str, tweets):
        """Convert raw post dicts into row dicts for the content table.

        A post lacking ``created_at``/``text`` or carrying an unparsable
        timestamp is skipped with a warning instead of discarding the whole
        batch.
        """
        rows = []
        for tweet in tweets:
            try:
                stamp = self.transform_datetime(tweet["created_at"])
                text = tweet["text"]
            except (KeyError, TypeError, ValueError) as exc:
                logger.warning(
                    f"Skipping malformed tweet for user {user_id}: {exc!r}"
                )
                continue
            rows.append(
                {
                    "user_id": user_id,
                    "user_name": username,
                    "timestamp": stamp["timestamp_ms"],
                    "date_time": stamp["beijing_time_str"],
                    "text": text,
                }
            )
        return rows

    def monitor_accounts(self):
        """Poll every monitored account once and insert its posts.

        The method sleeps ``self.sleep_time`` seconds before querying each
        account, including the first one. Accounts whose lookup or content
        request fails, or whose response carries no usable ``data``, are
        skipped.
        """
        for account in self.monitor_account_list:
            logger.info(f"Monitoring account: {account}")
            logger.info(f"Sleeping for {self.sleep_time} seconds")
            time.sleep(self.sleep_time)

            user = self.search_user(account)
            if user is None:
                continue
            # A 200 response is not guaranteed to contain "data" (for example
            # when the API reports the problem in the body), so do not index
            # into it blindly.
            profile = self._payload_data(user, {})
            if "username" not in profile or "id" not in profile:
                logger.warning(f"No user data returned for account: {account}")
                continue
            username = profile["username"]
            user_id = str(profile["id"])

            contents = self.search_contents(username, user_id)
            if contents is None:
                continue
            # A missing "data" key is treated as "no posts".
            rows = self._build_rows(
                user_id, username, self._payload_data(contents, [])
            )

            if rows:
                result_df = pd.DataFrame(rows)
                self.db_twitter_content.insert_data_to_mysql(result_df)
                logger.info(f"Inserted {len(result_df)} rows into twitter_content")
            else:
                logger.warning(f"No data inserted for account: {account}")

    def transform_datetime(self, datetime_text: str):
        """Convert a UTC ``created_at`` string into epoch ms and Beijing time.

        Args:
            datetime_text: UTC timestamp such as ``2024-01-01T00:00:00.000Z``;
                the fractional-seconds part is optional.

        Returns:
            A dict with ``timestamp_ms`` (int, milliseconds since the Unix
            epoch) and ``beijing_time_str`` (ISO 8601 text with a ``+08:00``
            offset, truncated to whole seconds).

        Raises:
            ValueError: if the text matches none of the accepted layouts.
        """
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timedelta, timezone

        parsed = None
        for fmt in self._CREATED_AT_FORMATS:
            try:
                parsed = datetime.strptime(datetime_text, fmt)
            except ValueError:
                continue
            break
        if parsed is None:
            raise ValueError(
                f"Unrecognized created_at timestamp: {datetime_text!r}"
            )
        utc_time = parsed.replace(tzinfo=timezone.utc)

        # 1. Epoch milliseconds, computed with integer arithmetic so the
        #    value never passes through a float.
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        timestamp_ms = (utc_time - epoch) // timedelta(milliseconds=1)

        # 2. Beijing time in ISO 8601 with a "+08:00" offset. China has used a
        #    fixed UTC+8 with no daylight saving since 1991, so a constant
        #    offset matches Asia/Shanghai for any timestamp a post can carry
        #    and avoids a tz-database lookup on every call.
        beijing_time = utc_time.astimezone(timezone(timedelta(hours=8)))
        # isoformat() already renders the offset with a colon.
        beijing_time_str = beijing_time.isoformat(timespec="seconds")

        return {
            "timestamp_ms": timestamp_ms,
            "beijing_time_str": beijing_time_str,
        }