2025-10-11 10:05:11 +00:00
|
|
|
|
import core.logger as logging
|
|
|
|
|
|
from core.db.db_twitter_content import DBTwitterContent
|
|
|
|
|
|
from config import TWITTER_CONFIG, COIN_MYSQL_CONFIG
|
|
|
|
|
|
import os
|
|
|
|
|
|
import json
|
|
|
|
|
|
import requests
|
|
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
import pytz
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger: the `logger` object exposed by the project's
# `core.logger` module (imported in this file under the name `logging`).
logger = logging.logger
|
|
|
|
|
|
|
|
|
|
|
|
class TwitterRetriever:
    """Fetch posts of the configured accounts and hand them to the database layer.

    Endpoints, credentials and the account list are read from
    ``TWITTER_CONFIG``; the MySQL connection settings are read from
    ``COIN_MYSQL_CONFIG``.
    """

    # Seconds to wait on an HTTP request before giving up. Without a timeout
    # ``requests`` waits forever on a stalled connection.
    request_timeout = 30

    def __init__(self):
        """Load API settings and build the database accessor.

        Raises:
            KeyError: if a required ``TWITTER_CONFIG`` entry is missing.
            ValueError: if no MySQL password is configured.
        """
        self.keys = TWITTER_CONFIG["keys"]
        self.headers = {
            "Authorization": f"Bearer {self.keys['bearer_token']}"
        }
        # URL templates; each is filled in with a single positional value
        # (user name / user id) via ``str.format``.
        self.user_search_url = TWITTER_CONFIG["user_search_url"]
        self.contents_search_url = TWITTER_CONFIG["contents_search_url"]
        # Iterated as dicts that are indexed with "name" and "id".
        self.monitor_account_list = TWITTER_CONFIG["monitor_accounts"]

        mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
        mysql_password = COIN_MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
        mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
        mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")

        # NOTE(review): the credentials are interpolated verbatim. A password
        # containing URL-reserved characters (e.g. "@" or "/") would need to
        # be percent-encoded first -- confirm against the configured values.
        self.db_url = (
            f"mysql+pymysql://{mysql_user}:{mysql_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.db_twitter_content = DBTwitterContent(self.db_url)
        # 15 minutes plus 10 seconds. Presumably sized to the API rate-limit
        # window -- TODO confirm.
        self.sleep_time = 15 * 60 + 10

    def _get_json(self, url, failure_message):
        """GET *url* with the bearer header and return the decoded JSON body.

        Returns ``None`` (after logging *failure_message*) when the request
        cannot be completed, the status code is not 200, or the body is not
        valid JSON.
        """
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self.request_timeout
            )
        except requests.RequestException as exc:
            # Network failures are reported like any other failed lookup so
            # that one unreachable request does not abort the whole run.
            logger.error(f"{failure_message} ({exc})")
            return None

        if response.status_code != 200:
            logger.error(failure_message)
            return None

        try:
            return response.json()
        except ValueError as exc:
            logger.error(f"{failure_message} (invalid JSON: {exc})")
            return None

    def search_user(self, username):
        """Look up a user by name.

        Args:
            username: value substituted into ``user_search_url``.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        url = self.user_search_url.format(username)
        return self._get_json(url, f"Failed to search user: {username}")

    def search_contents(self, username: str, user_id: str):
        """Fetch the posts of one user.

        Args:
            username: unused; kept so existing callers keep working.
            user_id: value substituted into ``contents_search_url``.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        logger.info(f"Searching contents for user: {user_id}")
        url = self.contents_search_url.format(user_id)
        return self._get_json(
            url, f"Failed to search contents for user: {user_id}"
        )

    def _build_row(self, user_id, user_name, content):
        """Convert one post from the API response into a row dict."""
        converted = self.transform_datetime(content["created_at"])
        return {
            "user_id": user_id,
            "user_name": user_name,
            "timestamp": converted["timestamp_ms"],
            "date_time": converted["beijing_time_str"],
            "text": content["text"],
        }

    def monitor_accounts(self) -> None:
        """Fetch and store the posts of every configured account.

        For each account the user id is resolved through ``search_user`` when
        the configuration does not provide one. Accounts whose lookup or
        content request fails are skipped. Rows are inserted once per account.
        """
        for account_dict in self.monitor_account_list:
            user_name = account_dict["name"]
            user_id = account_dict["id"]
            logger.info(f"Monitoring account: {user_name}")
            # NOTE: the sleep itself is currently disabled, so this only logs.
            logger.info(f"Sleeping for {self.sleep_time} seconds")
            # time.sleep(self.sleep_time)

            if user_id is None or user_id == "":
                user = self.search_user(user_name)
                if user is None:
                    continue
                # A 200 response may carry only an "errors" member (e.g. for
                # an unknown user), so do not index "data" blindly.
                resolved_id = (user.get("data") or {}).get("id")
                if resolved_id is None:
                    logger.error(f"No user id returned for user: {user_name}")
                    continue
                user_id = str(resolved_id)

            contents = self.search_contents(user_name, user_id)
            if contents is None:
                continue

            # "data" is absent when the response contains no posts; treat
            # that as an empty result rather than raising KeyError.
            twitter_contents = contents.get("data") or []
            result_list = [
                self._build_row(user_id, user_name, content)
                for content in twitter_contents
            ]

            if result_list:
                result_df = pd.DataFrame(result_list)
                self.db_twitter_content.insert_data_to_mysql(result_df)
                logger.info(f"Inserted {len(result_df)} rows into twitter_content")
            else:
                logger.warning(f"No data inserted for account: {user_name}")

    def transform_datetime(self, datetime_text: str) -> dict:
        """Convert a UTC timestamp string into epoch milliseconds and Beijing time.

        Args:
            datetime_text: UTC time formatted as ``%Y-%m-%dT%H:%M:%S.%fZ``,
                e.g. ``"2025-10-11T10:05:11.000Z"``.

        Returns:
            A dict with ``"timestamp_ms"`` (int, milliseconds since the epoch)
            and ``"beijing_time_str"`` (ISO 8601 text in the Asia/Shanghai
            zone with a colon-separated offset, e.g.
            ``"2025-10-11T18:05:11+08:00"``).

        Raises:
            ValueError: if *datetime_text* does not match the expected format.
        """
        utc_time = datetime.strptime(
            datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=pytz.UTC)

        # 1. Epoch timestamp in milliseconds.
        timestamp_ms = int(utc_time.timestamp() * 1000)

        # 2. Beijing time in ISO 8601 form. ``isoformat`` already writes the
        #    offset with a colon (+08:00), and ``timespec="seconds"`` drops
        #    the fractional part.
        beijing_time = utc_time.astimezone(pytz.timezone("Asia/Shanghai"))
        beijing_time_str = beijing_time.isoformat(timespec="seconds")

        return {
            "timestamp_ms": timestamp_ms,
            "beijing_time_str": beijing_time_str,
        }
|