from typing import Optional

import pandas as pd

import core.logger as logging
from core.db.db_manager import DBData
from core.utils import get_current_date_time

logger = logging.logger

_SECONDS_PER_DAY = 24 * 60 * 60


class DBTwitterContent:
    """Data-access helper for the ``twitter_content`` table.

    Insert operations are delegated to ``DBData``; query helpers build
    parameterized SQL (``:name`` bind parameters) and delegate execution to
    ``DBData.query_data`` / ``DBData.execute_sql``.
    """

    # Escape character used in LIKE patterns built from user-supplied text.
    _LIKE_ESCAPE = "!"

    def __init__(self, db_url: str):
        """
        :param db_url: database connection URL, forwarded to DBData
        """
        self.db_url = db_url
        self.table_name = "twitter_content"
        self.columns = [
            "user_id",
            "user_name",
            "timestamp",
            "date_time",
            "text",
        ]
        self.db_manager = DBData(db_url, self.table_name, self.columns)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_empty(df: Optional[pd.DataFrame]) -> bool:
        """Return True (after logging a warning) when *df* is None or has no rows."""
        if df is None or df.empty:
            logger.warning("DataFrame为空,无需写入数据库。")
            return True
        return False

    @classmethod
    def _escape_like(cls, text: str) -> str:
        """Escape LIKE wildcards in *text* so it matches literally.

        Must be paired with ``ESCAPE '<_LIKE_ESCAPE>'`` in the SQL.
        """
        esc = cls._LIKE_ESCAPE
        # Escape the escape character itself first so the later replacements
        # are not double-escaped.
        return (
            text.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    def _select_rows(self, conditions: list, condition_dict: dict, return_multi: bool = True):
        """Select rows matching all *conditions*, newest first, capped by ``:limit``.

        :param conditions: SQL predicates joined with AND; an empty list means no filter
        :param condition_dict: bind parameters; must contain ``limit``
        :param return_multi: forwarded to ``DBData.query_data``
        """
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        sql = f"""
            SELECT * FROM {self.table_name}
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT :limit
        """
        return self.db_manager.query_data(sql, condition_dict, return_multi=return_multi)

    # ------------------------------------------------------------------
    # Inserts
    # ------------------------------------------------------------------

    def insert_data_to_mysql(self, df: pd.DataFrame):
        """Save Twitter content rows to the MySQL ``twitter_content`` table.

        Per the original notes: speed 5/5 (fastest), memory 4/5 (moderate);
        suited to small/medium volumes (< 100k rows).
        Does nothing (logs a warning) when *df* is None or empty.

        :param df: Twitter content DataFrame
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql(df)

    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
        """Fast insert of Twitter content rows (option 2: batched executemany).

        Per the original notes: speed 4/5, memory 5/5 (low); suited to
        medium volumes. Does nothing (logs a warning) when *df* is None or empty.

        :param df: Twitter content DataFrame
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_fast(df)

    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
        """Chunked insert of Twitter content rows (option 3: large volumes).

        Per the original notes: speed 3/5, memory 5/5 (lowest); suited to
        large volumes (> 100k rows). Does nothing (logs a warning) when *df*
        is None or empty.

        :param df: Twitter content DataFrame
        :param chunk_size: rows per chunk, forwarded to DBData
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)

    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
        """Simple insert of Twitter content rows (option 4: plain ``to_sql``).

        Per the original notes: speed 5/5, memory 4/5.
        NOTE(review): the original docs both said "ignores duplicates" and
        "raises a duplicate-key error that needs extra handling" — assume
        duplicates may raise and confirm in DBData.insert_data_to_mysql_simple.
        Does nothing (logs a warning) when *df* is None or empty.

        :param df: Twitter content DataFrame
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_simple(df)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def query_latest_data(self, user_id: Optional[str] = None):
        """Return the single most recent row (by ``timestamp``).

        :param user_id: restrict to this user; None/empty means across all users
        """
        conditions = []
        condition_dict = {"limit": 1}
        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id
        return self._select_rows(conditions, condition_dict, return_multi=False)

    def query_data_by_user_id(self, user_id: str, limit: int = 100):
        """Return up to *limit* rows for *user_id*, newest first.

        :param user_id: user ID
        :param limit: maximum number of rows
        """
        return self._select_rows(
            ["user_id = :user_id"],
            {"user_id": user_id, "limit": limit},
        )

    def query_data_by_timestamp_range(
        self,
        start_timestamp: Optional[int] = None,
        end_timestamp: Optional[int] = None,
        user_id: Optional[str] = None,
        limit: int = 1000,
    ):
        """Return rows whose ``timestamp`` lies in [start, end], newest first.

        :param start_timestamp: inclusive lower bound; None means unbounded
        :param end_timestamp: inclusive upper bound; None means unbounded
        :param user_id: optional user filter; None/empty means all users
        :param limit: maximum number of rows
        """
        conditions = []
        condition_dict = {"limit": limit}
        # Compare against None explicitly: 0 is a valid timestamp bound and
        # must not be silently dropped by a truthiness test.
        if start_timestamp is not None:
            conditions.append("timestamp >= :start_timestamp")
            condition_dict["start_timestamp"] = start_timestamp
        if end_timestamp is not None:
            conditions.append("timestamp <= :end_timestamp")
            condition_dict["end_timestamp"] = end_timestamp
        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id
        return self._select_rows(conditions, condition_dict)

    def query_data_by_text_search(
        self, search_text: str, user_id: Optional[str] = None, limit: int = 100
    ):
        """Return rows whose ``text`` contains *search_text* literally, newest first.

        ``%`` and ``_`` in *search_text* are escaped so they match literally
        rather than acting as LIKE wildcards.

        :param search_text: substring to search for
        :param user_id: optional user filter; None/empty means all users
        :param limit: maximum number of rows
        """
        conditions = [f"text LIKE :search_text ESCAPE '{self._LIKE_ESCAPE}'"]
        condition_dict = {
            "search_text": f"%{self._escape_like(search_text)}%",
            "limit": limit,
        }
        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id
        return self._select_rows(conditions, condition_dict)

    def query_data_by_date_range(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        user_id: Optional[str] = None,
        limit: int = 1000,
    ):
        """Return rows whose ``date_time`` lies in [start_date, end_date], newest first.

        The comparison is done by the database against the ``date_time`` column.
        NOTE(review): with a bare "YYYY-MM-DD" end_date, rows later on that same
        day may compare greater and be excluded, depending on the column type —
        confirm.

        :param start_date: inclusive lower bound (YYYY-MM-DD); None/empty means unbounded
        :param end_date: inclusive upper bound (YYYY-MM-DD); None/empty means unbounded
        :param user_id: optional user filter; None/empty means all users
        :param limit: maximum number of rows
        """
        conditions = []
        condition_dict = {"limit": limit}
        if start_date:
            conditions.append("date_time >= :start_date")
            condition_dict["start_date"] = start_date
        if end_date:
            conditions.append("date_time <= :end_date")
            condition_dict["end_date"] = end_date
        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id
        return self._select_rows(conditions, condition_dict)

    def get_user_list(self, limit: int = 100):
        """Return per-(user_id, user_name) tweet counts and latest tweet time.

        Ordered by most recent tweet first.

        :param limit: maximum number of rows
        """
        # GROUP BY already yields one row per group, so DISTINCT is unnecessary.
        sql = f"""
            SELECT user_id, user_name, COUNT(*) as tweet_count, MAX(timestamp) as last_tweet_time
            FROM {self.table_name}
            GROUP BY user_id, user_name
            ORDER BY last_tweet_time DESC
            LIMIT :limit
        """
        return self.db_manager.query_data(sql, {"limit": limit}, return_multi=True)

    def get_statistics(self):
        """Return aggregate statistics over the whole table as a single row.

        Includes: total tweets, distinct users, earliest/latest timestamp and
        average text length.
        """
        # NOTE(review): in MySQL, LENGTH() counts bytes, not characters; use
        # CHAR_LENGTH() if a character average is intended for multibyte text.
        sql = f"""
            SELECT
                COUNT(*) as total_tweets,
                COUNT(DISTINCT user_id) as total_users,
                MIN(timestamp) as earliest_tweet,
                MAX(timestamp) as latest_tweet,
                AVG(LENGTH(text)) as avg_text_length
            FROM {self.table_name}
        """
        return self.db_manager.query_data(sql, {}, return_multi=False)

    def delete_old_data(self, days: int = 30):
        """Delete rows whose ``timestamp`` is older than *days* days before now.

        The cutoff is computed in seconds, so ``timestamp`` is presumably Unix
        epoch seconds — confirm.

        :param days: number of days of data to keep
        :raises ValueError: if *days* is negative (the cutoff would lie in the
            future and every row would be deleted)
        :return: whatever DBData.execute_sql returns
        """
        if days < 0:
            raise ValueError(f"days must be non-negative, got {days}")
        current_time = get_current_date_time()
        # NOTE(review): pd.Timestamp(...).timestamp() treats a naive datetime as
        # UTC; if get_current_date_time() returns naive local time, the cutoff is
        # shifted by the local UTC offset — confirm its return type.
        cutoff_timestamp = int(pd.Timestamp(current_time).timestamp()) - days * _SECONDS_PER_DAY
        sql = f"""
            DELETE FROM {self.table_name}
            WHERE timestamp < :cutoff_timestamp
        """
        return self.db_manager.execute_sql(sql, {"cutoff_timestamp": cutoff_timestamp})

    def check_duplicate(self, user_id: str, timestamp: int) -> bool:
        """Return True if a row with the same (user_id, timestamp) already exists.

        :param user_id: user ID
        :param timestamp: timestamp to check
        """
        sql = f"""
            SELECT COUNT(*) as count FROM {self.table_name}
            WHERE user_id = :user_id AND timestamp = :timestamp
        """
        condition_dict = {"user_id": user_id, "timestamp": timestamp}
        result = self.db_manager.query_data(sql, condition_dict, return_multi=False)
        return result['count'] > 0 if result else False