from core.biz.huge_volume import HugeVolume
from core.biz.huge_volume_chart import HugeVolumeChart
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
from market_data_main import MarketDataMain
import logging
from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from datetime import datetime
import pandas as pd
import os
import re

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class HugeVolumeMain:
    def __init__(self, threshold: float = 2.0):
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")
        self.db_url = (
            f"mysql+pymysql://{mysql_user}:{mysql_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.huge_volume = HugeVolume()
        self.db_market_data = DBMarketData(self.db_url)
        self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.market_data_main = MarketDataMain()
        self.threshold = threshold
        self.output_folder = "./output/huge_volume_statistics/"
        os.makedirs(self.output_folder, exist_ok=True)

    def batch_initial_detect_volume_spike(
        self, window_size: int = 50, start: str = None
    ):
        # Run the initial huge-volume detection for every configured symbol and bar.
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                data = self.detect_volume_spike(
                    symbol,
                    bar,
                    window_size,
                    start,
                    only_output_huge_volume=False,
                    is_update=False,
                )
                if data is not None and len(data) > 0:
                    logging.info(
                        f"Initial detection produced {len(data)} huge-volume records"
                    )
                else:
                    logging.info("Initial detection produced no huge-volume records")

    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        window_size: int = 50,
        start: str = "2025-05-01 00:00:00",
        end: str = None,
        only_output_huge_volume: bool = False,
        is_update: bool = False,
    ):
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logging.info(
            f"Detecting huge volume: {symbol} {bar} window size: {window_size} from {start} to {end}"
        )
        data = self.db_market_data.query_market_data_by_symbol_bar(
            symbol, bar, start, end
        )
        if data is None:
            logging.warning(
                f"Failed to query market data: {symbol} {bar} window size: {window_size} from {start} to {end}"
            )
            return None
        if len(data) == 0:
            logging.warning(
                f"Market data is empty: {symbol} {bar} window size: {window_size} from {start} to {end}"
            )
            return None
        if isinstance(data, list):
            data = pd.DataFrame(data)
        elif isinstance(data, dict):
            data = pd.DataFrame([data])
        data = self.huge_volume.detect_huge_volume(
            data=data,
            window_size=window_size,
            threshold=self.threshold,
            check_price=True,
            only_output_huge_volume=only_output_huge_volume,
            output_excel=False,
        )
        if data is None:
            return None
        if is_update:
            # Drop rows that already exist in the huge-volume table to avoid duplicate inserts.
            for _, row in data.iterrows():
                exist_huge_volume_data = self.db_huge_volume_data.query_data_by_symbol_bar_window_size_timestamp(
                    symbol, bar, window_size, row["timestamp"]
                )
                if exist_huge_volume_data is not None and len(exist_huge_volume_data) > 0:
                    data = data[data["timestamp"] != row["timestamp"]]
        if data is not None and len(data) > 0:
            self.db_huge_volume_data.insert_data_to_mysql(data)
        else:
            logging.warning(
                f"No huge volume data to insert: {symbol} {bar} {start} {end}"
            )
        return data

    def batch_update_volume_spike(self, window_size: int = 50):
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                self.update_volume_spike(symbol, bar, window_size)

    def update_volume_spike(self, symbol: str, bar: str, window_size: int = 50):
        earliest_date_time = None
        try:
            self.market_data_main.update_data(symbol, bar)
            latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
                symbol, bar, window_size
            )
            if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
                self.detect_volume_spike(symbol, bar, only_output_huge_volume=False)
                return
            # Re-detect from (window_size - 1) bars before the latest stored record,
            # so the rolling window around the newest candles is complete.
            earliest_timestamp = latest_huge_volume_data["timestamp"] - (
                (window_size - 1) * self.get_seconds_by_bar(bar) * 1000
            )
            earliest_date_time = timestamp_to_datetime(earliest_timestamp)
            data = self.detect_volume_spike(
                symbol=symbol,
                bar=bar,
                window_size=window_size,
                start=earliest_date_time,
                only_output_huge_volume=False,
                is_update=True,
            )
            logging.info(
                f"Updated huge volume data: {symbol} {bar} window size: {window_size} "
                f"from {earliest_date_time} to {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            )
            if data is not None and len(data) > 0:
                logging.info(f"This update added {len(data)} huge-volume records")
            else:
                logging.info("This update added no huge-volume records")
        except Exception as e:
            logging.error(
                f"Failed to update huge volume data: {symbol} {bar} window size: {window_size} "
                f"from {earliest_date_time}: {e}"
            )

    def get_seconds_by_bar(self, bar: str) -> int:
        """
        Get the length of a bar in seconds.
        bar: 1s/1m/3m/5m/15m/30m/1H/2H/4H/6H/12H/1D/2D/3D/1W/1M/3M
        :param bar: candlestick period
        :return: seconds
        """
        seconds_by_bar = {
            "1s": 1,
            "1m": 60,
            "3m": 180,
            "5m": 300,
            "15m": 900,
            "30m": 1800,
            "1H": 3600,
            "2H": 7200,
            "4H": 14400,
            "6H": 21600,
            "12H": 43200,
            "1D": 86400,
            "2D": 172800,
            "3D": 259200,
            "1W": 604800,
            "1M": 2592000,
            "3M": 7776000,
        }
        if bar not in seconds_by_bar:
            raise ValueError(f"Unsupported bar: {bar}")
        return seconds_by_bar[bar]

    def next_periods_rise_or_fall(
        self,
        symbol: str,
        bar: str,
        window_size: int = 50,
        start: str = None,
        end: str = None,
        periods: list = [3, 5],
    ):
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        periods_text = ", ".join([str(period) for period in periods])
        logging.info(
            f"Computing rise/fall ratios for the next {periods_text} periods after a huge-volume bar: "
            f"{symbol} {bar} window size: {window_size} from {start} to {end}"
        )
        volume_statistics_data = (
            self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
                symbol, bar, window_size, start, end
            )
        )
        if volume_statistics_data is None or len(volume_statistics_data) == 0:
            logging.warning(
                f"No huge volume data found: {symbol} {bar} window size: {window_size} from {start} to {end}"
            )
            return None, None
        if isinstance(volume_statistics_data, list):
            volume_statistics_data = pd.DataFrame(volume_statistics_data)
        elif isinstance(volume_statistics_data, dict):
            volume_statistics_data = pd.DataFrame([volume_statistics_data])
        # Sort by timestamp and keep only the columns needed for the statistics.
        volume_statistics_data = volume_statistics_data.sort_values(
            by="timestamp", ascending=True
        )
        volume_statistics_data["window_size"] = window_size
        volume_statistics_data = volume_statistics_data[
            [
                "symbol",
                "bar",
                "window_size",
                "timestamp",
                "date_time",
                "open",
"high", "low", "close", "volume", "huge_volume", "volume_ratio", "volume_80_20_price_spike", "price_80_high", "price_20_low", "volume_90_10_price_spike", "price_90_high", "price_10_low", ] ] volume_statistics_data = volume_statistics_data.reset_index(drop=True) huge_volume_data, result_data = self.huge_volume.next_periods_rise_or_fall( data=volume_statistics_data, window_size=window_size, periods=periods ) return huge_volume_data, result_data def batch_next_periods_rise_or_fall( self, start: str = None, end: str = None, next_periods: list = [1, 2, 3, 5, 10], output_excel: bool = False, ): if start is None: start = MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-01 00:00:00" ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") huge_volume_data_list = [] result_data_list = [] window_size_list = WINDOW_SIZE.get("window_sizes", None) if ( window_size_list is None or not isinstance(window_size_list, list) or len(window_size_list) == 0 ): window_size_list = [50, 80, 100, 120] for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: for window_size in window_size_list: huge_volume_data, result_data = self.next_periods_rise_or_fall( symbol, bar, window_size, start, end, next_periods ) huge_volume_data_list.append(huge_volume_data) result_data_list.append(result_data) total_huge_volume_data = pd.concat(huge_volume_data_list) total_result_data = pd.concat(result_data_list) if output_excel: total_huge_volume_data = total_huge_volume_data.reset_index(drop=True) total_result_data = total_result_data.reset_index(drop=True) current_date = datetime.now().strftime("%Y%m%d%H%M%S") file_name = f"next_periods_rise_or_fall_{current_date}.xlsx" try: with pd.ExcelWriter( os.path.join(self.output_folder, file_name) ) as writer: total_huge_volume_data.to_excel( writer, sheet_name="details", index=False ) total_result_data.to_excel( writer, sheet_name="next_periods_statistics", index=False ) except Exception as e: logging.error(f"导出Excel文件失败: {e}") return total_huge_volume_data, total_result_data def plot_huge_volume_data( self, data_file_path: str, sheet_name: str = "next_periods_statistics", output_folder: str = "./output/huge_volume_statistics/", ): os.makedirs(output_folder, exist_ok=True) huge_volume_data = pd.read_excel(data_file_path, sheet_name=sheet_name) huge_volume_chart = HugeVolumeChart(huge_volume_data) include_heatmap = True include_line = False huge_volume_chart.plot_entrance( include_heatmap=include_heatmap, include_line=include_line ) def batch_initial_detect_volume_spike(threshold: float = 2.0): window_sizes = WINDOW_SIZE.get("window_sizes", None) if ( window_sizes is None or not isinstance(window_sizes, list) or len(window_sizes) == 0 ): window_sizes = [50, 80, 100, 120] huge_volume_main = HugeVolumeMain(threshold) for window_size in window_sizes: huge_volume_main.batch_initial_detect_volume_spike( window_size=window_size, start="2025-05-01 00:00:00", ) def batch_update_volume_spike(threshold: float = 2.0): window_sizes = WINDOW_SIZE.get("window_sizes", None) if ( window_sizes is None or not isinstance(window_sizes, list) or len(window_sizes) == 0 ): window_sizes = [50, 80, 100, 120] huge_volume_main = HugeVolumeMain(threshold) for window_size in window_sizes: huge_volume_main.batch_update_volume_spike(window_size=window_size) if __name__ == "__main__": # batch_initial_detect_volume_spike(threshold=2.0) # batch_update_volume_spike(threshold=2.0) huge_volume_main = HugeVolumeMain(threshold=2.0) # 
    # huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True)
    data_file_path = "./output/huge_volume_statistics/next_periods_rise_or_fall_stat_20250731200304.xlsx"
    sheet_name = "next_periods_statistics"
    output_folder = "./output/huge_volume_statistics/"
    huge_volume_main.plot_huge_volume_data(
        data_file_path=data_file_path,
        sheet_name=sheet_name,
        output_folder=output_folder,
    )