diff --git a/config.py b/config.py index 522012e..11dbeba 100644 --- a/config.py +++ b/config.py @@ -213,3 +213,21 @@ WECHAT_CONFIG = { } ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2b" + +TWITTER_CONFIG = { + "keys": { + "api_key": "c3l344o8pgVwy7Aw4yxj7CprT", + "api_secret": "xjh3RVyyhVr9aDVSq5fFq210R1fmwYt36myBZR7ifuv0wYRWcT", + "bearer_token": "AAAAAAAAAAAAAAAAAAAAAPoL4wEAAAAAXEMHlBpeR66dtTYWBkFSz1Fp3oI%3DHMoLlCMKNRGr1h6c0lBpZnJulx88fQ0JzZE1zm4jI4qNfSxiRZ", + "access_token": "1658642847975784449-MR79EAOk8MTKx3zIbCEySaQjxDPK3R", + "access_token_secret": "2H9RwHzBrWhAbt7RUmGHSg6mcfJEf0Aesx74QFFMeMYMn" + }, + "user_search_url": "https://api.twitter.com/2/users/by/username/{0}", + "contents_search_url": "https://api.twitter.com/2/users/{0}/tweets?max_results=100&tweet.fields=text,created_at&exclude=replies,retweets", + "monitor_accounts": [ + "FoxNews", + "WhiteHouse", + "sama", + "PressSec", + ], +} diff --git a/core/db/db_twitter_content.py b/core/db/db_twitter_content.py new file mode 100644 index 0000000..574546c --- /dev/null +++ b/core/db/db_twitter_content.py @@ -0,0 +1,290 @@ +import pandas as pd +import core.logger as logging +from core.db.db_manager import DBData +from core.utils import get_current_date_time + +logger = logging.logger + + +class DBTwitterContent: + def __init__(self, db_url: str): + self.db_url = db_url + self.table_name = "twitter_content" + self.columns = [ + "user_id", + "user_name", + "timestamp", + "date_time", + "text" + ] + self.db_manager = DBData(db_url, self.table_name, self.columns) + + def insert_data_to_mysql(self, df: pd.DataFrame): + """ + 将Twitter内容数据保存到MySQL的twitter_content表 + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 适用场景:中小数据量(<10万条) + :param df: Twitter内容数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql(df) + + def insert_data_to_mysql_fast(self, df: pd.DataFrame): + """ + 快速插入Twitter内容数据(方案2:使用executemany批量插入) + 速度:⭐⭐⭐⭐ 很快 + 内存:⭐⭐⭐⭐⭐ 低 + 适用场景:中等数据量 + :param df: Twitter内容数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_fast(df) + + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): + """ + 分块插入Twitter内容数据(方案3:适合大数据量) + 速度:⭐⭐⭐ 中等 + 内存:⭐⭐⭐⭐⭐ 最低 + 适用场景:大数据量(>10万条) + :param df: Twitter内容数据DataFrame + :param chunk_size: 分块大小 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) + + def insert_data_to_mysql_simple(self, df: pd.DataFrame): + """ + 简单插入Twitter内容数据(方案4:直接使用to_sql,忽略重复) + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 注意:会抛出重复键错误,需要额外处理 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_simple(df) + + def query_latest_data(self, user_id: str = None): + """ + 查询最新数据 + :param user_id: 用户ID,如果为None则查询所有用户的最新数据 + """ + if user_id: + sql = """ + SELECT * FROM twitter_content + WHERE user_id = :user_id + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"user_id": user_id} + else: + sql = """ + SELECT * FROM twitter_content + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {} + + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_data_by_user_id(self, user_id: str, limit: int = 100): + """ + 根据用户ID查询数据 + :param user_id: 用户ID + :param limit: 查询数量 + """ + sql = """ + SELECT * FROM twitter_content + WHERE user_id = :user_id + ORDER BY timestamp DESC + LIMIT :limit + """ + condition_dict = {"user_id": user_id, "limit": limit} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_timestamp_range( + self, + start_timestamp: int = None, + end_timestamp: int = None, + user_id: str = None, + limit: int = 1000 + ): + """ + 根据时间戳范围查询数据 + :param start_timestamp: 开始时间戳 + :param end_timestamp: 结束时间戳 + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = [] + condition_dict = {"limit": limit} + + if start_timestamp: + conditions.append("timestamp >= :start_timestamp") + condition_dict["start_timestamp"] = start_timestamp + + if end_timestamp: + conditions.append("timestamp <= :end_timestamp") + condition_dict["end_timestamp"] = end_timestamp + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_text_search( + self, + search_text: str, + user_id: str = None, + limit: int = 100 + ): + """ + 根据文本内容搜索数据 + :param search_text: 搜索文本 + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = ["text LIKE :search_text"] + condition_dict = { + "search_text": f"%{search_text}%", + "limit": limit + } + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_date_range( + self, + start_date: str = None, + end_date: str = None, + user_id: str = None, + limit: int = 1000 + ): + """ + 根据日期范围查询数据 + :param start_date: 开始日期 (YYYY-MM-DD) + :param end_date: 结束日期 (YYYY-MM-DD) + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = [] + condition_dict = {"limit": limit} + + if start_date: + conditions.append("date_time >= :start_date") + condition_dict["start_date"] = start_date + + if end_date: + conditions.append("date_time <= :end_date") + condition_dict["end_date"] = end_date + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_user_list(self, limit: int = 100): + """ + 获取用户列表 + :param limit: 查询数量 + """ + sql = """ + SELECT DISTINCT user_id, user_name, + COUNT(*) as tweet_count, + MAX(timestamp) as last_tweet_time + FROM twitter_content + GROUP BY user_id, user_name + ORDER BY last_tweet_time DESC + LIMIT :limit + """ + condition_dict = {"limit": limit} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_statistics(self): + """ + 获取统计信息 + """ + sql = """ + SELECT + COUNT(*) as total_tweets, + COUNT(DISTINCT user_id) as total_users, + MIN(timestamp) as earliest_tweet, + MAX(timestamp) as latest_tweet, + AVG(LENGTH(text)) as avg_text_length + FROM twitter_content + """ + return self.db_manager.query_data(sql, {}, return_multi=False) + + def delete_old_data(self, days: int = 30): + """ + 删除指定天数前的旧数据 + :param days: 保留天数 + """ + current_time = get_current_date_time() + cutoff_timestamp = int(pd.Timestamp(current_time).timestamp()) - (days * 24 * 60 * 60) + + sql = """ + DELETE FROM twitter_content + WHERE timestamp < :cutoff_timestamp + """ + condition_dict = {"cutoff_timestamp": cutoff_timestamp} + + return self.db_manager.execute_sql(sql, condition_dict) + + def check_duplicate(self, user_id: str, timestamp: int): + """ + 检查是否存在重复数据 + :param user_id: 用户ID + :param timestamp: 时间戳 + """ + sql = """ + SELECT COUNT(*) as count + FROM twitter_content + WHERE user_id = :user_id AND timestamp = :timestamp + """ + condition_dict = {"user_id": user_id, "timestamp": timestamp} + result = self.db_manager.query_data(sql, condition_dict, return_multi=False) + return result['count'] > 0 if result else False diff --git a/core/twitter/__pycache__/twitter_retriever.cpython-312.pyc b/core/twitter/__pycache__/twitter_retriever.cpython-312.pyc new file mode 100644 index 0000000..8273c62 Binary files /dev/null and b/core/twitter/__pycache__/twitter_retriever.cpython-312.pyc differ diff --git a/core/twitter/twitter_retriever.py b/core/twitter/twitter_retriever.py new file mode 100644 index 0000000..1e73390 --- /dev/null +++ b/core/twitter/twitter_retriever.py @@ -0,0 +1,106 @@ +import core.logger as logging +from core.db.db_twitter_content import DBTwitterContent +from config import TWITTER_CONFIG, COIN_MYSQL_CONFIG +import os +import json +import requests +import time +from datetime import datetime +import pytz +import pandas as pd + +logger = logging.logger + +class TwitterRetriever: + def __init__(self): + self.keys = TWITTER_CONFIG["keys"] + self.headers = { + "Authorization": f"Bearer {self.keys['bearer_token']}" + } + self.user_search_url = TWITTER_CONFIG["user_search_url"] + self.contents_search_url = TWITTER_CONFIG["contents_search_url"] + self.monitor_account_list = TWITTER_CONFIG["monitor_accounts"] + + mysql_user = COIN_MYSQL_CONFIG.get("user", "xch") + mysql_password = COIN_MYSQL_CONFIG.get("password", "") + if not mysql_password: + raise ValueError("MySQL password is not set") + mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost") + mysql_port = COIN_MYSQL_CONFIG.get("port", 3306) + mysql_database = COIN_MYSQL_CONFIG.get("database", "okx") + + self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" + self.db_twitter_content = DBTwitterContent(self.db_url) + self.sleep_time = 15 * 60 + 10 + + def search_user(self, username): + url = self.user_search_url.format(username) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + return response.json() + else: + logger.error(f"Failed to search user: {username}") + return None + + def search_contents(self, username: str, user_id: str): + logger.info(f"Searching contents for user: {user_id}") + url = self.contents_search_url.format(user_id) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + return response.json() + else: + logger.error(f"Failed to search contents for user: {user_id}") + return None + + def monitor_accounts(self): + for account in self.monitor_account_list: + logger.info(f"Monitoring account: {account}") + logger.info(f"Sleeping for {self.sleep_time} seconds") + time.sleep(self.sleep_time) + result_list = [] + user = self.search_user(account) + if user is None: + continue + username = user["data"]["username"] + user_id = str(user["data"]["id"]) + contents = self.search_contents(username, user_id) + if contents is None: + continue + twitter_contents = contents["data"] + for content in twitter_contents: + datetime_text = content["created_at"] + datetime_dict = self.transform_datetime(datetime_text) + timestamp_ms = datetime_dict["timestamp_ms"] + beijing_time_str = datetime_dict["beijing_time_str"] + text = content["text"] + result = { + "user_id": user_id, + "user_name": username, + "timestamp": timestamp_ms, + "date_time": beijing_time_str, + "text": text + } + result_list.append(result) + if len(result_list) > 0: + result_df = pd.DataFrame(result_list) + self.db_twitter_content.insert_data_to_mysql(result_df) + logger.info(f"Inserted {len(result_df)} rows into twitter_content") + else: + logger.warning(f"No data inserted for account: {account}") + + def transform_datetime(self, datetime_text: str): + utc_time = datetime.strptime(datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=pytz.UTC) + + # 1. 转换为时间戳(毫秒) + timestamp_ms = int(utc_time.timestamp() * 1000) + # 2. 转换为北京时间(ISO 8601 格式,带 +08:00) + beijing_tz = pytz.timezone("Asia/Shanghai") + beijing_time = utc_time.astimezone(beijing_tz) + beijing_time_str = beijing_time.strftime("%Y-%m-%dT%H:%M:%S%z") + # 插入冒号到时区偏移(如 +0800 -> +08:00) + beijing_time_str = beijing_time_str[:-2] + ":" + beijing_time_str[-2:] + result = { + "timestamp_ms": timestamp_ms, + "beijing_time_str": beijing_time_str + } + return result \ No newline at end of file diff --git a/sql/table/twitter_content.sql b/sql/table/twitter_content.sql new file mode 100644 index 0000000..ad4e14a --- /dev/null +++ b/sql/table/twitter_content.sql @@ -0,0 +1,10 @@ +CREATE TABLE `twitter_content` ( + `user_id` VARCHAR(50) NOT NULL, + `user_name` VARCHAR(100) NOT NULL, + `timestamp` BIGINT NOT NULL, + `date_time` VARCHAR(50) NOT NULL, + `text` TEXT NOT NULL, + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY `uk_user_timestamp` (`user_id`, `timestamp`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; diff --git a/twitter_retriever_main.py b/twitter_retriever_main.py new file mode 100644 index 0000000..0196c37 --- /dev/null +++ b/twitter_retriever_main.py @@ -0,0 +1,13 @@ +from core.twitter.twitter_retriever import TwitterRetriever +import core.logger as logging +import os + +logger = logging.logger + +def main(): + twitter_retriever = TwitterRetriever() + twitter_retriever.monitor_accounts() + + +if __name__ == "__main__": + main() \ No newline at end of file