291 lines
9.1 KiB
Python
291 lines
9.1 KiB
Python
import pandas as pd
|
||
import core.logger as logging
|
||
from core.db.db_manager import DBData
|
||
from core.utils import get_current_date_time
|
||
|
||
# Module-level logger shared by everything in this file.
# NOTE: `logging` here is the project's `core.logger` module (see the import
# above), not the standard-library `logging` package.
logger = logging.logger
|
||
|
||
|
||
class DBTwitterContent:
    """Data-access wrapper for the ``twitter_content`` table.

    Every insert and query is delegated to a ``DBData`` manager created in
    ``__init__``; this class only supplies the table name, the column list,
    the SQL text and the bind parameters.
    """

    # Used to turn a retention period in days into an epoch-second offset.
    _SECONDS_PER_DAY = 24 * 60 * 60

    def __init__(self, db_url: str):
        """Build the ``DBData`` manager bound to ``twitter_content``.

        :param db_url: Database connection URL, forwarded unchanged to
            ``DBData``.
        """
        self.db_url = db_url
        self.table_name = "twitter_content"
        self.columns = [
            "user_id",
            "user_name",
            "timestamp",
            "date_time",
            "text",
        ]
        self.db_manager = DBData(db_url, self.table_name, self.columns)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_empty(df) -> bool:
        """Return True, after logging a warning, when ``df`` has no rows to write."""
        if df is None or df.empty:
            logger.warning("DataFrame为空,无需写入数据库。")
            return True
        return False

    def _query_filtered(self, conditions: list, params: dict):
        """Run the shared "filter, newest first, LIMIT :limit" SELECT.

        :param conditions: SQL predicates using named bind parameters; they
            are joined with ``AND``. An empty list selects every row.
        :param params: Bind parameters; must contain ``limit``.
        :return: Result of ``DBData.query_data(..., return_multi=True)``.
        """
        # "1=1" keeps the statement valid when no filter was requested.
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        sql = f"""
            SELECT * FROM twitter_content
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT :limit
        """
        return self.db_manager.query_data(sql, params, return_multi=True)

    # ------------------------------------------------------------------
    # Inserts
    # ------------------------------------------------------------------
    def insert_data_to_mysql(self, df: pd.DataFrame):
        """Save Twitter content rows via ``DBData.insert_data_to_mysql``.

        Author's notes carried over from the original docstring: fastest
        option, moderate memory use, intended for small to medium volumes
        (fewer than 100k rows).

        :param df: Twitter content DataFrame. ``None`` or an empty frame is
            logged and skipped.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql(df)

    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
        """Save rows via ``DBData.insert_data_to_mysql_fast``.

        Author's notes carried over from the original docstring: "option 2",
        an executemany-style batch insert; very fast, low memory use, intended
        for medium volumes.

        :param df: Twitter content DataFrame. ``None`` or an empty frame is
            logged and skipped.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_fast(df)

    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
        """Save rows in chunks via ``DBData.insert_data_to_mysql_chunk``.

        Author's notes carried over from the original docstring: "option 3";
        medium speed, lowest memory use, intended for large volumes (more than
        100k rows).

        :param df: Twitter content DataFrame. ``None`` or an empty frame is
            logged and skipped.
        :param chunk_size: Chunk size forwarded to the manager.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)

    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
        """Save rows via ``DBData.insert_data_to_mysql_simple``.

        Author's notes carried over from the original docstring: "option 4",
        a direct ``to_sql`` write; fastest, moderate memory use.

        NOTE(review): the original docstring both said duplicates are ignored
        and warned that duplicate-key errors are raised and need extra
        handling. The actual behaviour lives in ``DBData`` and is not visible
        here -- confirm before relying on either statement.

        :param df: Twitter content DataFrame. ``None`` or an empty frame is
            logged and skipped.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_simple(df)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def query_latest_data(self, user_id: str = None):
        """Fetch the single newest row, optionally for one user.

        :param user_id: User ID to filter on. When falsy (``None`` or an empty
            string) the newest row across all users is requested.
        :return: Result of ``DBData.query_data(..., return_multi=False)``.
        """
        if user_id:
            sql = """
                SELECT * FROM twitter_content
                WHERE user_id = :user_id
                ORDER BY timestamp DESC
                LIMIT 1
            """
            condition_dict = {"user_id": user_id}
        else:
            sql = """
                SELECT * FROM twitter_content
                ORDER BY timestamp DESC
                LIMIT 1
            """
            condition_dict = {}

        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def query_data_by_user_id(self, user_id: str, limit: int = 100):
        """Fetch the newest rows for one user.

        :param user_id: User ID to filter on.
        :param limit: Maximum number of rows.
        :return: Result of ``DBData.query_data(..., return_multi=True)``.
        """
        sql = """
            SELECT * FROM twitter_content
            WHERE user_id = :user_id
            ORDER BY timestamp DESC
            LIMIT :limit
        """
        condition_dict = {"user_id": user_id, "limit": limit}
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def query_data_by_timestamp_range(
        self,
        start_timestamp: int = None,
        end_timestamp: int = None,
        user_id: str = None,
        limit: int = 1000
    ):
        """Fetch rows whose ``timestamp`` lies in an inclusive range.

        :param start_timestamp: Lower bound (inclusive); ``None`` means no
            lower bound.
        :param end_timestamp: Upper bound (inclusive); ``None`` means no upper
            bound.
        :param user_id: Optional user ID filter (skipped when falsy).
        :param limit: Maximum number of rows.
        :return: Result of ``DBData.query_data(..., return_multi=True)``,
            newest first.
        """
        conditions = []
        condition_dict = {"limit": limit}

        # Compare with None explicitly: 0 is a legitimate timestamp, and a
        # truthiness test would silently drop it and widen the query.
        if start_timestamp is not None:
            conditions.append("timestamp >= :start_timestamp")
            condition_dict["start_timestamp"] = start_timestamp

        if end_timestamp is not None:
            conditions.append("timestamp <= :end_timestamp")
            condition_dict["end_timestamp"] = end_timestamp

        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id

        return self._query_filtered(conditions, condition_dict)

    def query_data_by_text_search(
        self,
        search_text: str,
        user_id: str = None,
        limit: int = 100
    ):
        """Fetch rows whose ``text`` contains ``search_text`` (SQL ``LIKE``).

        ``search_text`` is wrapped in ``%...%`` and bound as a parameter. It
        is not escaped, so any ``%`` or ``_`` it contains acts as a LIKE
        wildcard.

        :param search_text: Substring to look for.
        :param user_id: Optional user ID filter (skipped when falsy).
        :param limit: Maximum number of rows.
        :return: Result of ``DBData.query_data(..., return_multi=True)``,
            newest first.
        """
        conditions = ["text LIKE :search_text"]
        condition_dict = {
            "search_text": f"%{search_text}%",
            "limit": limit,
        }

        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id

        return self._query_filtered(conditions, condition_dict)

    def query_data_by_date_range(
        self,
        start_date: str = None,
        end_date: str = None,
        user_id: str = None,
        limit: int = 1000
    ):
        """Fetch rows whose ``date_time`` lies in an inclusive range.

        :param start_date: Lower bound, documented by the author as
            ``YYYY-MM-DD``; skipped when falsy.
        :param end_date: Upper bound, documented by the author as
            ``YYYY-MM-DD``; skipped when falsy.
        :param user_id: Optional user ID filter (skipped when falsy).
        :param limit: Maximum number of rows.
        :return: Result of ``DBData.query_data(..., return_multi=True)``,
            ordered by ``timestamp`` descending.
        """
        conditions = []
        condition_dict = {"limit": limit}

        if start_date:
            conditions.append("date_time >= :start_date")
            condition_dict["start_date"] = start_date

        if end_date:
            conditions.append("date_time <= :end_date")
            condition_dict["end_date"] = end_date

        if user_id:
            conditions.append("user_id = :user_id")
            condition_dict["user_id"] = user_id

        return self._query_filtered(conditions, condition_dict)

    def get_user_list(self, limit: int = 100):
        """List users with their tweet count and latest tweet timestamp.

        Rows are grouped by ``(user_id, user_name)`` and ordered by the latest
        timestamp, newest first.

        :param limit: Maximum number of users.
        :return: Result of ``DBData.query_data(..., return_multi=True)``.
        """
        sql = """
            SELECT DISTINCT user_id, user_name,
                   COUNT(*) as tweet_count,
                   MAX(timestamp) as last_tweet_time
            FROM twitter_content
            GROUP BY user_id, user_name
            ORDER BY last_tweet_time DESC
            LIMIT :limit
        """
        condition_dict = {"limit": limit}
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def get_statistics(self):
        """Return table-wide aggregates.

        The query selects ``total_tweets``, ``total_users``,
        ``earliest_tweet``, ``latest_tweet`` and ``avg_text_length``.

        :return: Result of ``DBData.query_data(..., return_multi=False)``.
        """
        sql = """
            SELECT
                COUNT(*) as total_tweets,
                COUNT(DISTINCT user_id) as total_users,
                MIN(timestamp) as earliest_tweet,
                MAX(timestamp) as latest_tweet,
                AVG(LENGTH(text)) as avg_text_length
            FROM twitter_content
        """
        return self.db_manager.query_data(sql, {}, return_multi=False)

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------
    def delete_old_data(self, days: int = 30):
        """Delete rows whose ``timestamp`` is older than ``days`` days.

        :param days: Number of days of data to keep; must be non-negative.
        :return: Result of ``DBData.execute_sql``.
        :raises ValueError: If ``days`` is negative. A negative value would
            put the cutoff in the future and delete every row stamped
            before it.
        """
        if days < 0:
            raise ValueError(f"days must be non-negative, got {days}")

        current_time = get_current_date_time()
        # NOTE(review): the cutoff is computed in whole epoch seconds, so this
        # assumes the `timestamp` column stores epoch seconds -- confirm.
        # The timezone of get_current_date_time() is not visible here either.
        now_seconds = int(pd.Timestamp(current_time).timestamp())
        cutoff_timestamp = now_seconds - (days * self._SECONDS_PER_DAY)

        sql = """
            DELETE FROM twitter_content
            WHERE timestamp < :cutoff_timestamp
        """
        condition_dict = {"cutoff_timestamp": cutoff_timestamp}

        return self.db_manager.execute_sql(sql, condition_dict)

    def check_duplicate(self, user_id: str, timestamp: int):
        """Report whether a row with this ``user_id`` and ``timestamp`` exists.

        :param user_id: User ID to match.
        :param timestamp: Timestamp to match exactly.
        :return: ``True`` when the count query reports at least one row,
            ``False`` when it reports none or the manager returns a falsy
            result.
        """
        sql = """
            SELECT COUNT(*) as count
            FROM twitter_content
            WHERE user_id = :user_id AND timestamp = :timestamp
        """
        condition_dict = {"user_id": user_id, "timestamp": timestamp}
        result = self.db_manager.query_data(sql, condition_dict, return_multi=False)
        if not result:
            return False
        return result["count"] > 0