crypto_quant/core/db/db_twitter_content.py

291 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import core.logger as logging
from core.db.db_manager import DBData
from core.utils import get_current_date_time
# `logging` is the project module core.logger (aliased on import), not the
# stdlib logging package; this binds the logger object it exposes.
logger = logging.logger
class DBTwitterContent:
    """Data-access wrapper around the ``twitter_content`` table.

    Inserts are delegated to a ``DBData`` instance built in ``__init__``.
    Queries are written as SQL with ``:name`` placeholders and handed to
    ``DBData.query_data`` / ``DBData.execute_sql`` together with a dict of
    bound values.
    """

    # Escape character for LIKE patterns. "!" is used instead of the default
    # backslash so the pattern does not depend on how the server treats
    # backslashes in string literals.
    _LIKE_ESCAPE = "!"

    def __init__(self, db_url: str):
        """Create the wrapper and its underlying ``DBData`` manager.

        :param db_url: database connection URL, forwarded to ``DBData``.
        """
        self.db_url = db_url
        self.table_name = "twitter_content"
        self.columns = [
            "user_id",
            "user_name",
            "timestamp",
            "date_time",
            "text",
        ]
        self.db_manager = DBData(db_url, self.table_name, self.columns)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _has_rows(df: pd.DataFrame) -> bool:
        """Return True if *df* has rows to write; otherwise log a warning and return False."""
        if df is None or df.empty:
            logger.warning("DataFrame为空无需写入数据库。")
            return False
        return True

    @classmethod
    def _escape_like(cls, text: str) -> str:
        """Escape LIKE wildcards (``%``, ``_``) in *text* so they match literally.

        The escape character itself is escaped first, so occurrences already
        present in *text* are not misread as escapes.
        """
        esc = cls._LIKE_ESCAPE
        return (
            text.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    @staticmethod
    def _date_upper_bound(end_date):
        """Return ``(operator, value)`` implementing an inclusive end-date filter.

        A bare ``YYYY-MM-DD`` string compared with ``<=`` against a datetime
        (or ``YYYY-MM-DD HH:MM:SS`` text) value only matches the midnight rows
        of that day. For such strings the bound becomes ``< next day``. Any
        other value is passed through unchanged with ``<=``.
        """
        from datetime import date, timedelta

        if isinstance(end_date, str) and len(end_date) == 10:
            try:
                day = date.fromisoformat(end_date)
            except ValueError:
                return "<=", end_date
            return "<", (day + timedelta(days=1)).isoformat()
        return "<=", end_date

    def _select_latest_first(self, conditions, params, limit, return_multi=True):
        """Run ``SELECT *`` with AND-ed *conditions*, newest ``timestamp`` first.

        :param conditions: SQL fragments using ``:name`` placeholders; an empty
            list selects every row.
        :param params: values for the placeholders used in *conditions*.
        :param limit: maximum number of rows; coerced to ``int`` so numeric
            strings or numpy integers bind as a plain integer.
        :param return_multi: forwarded to ``DBData.query_data``.
        """
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        # table_name is a fixed attribute set in __init__, never caller input,
        # so interpolating it is safe; all values go through bound parameters.
        sql = f"""
            SELECT * FROM {self.table_name}
            WHERE {where_clause}
            ORDER BY timestamp DESC
            LIMIT :limit
        """
        bound = dict(params)
        bound["limit"] = int(limit)
        return self.db_manager.query_data(sql, bound, return_multi=return_multi)

    # ------------------------------------------------------------------
    # Inserts
    # ------------------------------------------------------------------

    def insert_data_to_mysql(self, df: pd.DataFrame):
        """Save tweet rows to the table via ``DBData.insert_data_to_mysql`` (scheme 1).

        Author's notes: speed 5/5 (fastest), memory 4/5 (moderate);
        suited to small/medium volumes (< 100k rows).

        :param df: tweet-content DataFrame; None or empty is logged and skipped.
        """
        if self._has_rows(df):
            self.db_manager.insert_data_to_mysql(df)

    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
        """Batch-insert tweet rows via ``DBData.insert_data_to_mysql_fast`` (scheme 2, executemany).

        Author's notes: speed 4/5 (very fast), memory 5/5 (low);
        suited to medium volumes.

        :param df: tweet-content DataFrame; None or empty is logged and skipped.
        """
        if self._has_rows(df):
            self.db_manager.insert_data_to_mysql_fast(df)

    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
        """Insert tweet rows in chunks via ``DBData.insert_data_to_mysql_chunk`` (scheme 3).

        Author's notes: speed 3/5 (medium), memory 5/5 (lowest);
        suited to large volumes (> 100k rows).

        :param df: tweet-content DataFrame; None or empty is logged and skipped.
        :param chunk_size: rows per chunk, forwarded to ``DBData``.
        """
        if self._has_rows(df):
            self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)

    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
        """Insert tweet rows via ``DBData.insert_data_to_mysql_simple`` (scheme 4, plain to_sql).

        Author's notes: speed 5/5 (fastest), memory 4/5 (moderate).
        NOTE(review): the original notes both say duplicates are ignored and
        that duplicate-key errors are raised and need extra handling; the
        actual behaviour lives in DBData -- confirm before relying on either.

        :param df: tweet-content DataFrame; None or empty is logged and skipped.
        """
        if self._has_rows(df):
            self.db_manager.insert_data_to_mysql_simple(df)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def query_latest_data(self, user_id: str = None):
        """Return the single most recent row by ``timestamp``.

        :param user_id: restrict to this user; a falsy value (None or "")
            queries across all users.
        :return: result of ``DBData.query_data(..., return_multi=False)``.
        """
        conditions, params = [], {}
        if user_id:
            conditions.append("user_id = :user_id")
            params["user_id"] = user_id
        return self._select_latest_first(conditions, params, 1, return_multi=False)

    def query_data_by_user_id(self, user_id: str, limit: int = 100):
        """Return up to *limit* rows for *user_id*, newest first.

        :param user_id: user ID to filter on.
        :param limit: maximum number of rows.
        """
        return self._select_latest_first(
            ["user_id = :user_id"], {"user_id": user_id}, limit
        )

    def query_data_by_timestamp_range(
        self,
        start_timestamp: int = None,
        end_timestamp: int = None,
        user_id: str = None,
        limit: int = 1000
    ):
        """Return rows whose ``timestamp`` lies in ``[start, end]``, newest first.

        :param start_timestamp: inclusive lower bound; None means unbounded.
            0 is a real bound (compared with ``is not None``, not truthiness).
        :param end_timestamp: inclusive upper bound; None means unbounded.
        :param user_id: optional user filter (ignored when falsy).
        :param limit: maximum number of rows.
        """
        conditions, params = [], {}
        if start_timestamp is not None:
            conditions.append("timestamp >= :start_timestamp")
            params["start_timestamp"] = start_timestamp
        if end_timestamp is not None:
            conditions.append("timestamp <= :end_timestamp")
            params["end_timestamp"] = end_timestamp
        if user_id:
            conditions.append("user_id = :user_id")
            params["user_id"] = user_id
        return self._select_latest_first(conditions, params, limit)

    def query_data_by_text_search(
        self,
        search_text: str,
        user_id: str = None,
        limit: int = 100
    ):
        """Return rows whose ``text`` contains *search_text* as a literal substring.

        ``%`` and ``_`` in *search_text* are escaped, so they are not treated
        as LIKE wildcards.

        :param search_text: substring to look for.
        :param user_id: optional user filter (ignored when falsy).
        :param limit: maximum number of rows.
        """
        conditions = [f"text LIKE :search_text ESCAPE '{self._LIKE_ESCAPE}'"]
        params = {"search_text": f"%{self._escape_like(str(search_text))}%"}
        if user_id:
            conditions.append("user_id = :user_id")
            params["user_id"] = user_id
        return self._select_latest_first(conditions, params, limit)

    def query_data_by_date_range(
        self,
        start_date: str = None,
        end_date: str = None,
        user_id: str = None,
        limit: int = 1000
    ):
        """Return rows whose ``date_time`` falls within the given dates, newest first.

        :param start_date: inclusive start (YYYY-MM-DD); falsy means unbounded.
        :param end_date: inclusive end (YYYY-MM-DD); falsy means unbounded.
            A bare date includes the whole day (translated to ``< next day``).
        :param user_id: optional user filter (ignored when falsy).
        :param limit: maximum number of rows.
        """
        conditions, params = [], {}
        if start_date:
            conditions.append("date_time >= :start_date")
            params["start_date"] = start_date
        if end_date:
            op, bound = self._date_upper_bound(end_date)
            conditions.append(f"date_time {op} :end_date")
            params["end_date"] = bound
        if user_id:
            conditions.append("user_id = :user_id")
            params["user_id"] = user_id
        return self._select_latest_first(conditions, params, limit)

    def get_user_list(self, limit: int = 100):
        """Return per-user tweet counts and latest tweet time, most recently active first.

        Rows are grouped by (user_id, user_name), so a user whose name changed
        appears once per distinct name.

        :param limit: maximum number of rows.
        """
        sql = f"""
            SELECT user_id, user_name,
                   COUNT(*) as tweet_count,
                   MAX(timestamp) as last_tweet_time
            FROM {self.table_name}
            GROUP BY user_id, user_name
            ORDER BY last_tweet_time DESC
            LIMIT :limit
        """
        return self.db_manager.query_data(sql, {"limit": int(limit)}, return_multi=True)

    def get_statistics(self):
        """Return table-wide totals, first/last ``timestamp`` and average text length."""
        # NOTE(review): on MySQL, LENGTH() counts bytes, not characters;
        # CHAR_LENGTH() would count characters for multi-byte text -- confirm intent.
        sql = f"""
            SELECT
                COUNT(*) as total_tweets,
                COUNT(DISTINCT user_id) as total_users,
                MIN(timestamp) as earliest_tweet,
                MAX(timestamp) as latest_tweet,
                AVG(LENGTH(text)) as avg_text_length
            FROM {self.table_name}
        """
        return self.db_manager.query_data(sql, {}, return_multi=False)

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def delete_old_data(self, days: int = 30):
        """Delete rows whose ``timestamp`` is older than *days* days before now.

        :param days: number of days to keep; must be non-negative.
        :raises ValueError: if *days* is negative (the cutoff would lie in the
            future and remove nearly every row).
        """
        if days < 0:
            raise ValueError(f"days must be non-negative, got {days}")
        current_time = get_current_date_time()
        # NOTE(review): pandas treats a naive Timestamp as UTC in .timestamp();
        # if get_current_date_time() returns local wall-clock time, the cutoff
        # is shifted by the UTC offset -- confirm.
        # NOTE(review): assumes the `timestamp` column stores epoch seconds -- confirm.
        cutoff_timestamp = int(pd.Timestamp(current_time).timestamp()) - days * 24 * 60 * 60
        sql = f"""
            DELETE FROM {self.table_name}
            WHERE timestamp < :cutoff_timestamp
        """
        return self.db_manager.execute_sql(sql, {"cutoff_timestamp": cutoff_timestamp})

    def check_duplicate(self, user_id: str, timestamp: int):
        """Return True if a row with this (user_id, timestamp) already exists.

        :param user_id: user ID.
        :param timestamp: tweet timestamp.
        """
        sql = f"""
            SELECT COUNT(*) as count
            FROM {self.table_name}
            WHERE user_id = :user_id AND timestamp = :timestamp
        """
        result = self.db_manager.query_data(
            sql, {"user_id": user_id, "timestamp": timestamp}, return_multi=False
        )
        # Presumably a mapping-like row or a falsy value when nothing is
        # returned -- verify against DBData.query_data.
        return result['count'] > 0 if result else False