From 16ce41545e27648c45e9ff925af9222ff433126e Mon Sep 17 00:00:00 2001
From: blade <8019068@qq.com>
Date: Sat, 11 Oct 2025 18:05:11 +0800
Subject: [PATCH] Support fetching Twitter articles

---
 config.py                         |  18 ++
 core/db/db_twitter_content.py     | 290 ++++++++++++++++++
 core/twitter/twitter_retriever.py | 106 +++++++
 sql/table/twitter_content.sql     |  10 +
 twitter_retriever_main.py         |  13 +
 5 files changed, 437 insertions(+)
 create mode 100644 core/db/db_twitter_content.py
 create mode 100644 core/twitter/twitter_retriever.py
 create mode 100644 sql/table/twitter_content.sql
 create mode 100644 twitter_retriever_main.py

diff --git a/config.py b/config.py
index 522012e..11dbeba 100644
--- a/config.py
+++ b/config.py
@@ -213,3 +213,21 @@ WECHAT_CONFIG = {
 }
 
 ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2b"
+
+TWITTER_CONFIG = {
+    "keys": {
+        "api_key": "c3l344o8pgVwy7Aw4yxj7CprT",
+        "api_secret": "xjh3RVyyhVr9aDVSq5fFq210R1fmwYt36myBZR7ifuv0wYRWcT",
+        "bearer_token": "AAAAAAAAAAAAAAAAAAAAAPoL4wEAAAAAXEMHlBpeR66dtTYWBkFSz1Fp3oI%3DHMoLlCMKNRGr1h6c0lBpZnJulx88fQ0JzZE1zm4jI4qNfSxiRZ",
+        "access_token": "1658642847975784449-MR79EAOk8MTKx3zIbCEySaQjxDPK3R",
+        "access_token_secret": "2H9RwHzBrWhAbt7RUmGHSg6mcfJEf0Aesx74QFFMeMYMn"
+    },
+    "user_search_url": "https://api.twitter.com/2/users/by/username/{0}",
+    "contents_search_url": "https://api.twitter.com/2/users/{0}/tweets?max_results=100&tweet.fields=text,created_at&exclude=replies,retweets",
+    "monitor_accounts": [
+        "FoxNews",
+        "WhiteHouse",
+        "sama",
+        "PressSec",
+    ],
+}
diff --git a/core/db/db_twitter_content.py b/core/db/db_twitter_content.py
new file mode 100644
index 0000000..574546c
--- /dev/null
+++ b/core/db/db_twitter_content.py
@@ -0,0 +1,290 @@
+import pandas as pd
+import core.logger as logging
+from core.db.db_manager import DBData
+from core.utils import get_current_date_time
+
+logger = logging.logger
+
+
+class DBTwitterContent:
+    def __init__(self, db_url: str):
+        self.db_url = db_url
+        self.table_name = "twitter_content"
+        self.columns = [
+            "user_id",
+            "user_name",
+            "timestamp",
+            "date_time",
+            "text"
+        ]
+        self.db_manager = DBData(db_url, self.table_name, self.columns)
+
+    def insert_data_to_mysql(self, df: pd.DataFrame):
+        """
+        Save Twitter content into the MySQL table `twitter_content`.
+        Speed:  ⭐⭐⭐⭐⭐ fastest
+        Memory: ⭐⭐⭐⭐ moderate
+        Best for: small-to-medium volumes (<100k rows)
+        :param df: DataFrame of Twitter content
+        """
+        if df is None or df.empty:
+            logger.warning("DataFrame is empty; nothing to write to the database.")
+            return
+
+        self.db_manager.insert_data_to_mysql(df)
+
+    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
+        """
+        Fast insert of Twitter content (option 2: batched executemany).
+        Speed:  ⭐⭐⭐⭐ very fast
+        Memory: ⭐⭐⭐⭐⭐ low
+        Best for: medium volumes
+        :param df: DataFrame of Twitter content
+        """
+        if df is None or df.empty:
+            logger.warning("DataFrame is empty; nothing to write to the database.")
+            return
+
+        self.db_manager.insert_data_to_mysql_fast(df)
+
+    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
+        """
+        Chunked insert of Twitter content (option 3: suited to large volumes).
+        Speed:  ⭐⭐⭐ moderate
+        Memory: ⭐⭐⭐⭐⭐ lowest
+        Best for: large volumes (>100k rows)
+        :param df: DataFrame of Twitter content
+        :param chunk_size: rows per chunk
+        """
+        if df is None or df.empty:
+            logger.warning("DataFrame is empty; nothing to write to the database.")
+            return
+
+        self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
+
+    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
+        """
+        Simple insert of Twitter content (option 4: plain to_sql, no duplicate handling).
+        Speed:  ⭐⭐⭐⭐⭐ fastest
+        Memory: ⭐⭐⭐⭐ moderate
+        Note: raises on duplicate keys, so callers must handle that themselves.
+        """
+        if df is None or df.empty:
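+            # Empty input: skip the DB round-trip, mirroring the other insert_* helpers.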
logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_simple(df) + + def query_latest_data(self, user_id: str = None): + """ + 查询最新数据 + :param user_id: 用户ID,如果为None则查询所有用户的最新数据 + """ + if user_id: + sql = """ + SELECT * FROM twitter_content + WHERE user_id = :user_id + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"user_id": user_id} + else: + sql = """ + SELECT * FROM twitter_content + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {} + + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_data_by_user_id(self, user_id: str, limit: int = 100): + """ + 根据用户ID查询数据 + :param user_id: 用户ID + :param limit: 查询数量 + """ + sql = """ + SELECT * FROM twitter_content + WHERE user_id = :user_id + ORDER BY timestamp DESC + LIMIT :limit + """ + condition_dict = {"user_id": user_id, "limit": limit} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_timestamp_range( + self, + start_timestamp: int = None, + end_timestamp: int = None, + user_id: str = None, + limit: int = 1000 + ): + """ + 根据时间戳范围查询数据 + :param start_timestamp: 开始时间戳 + :param end_timestamp: 结束时间戳 + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = [] + condition_dict = {"limit": limit} + + if start_timestamp: + conditions.append("timestamp >= :start_timestamp") + condition_dict["start_timestamp"] = start_timestamp + + if end_timestamp: + conditions.append("timestamp <= :end_timestamp") + condition_dict["end_timestamp"] = end_timestamp + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_text_search( + self, + search_text: str, + user_id: str = None, + limit: int = 100 + ): + """ + 根据文本内容搜索数据 + :param search_text: 搜索文本 + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = ["text LIKE :search_text"] + condition_dict = { + "search_text": f"%{search_text}%", + "limit": limit + } + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_date_range( + self, + start_date: str = None, + end_date: str = None, + user_id: str = None, + limit: int = 1000 + ): + """ + 根据日期范围查询数据 + :param start_date: 开始日期 (YYYY-MM-DD) + :param end_date: 结束日期 (YYYY-MM-DD) + :param user_id: 用户ID,可选 + :param limit: 查询数量 + """ + conditions = [] + condition_dict = {"limit": limit} + + if start_date: + conditions.append("date_time >= :start_date") + condition_dict["start_date"] = start_date + + if end_date: + conditions.append("date_time <= :end_date") + condition_dict["end_date"] = end_date + + if user_id: + conditions.append("user_id = :user_id") + condition_dict["user_id"] = user_id + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + sql = f""" + SELECT * FROM twitter_content + WHERE {where_clause} + ORDER BY timestamp DESC + LIMIT :limit + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_user_list(self, limit: int = 100): 
+ """ + 获取用户列表 + :param limit: 查询数量 + """ + sql = """ + SELECT DISTINCT user_id, user_name, + COUNT(*) as tweet_count, + MAX(timestamp) as last_tweet_time + FROM twitter_content + GROUP BY user_id, user_name + ORDER BY last_tweet_time DESC + LIMIT :limit + """ + condition_dict = {"limit": limit} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_statistics(self): + """ + 获取统计信息 + """ + sql = """ + SELECT + COUNT(*) as total_tweets, + COUNT(DISTINCT user_id) as total_users, + MIN(timestamp) as earliest_tweet, + MAX(timestamp) as latest_tweet, + AVG(LENGTH(text)) as avg_text_length + FROM twitter_content + """ + return self.db_manager.query_data(sql, {}, return_multi=False) + + def delete_old_data(self, days: int = 30): + """ + 删除指定天数前的旧数据 + :param days: 保留天数 + """ + current_time = get_current_date_time() + cutoff_timestamp = int(pd.Timestamp(current_time).timestamp()) - (days * 24 * 60 * 60) + + sql = """ + DELETE FROM twitter_content + WHERE timestamp < :cutoff_timestamp + """ + condition_dict = {"cutoff_timestamp": cutoff_timestamp} + + return self.db_manager.execute_sql(sql, condition_dict) + + def check_duplicate(self, user_id: str, timestamp: int): + """ + 检查是否存在重复数据 + :param user_id: 用户ID + :param timestamp: 时间戳 + """ + sql = """ + SELECT COUNT(*) as count + FROM twitter_content + WHERE user_id = :user_id AND timestamp = :timestamp + """ + condition_dict = {"user_id": user_id, "timestamp": timestamp} + result = self.db_manager.query_data(sql, condition_dict, return_multi=False) + return result['count'] > 0 if result else False diff --git a/core/twitter/__pycache__/twitter_retriever.cpython-312.pyc b/core/twitter/__pycache__/twitter_retriever.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8273c6230e5bc847c43ad0efc382d570907913cf GIT binary patch literal 5956 zcmbVQOKcm*8Q$eC-%pE@D2cK~y)09*B)@IRPg$~M`Jo_|nl`MQ4b5Fkq$!e~UBwZx zLV_X(x^Qa=Eg%I+P_>uPaDb?%`sibTUUow@WH$1nWiy;1vD_WvU!i(R6uu8B5OU8+LGGrjJY=~3y zbcj}EDsG%Ng-rA2kXfyhaZAXe>RUrrwX}t7YH1JIp`_!Ec}K`m(shQMYM(3Q<_s}| zHhsAakp^O%V2$q)tm#pyKjc{?gXXWX(qxcSJY(ml??wfIL!-%rz$JvBL2*t0bYgn? 
diff --git a/core/twitter/twitter_retriever.py b/core/twitter/twitter_retriever.py
new file mode 100644
--- /dev/null
+++ b/core/twitter/twitter_retriever.py
@@ -0,0 +1,106 @@
[earlier lines of this new file are missing from this copy of the patch; the recovered tail of monitor_accounts() and transform_datetime() follows]
+        if len(result_list) > 0:
+            result_df = pd.DataFrame(result_list)
+            self.db_twitter_content.insert_data_to_mysql(result_df)
+            logger.info(f"Inserted {len(result_df)} rows into twitter_content")
+        else:
+            logger.warning(f"No data inserted for account: {account}")
+
+    def transform_datetime(self, datetime_text: str):
+        utc_time = datetime.strptime(datetime_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=pytz.UTC)
+
+        # 1. Convert to an epoch timestamp in milliseconds
+        timestamp_ms = int(utc_time.timestamp() * 1000)
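+        # Note: .timestamp() is only correct here because tzinfo was set to UTC
+        # above; calling it on a naive datetime would assume the local timezone.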
+        # 2. Convert to Beijing time (ISO 8601 format with a +08:00 offset)
+        beijing_tz = pytz.timezone("Asia/Shanghai")
+        beijing_time = utc_time.astimezone(beijing_tz)
+        beijing_time_str = beijing_time.strftime("%Y-%m-%dT%H:%M:%S%z")
+        # Insert a colon into the UTC offset (e.g. +0800 -> +08:00)
+        beijing_time_str = beijing_time_str[:-2] + ":" + beijing_time_str[-2:]
+        result = {
+            "timestamp_ms": timestamp_ms,
+            "beijing_time_str": beijing_time_str
+        }
+        return result
\ No newline at end of file
diff --git a/sql/table/twitter_content.sql b/sql/table/twitter_content.sql
new file mode 100644
index 0000000..ad4e14a
--- /dev/null
+++ b/sql/table/twitter_content.sql
@@ -0,0 +1,10 @@
+CREATE TABLE `twitter_content` (
+    `user_id` VARCHAR(50) NOT NULL,
+    `user_name` VARCHAR(100) NOT NULL,
+    `timestamp` BIGINT NOT NULL COMMENT 'tweet time as epoch milliseconds',
+    `date_time` VARCHAR(50) NOT NULL COMMENT 'tweet time as ISO 8601 Beijing time',
+    `text` TEXT NOT NULL,
+    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    UNIQUE KEY `uk_user_timestamp` (`user_id`, `timestamp`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
diff --git a/twitter_retriever_main.py b/twitter_retriever_main.py
new file mode 100644
index 0000000..0196c37
--- /dev/null
+++ b/twitter_retriever_main.py
@@ -0,0 +1,13 @@
+from core.twitter.twitter_retriever import TwitterRetriever
+import core.logger as logging
+
+logger = logging.logger
+
+
+def main():
+    twitter_retriever = TwitterRetriever()
+    twitter_retriever.monitor_accounts()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
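-- 
Usage sketch (not part of the applied diff): the DB URL and the search keyword
below are placeholders, and TwitterRetriever is constructed with no arguments
exactly as twitter_retriever_main.py does; everything else uses only classes
and methods defined in this patch.

    from core.db.db_twitter_content import DBTwitterContent
    from core.twitter.twitter_retriever import TwitterRetriever

    # Point at the same MySQL instance the retriever writes to.
    db = DBTwitterContent("mysql+pymysql://user:pass@localhost:3306/quant")

    # Most recent stored tweet across all monitored accounts.
    latest = db.query_latest_data()

    # Keyword search over stored tweet text.
    hits = db.query_data_by_text_search("tariff", limit=50)

    # transform_datetime turns a Twitter v2 created_at value into epoch
    # milliseconds plus a Beijing-time ISO 8601 string (expected values below
    # worked out by hand from the code, not captured from a live run):
    r = TwitterRetriever().transform_datetime("2025-10-11T10:05:11.000Z")
    # {'timestamp_ms': 1760177111000, 'beijing_time_str': '2025-10-11T18:05:11+08:00'}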