from core.biz.huge_volume import HugeVolume
from core.biz.huge_volume_chart import HugeVolumeChart
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.db.db_binance_data import DBBinanceData
from core.db.db_binance_huge_volume_data import DBBinanceHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
from market_data_main import MarketDataMain
from core.wechat import Wechat
import core.logger as logging
from config import (
    OKX_MONITOR_CONFIG,
    US_STOCK_MONITOR_CONFIG,
    MYSQL_CONFIG,
    WINDOW_SIZE,
    BINANCE_MONITOR_CONFIG,
)
from datetime import datetime, timedelta
import pandas as pd
import os

logger = logging.logger


class HugeVolumeMain:
    def __init__(
        self,
        threshold: float = 2.0,
        is_us_stock: bool = False,
        is_binance: bool = False,
    ):
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")
        self.db_url = (
            f"mysql+pymysql://{mysql_user}:{mysql_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.huge_volume = HugeVolume()
        if is_binance:
            self.db_market_data = DBBinanceData(self.db_url)
            self.db_huge_volume_data = DBBinanceHugeVolumeData(self.db_url)
        else:
            self.db_market_data = DBMarketData(self.db_url)
            self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.market_data_main = MarketDataMain(
            is_us_stock=is_us_stock, is_binance=is_binance
        )
        self.threshold = threshold
        self.is_us_stock = is_us_stock
        self.is_binance = is_binance
        self.output_folder = "./output/huge_volume_statistics/"
        os.makedirs(self.output_folder, exist_ok=True)

    def batch_initial_detect_volume_spike(
        self, window_size: int = 50, start: str = None
    ):
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                if start is None:
                    if self.is_us_stock:
                        start = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get(
                            "initial_date", "2015-08-30 00:00:00"
                        )
                    else:
                        start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
                            "initial_date", "2025-05-15 00:00:00"
                        )
                data = self.detect_volume_spike(
                    symbol,
                    bar,
                    window_size,
                    start,
                    only_output_huge_volume=False,
                    is_update=False,
                )
                if data is not None and len(data) > 0:
                    logger.info(f"Initial detection inserted {len(data)} huge-volume rows")
                else:
                    logger.info("Initial detection produced no huge-volume rows")

    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        window_size: int = 50,
        start: str = None,
        end: str = None,
        only_output_huge_volume: bool = False,
        is_update: bool = False,
    ):
        if start is None:
            if self.is_us_stock:
                start = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get(
                    "initial_date", "2015-08-31 00:00:00"
                )
            elif self.is_binance:
                start = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get(
                    "initial_date", "2017-08-16 00:00:00"
                )
            else:
                start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
                    "initial_date", "2025-05-01 00:00:00"
                )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logger.info(
            f"Detecting volume spikes: {symbol} {bar} window: {window_size} from {start} to {end}"
        )
        data = self.db_market_data.query_market_data_by_symbol_bar(
            symbol, bar, start, end
        )
        if data is None:
            logger.warning(
                f"Failed to fetch market data: {symbol} {bar} window: {window_size} from {start} to {end}"
            )
            return None
        if len(data) == 0:
            logger.warning(
                f"No market data found: {symbol} {bar} window: {window_size} from {start} to {end}"
            )
            return None
        if isinstance(data, list):
            data = pd.DataFrame(data)
        elif isinstance(data, dict):
            data = pd.DataFrame([data])
        data = self.huge_volume.detect_huge_volume(
            data=data,
            window_size=window_size,
            threshold=self.threshold,
            check_price=True,
            only_output_huge_volume=only_output_huge_volume,
            output_excel=False,
        )
        if data is None or len(data) == 0:
            return None
        if is_update and len(data) > window_size:
            # Drop rows whose timestamps already exist in the target table.
            min_timestamp = int(data["timestamp"].min())
            max_timestamp = int(data["timestamp"].max())
            exist_data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
                symbol, bar, window_size, min_timestamp, max_timestamp
            )
            if exist_data is not None and len(exist_data) > 0:
                exist_data = pd.DataFrame(exist_data)
                data = data[~data["timestamp"].isin(exist_data["timestamp"])]
        if data is not None and len(data) > 0:
            data = data[self.db_huge_volume_data.columns]
            self.db_huge_volume_data.insert_data_to_mysql(data)
            logger.info(f"Processed {len(data)} huge-volume rows")
        else:
            logger.warning(
                f"No new huge-volume rows to insert: {symbol} {bar} {start} {end}"
            )
        return data
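    # A minimal usage sketch for detect_volume_spike (illustrative values; it
    # assumes the market-data table already covers the requested range):
    #
    #   main = HugeVolumeMain(threshold=2.0)
    #   df = main.detect_volume_spike(
    #       symbol="BTC-USDT", bar="5m", window_size=50,
    #       start="2025-05-01 00:00:00",
    #   )
    #   # df holds every bar in the range, with the huge_volume flag and
    #   # volume_ratio computed against the rolling-window average.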
    def batch_update_volume_spike(self, window_size: int = 50):
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                try:
                    self.market_data_main.update_data(symbol, bar)
                    self.update_volume_spike(symbol, bar, window_size)
                except Exception as e:
                    logger.error(
                        f"Failed to update huge-volume data: {symbol} {bar} window: {window_size}: {e}"
                    )

    def batch_import_binance_data_by_csv(self, root_path: str):
        """
        Batch-import data from Binance CSV files.
        Example file names: BTC-USDT_5m.csv, BTC-USDT_30m.csv, BTC-USDT_1h.csv
        :param root_path: root directory to scan
        """
        if root_path is None:
            logger.error("root_path is None")
            return
        if not os.path.exists(root_path):
            logger.error(f"root_path does not exist: {root_path}")
            return
        window_sizes = WINDOW_SIZE.get("window_sizes", None)
        if (
            window_sizes is None
            or not isinstance(window_sizes, list)
            or len(window_sizes) == 0
        ):
            window_sizes = [50, 80, 100, 120]
        folders = os.listdir(root_path)
        logger.info(f"Found {len(folders)} folders")
        symbols = self.market_data_main.symbols
        bars = self.market_data_main.bars
        output_folder = r"./data/binance/"
        os.makedirs(output_folder, exist_ok=True)
        success_folder_file = os.path.join(output_folder, "success_folder.txt")
        success_folder_list = []
        # The progress file may not exist on the first run.
        if os.path.exists(success_folder_file):
            with open(success_folder_file, "r", encoding="utf-8") as f:
                for line in f:
                    if line.strip() == "":
                        continue
                    success_folder_list.append(line.strip())
        for folder in folders:
            if not os.path.isdir(os.path.join(root_path, folder)):
                continue
            logger.info(f"Processing folder: {folder}")
            if folder in success_folder_list:
                logger.info(f"Folder already processed: {folder}")
                continue
            files = os.listdir(os.path.join(root_path, folder))
            all_success = True
            for file in files:
                if not os.path.isfile(os.path.join(root_path, folder, file)):
                    continue
                if not file.endswith(".csv"):
                    continue
                full_file_path = os.path.join(root_path, folder, file)
                file_pure_name = file.split(".")[0]
                symbol = file_pure_name.split("_")[0]
                bar = file_pure_name.split("_")[1]
                if bar == "1h":
                    bar = "1H"
                if symbol not in symbols or bar not in bars:
                    continue
                success = self.import_binance_data_by_csv(
                    full_file_path, symbol, bar, window_sizes
                )
                if not success:
                    all_success = False
            if all_success:
                try:
                    with open(success_folder_file, "a", encoding="utf-8") as f:
                        f.write(folder + "\n")
                except Exception as e:
                    logger.error(f"Failed to record processed folder: {folder} {e}")
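    # The file-name parsing above relies on str.split; a stricter sketch using
    # a regex (a hypothetical helper, not referenced elsewhere) would reject
    # names that do not match SYMBOL_BAR.csv, e.g. BTC-USDT_5m.csv:
    #
    #   import re
    #   CSV_NAME = re.compile(r"^(?P<symbol>[A-Z0-9]+-[A-Z0-9]+)_(?P<bar>\w+)\.csv$")
    #
    #   def parse_csv_name(name: str):
    #       m = CSV_NAME.match(name)
    #       return (m.group("symbol"), m.group("bar")) if m else None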
    def import_binance_data_by_csv(
        self, full_file_path: str, symbol: str, bar: str, window_sizes: list
    ):
        """
        Import data from a Binance CSV file.
        :param full_file_path: path to the CSV file
        :param symbol: crypto symbol
        :param bar: bar interval
        """
        try:
            logger.info(f"Processing file: {full_file_path} {symbol} {bar}")
            if full_file_path is None or symbol is None or bar is None:
                logger.error("Incomplete arguments")
                return False
            if not os.path.exists(full_file_path):
                logger.error(f"File does not exist: {full_file_path}")
                return False
            df = pd.read_csv(full_file_path, encoding="GBK")
            if df is None or len(df) == 0:
                raise Exception(f"File is empty: {full_file_path}")
            columns = list(df)
            if len(columns) == 0:
                raise Exception(f"File is empty: {full_file_path}")
            elif len(columns) == 1 and "邢不行" in columns[0]:
                # Single-column file whose header is the vendor watermark row.
                df.reset_index(inplace=True)
                df.columns = df.iloc[0]
                df = df.iloc[1:]
            elif "邢不行" in columns[0] or (
                len(columns) > 1 and "Unnamed" in columns[1]
            ):
                # Promote the first data row to be the column names.
                df.columns = df.iloc[0]
                df = df.iloc[1:]
            df.reset_index(drop=True, inplace=True)
            df = self.market_data_main.adjust_binance_csv_data(symbol, bar, df)
            df = self.market_data_main.post_save_data(df)
            min_start_time_ts = int(df["timestamp"].min())
            max_start_time_ts = int(df["timestamp"].max())
            df = self.market_data_main.post_calculate_metrics(
                symbol, bar, min_start_time_ts, max_start_time_ts
            )
            df = df.sort_values(by="timestamp", ascending=True)
            df = df.reset_index(drop=True)
            earliest_date_time = str(df.iloc[0]["date_time"])
            earliest_timestamp = int(df.iloc[0]["timestamp"])
            end_date_time = str(df.iloc[-1]["date_time"])
            for window_size in window_sizes:
                self.update_volume_spike(
                    symbol,
                    bar,
                    window_size,
                    earliest_date_time,
                    earliest_timestamp,
                    end_date_time,
                )
            return True
        except Exception as e:
            logger.error(f"Failed to import Binance data: {e}")
            output_folder = r"./data/binance/"
            os.makedirs(output_folder, exist_ok=True)
            error_record_file = os.path.join(output_folder, "error_record.txt")
            with open(error_record_file, "a", encoding="utf-8") as f:
                f.write(full_file_path + "\n")
            return False

    def update_volume_spike(
        self,
        symbol: str,
        bar: str,
        window_size: int = 50,
        earliest_date_time: str = None,
        earliest_timestamp: int = None,
        end_date_time: str = None,
    ):
        try:
            if earliest_date_time is None or earliest_timestamp is None:
                latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
                    symbol, bar, window_size
                )
                if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
                    self.detect_volume_spike(
                        symbol=symbol,
                        bar=bar,
                        window_size=window_size,
                        only_output_huge_volume=False,
                    )
                    return
                else:
                    earliest_date_time = latest_huge_volume_data["date_time"]
                    earliest_timestamp = latest_huge_volume_data["timestamp"]
                    # Re-detect from (window_size - 1) bars before the latest
                    # record so the rolling window is fully populated at the
                    # first new bar.
                    seconds = self.get_seconds_by_bar(bar)
                    earliest_timestamp = earliest_timestamp - (
                        (window_size - 1) * seconds * 1000
                    )
                    earliest_date_time = timestamp_to_datetime(earliest_timestamp)
            data = self.detect_volume_spike(
                symbol=symbol,
                bar=bar,
                window_size=window_size,
                start=earliest_date_time,
                end=end_date_time,
                only_output_huge_volume=False,
                is_update=True,
            )
            logger.info(
                f"Updated huge-volume data: {symbol} {bar} window: {window_size} "
                f"from {earliest_date_time} to {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            )
            if data is not None and len(data) > 0:
                logger.info(f"This update inserted {len(data)} rows")
            else:
                logger.info("This update inserted no rows")
        except Exception as e:
            logger.error(
                f"Failed to update huge-volume data: {symbol} {bar} window: {window_size} "
                f"at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}"
            )
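    # Worked example of the look-back above, assuming bar="5m" and
    # window_size=50: the start of the re-detection range is pushed back by
    # (50 - 1) * 300 * 1000 = 14,700,000 ms, i.e. 49 five-minute bars, so the
    # first genuinely new bar already has a full window of history behind it.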
== "3D": return 259200 elif bar == "1W": return 604800 elif bar == "1M": return 2592000 elif bar == "3M": return 7776000 else: raise ValueError(f"不支持的bar: {bar}") def next_periods_rise_or_fall( self, symbol: str, bar: str, window_size: int = 50, start: str = None, end: str = None, periods: list = [3, 5], ): if start is None: start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-01 00:00:00" ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") periods_text = ", ".join([str(period) for period in periods]) logger.info( f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) volume_statistics_data = ( self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size( symbol, bar, window_size, start, end ) ) if volume_statistics_data is None or len(volume_statistics_data) == 0: logger.warning( f"获取巨量交易数据为空: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) return None else: if isinstance(volume_statistics_data, list): volume_statistics_data = pd.DataFrame(volume_statistics_data) elif isinstance(volume_statistics_data, dict): volume_statistics_data = pd.DataFrame([volume_statistics_data]) if volume_statistics_data is not None and len(volume_statistics_data) > 0: # 根据timestamp排序 volume_statistics_data = volume_statistics_data.sort_values( by="timestamp", ascending=True ) volume_statistics_data["window_size"] = window_size volume_statistics_data = volume_statistics_data[ [ "symbol", "bar", "window_size", "timestamp", "date_time", "open", "high", "low", "close", "volume", "huge_volume", "volume_ratio", "volume_80_20_price_spike", "price_80_high", "price_20_low", "volume_90_10_price_spike", "price_90_high", "price_10_low", ] ] volume_statistics_data = volume_statistics_data.reset_index(drop=True) huge_volume_data, result_data = self.huge_volume.next_periods_rise_or_fall( data=volume_statistics_data, window_size=window_size, periods=periods ) return huge_volume_data, result_data def send_huge_volume_data_to_wechat(self, start: str = None, end: str = None): if start is None: start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-01 00:00:00" ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") start_timestamp = transform_date_time_to_timestamp(start) end_timestamp = transform_date_time_to_timestamp(end) start_date_time = timestamp_to_datetime(start_timestamp) end_date_time = timestamp_to_datetime(end_timestamp) logger.info(f"开始获取巨量交易数据: {start} 到 {end}") huge_volume_data = self.db_huge_volume_data.query_huge_volume_records( start=start_timestamp, end=end_timestamp ) if huge_volume_data is None or len(huge_volume_data) == 0: logger.warning(f"获取巨量交易数据为空: {start} 到 {end}") return else: if isinstance(huge_volume_data, list): huge_volume_data = pd.DataFrame(huge_volume_data) else: huge_volume_data = pd.DataFrame([huge_volume_data]) # 过滤huge_volume_data,要求huge_volume为1,且(price_80_high == 1 or price_90_high == 1 or price_20_low == 1 or price_10_low == 1) huge_volume_data = huge_volume_data[huge_volume_data["huge_volume"] == 1] # 过滤huge_volume_data,要求(price_80_high == 1 or price_90_high == 1 or price_20_low == 1 or price_10_low == 1) huge_volume_data = huge_volume_data[ (huge_volume_data["price_90_high"] == 1) | (huge_volume_data["price_10_low"] == 1) ] # 过滤huge_volume_data,要求volume_ratio > 10 huge_volume_data = huge_volume_data[huge_volume_data["volume_ratio"] > 10] # 根据symbol, bar, window_size, timestamp排序 huge_volume_data = huge_volume_data.sort_values( 
by=["symbol", "bar", "window_size", "timestamp"], ascending=True ) huge_volume_data = huge_volume_data.reset_index(drop=True) logger.info(f"获取巨量交易数据: {len(huge_volume_data)}条") contents = [] contents.append(f"# 放量交易数据: {start_date_time} 到 {end_date_time}") symbol_list = huge_volume_data["symbol"].unique() # 根据symbol_list排序 symbol_list.sort() for symbol in symbol_list: contents = [] contents.append(f"# 放量交易数据: {start_date_time} 到 {end_date_time}") contents.append(f"## 币种: {symbol}") symbol_data = huge_volume_data[huge_volume_data["symbol"] == symbol] symbol_data = symbol_data.sort_values( by=["bar", "window_size", "timestamp"], ascending=True ) symbol_data = symbol_data.reset_index(drop=True) for index, row in symbol_data.iterrows(): if row["huge_volume"] == 1 and ( row["price_80_high"] == 1 or row["price_90_high"] == 1 or row["price_20_low"] == 1 or row["price_10_low"] == 1 ): if row["price_90_high"] == 1: price_position_text = "90%分位数高点" elif row["price_80_high"] == 1: price_position_text = "80%分位数高点" else: price_position_text = "" if price_position_text == "": if row["price_10_low"] == 1: price_position_text = "10%分位数低点" elif row["price_20_low"] == 1: price_position_text = "20%分位数低点" else: price_position_text = "" open_price = str(round(row["open"], 6)) high = str(round(row["high"], 6)) low = str(round(row["low"], 6)) close = str(round(row["close"], 6)) volume = str(round(row["volume"], 6)) volCCyQuote = str(round(row["volCCyQuote"], 6)) volume_ratio = str(round(row["volume_ratio"], 6)) contents.append( f"交易周期: {row['bar']}, 滑动窗口: {row['window_size']} , 发生时间: {row['date_time']}" ) contents.append( f"开盘价: {open_price}, 最高价: {high}, 最低价: {low}, 收盘价: {close}" ) contents.append( f"成交量: {volume}, 成交量USDT: {volCCyQuote}, 交易量比率: {volume_ratio}" ) contents.append(f"价格分位: {price_position_text}") contents.append(f"--------------------------------") text = "\n\n".join(contents) # 获得text的字节数 text_length = len(text.encode("utf-8")) logger.info(f"发送巨量交易数据到微信,字节数: {text_length}") # with open(os.path.join(self.output_folder, "huge_volume_data.md"), "w", encoding="utf-8") as f: # f.write(text) wechat = Wechat() wechat.send_markdown(text) def batch_next_periods_rise_or_fall( self, start: str = None, end: str = None, next_periods: list = [1, 2, 3, 5, 10], output_excel: bool = False, ): if start is None: start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-01 00:00:00" ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") huge_volume_data_list = [] result_data_list = [] window_size_list = WINDOW_SIZE.get("window_sizes", None) if ( window_size_list is None or not isinstance(window_size_list, list) or len(window_size_list) == 0 ): window_size_list = [50, 80, 100, 120] for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: for window_size in window_size_list: huge_volume_data, result_data = self.next_periods_rise_or_fall( symbol, bar, window_size, start, end, next_periods ) huge_volume_data_list.append(huge_volume_data) result_data_list.append(result_data) total_huge_volume_data = pd.concat(huge_volume_data_list) total_result_data = pd.concat(result_data_list) if output_excel: total_huge_volume_data = total_huge_volume_data.reset_index(drop=True) total_result_data = total_result_data.reset_index(drop=True) current_date = datetime.now().strftime("%Y%m%d%H%M%S") file_name = f"next_periods_rise_or_fall_{current_date}.xlsx" try: with pd.ExcelWriter( os.path.join(self.output_folder, file_name) ) as writer: total_huge_volume_data.to_excel( 
writer, sheet_name="details", index=False ) total_result_data.to_excel( writer, sheet_name="next_periods_statistics", index=False ) except Exception as e: logger.error(f"导出Excel文件失败: {e}") return total_huge_volume_data, total_result_data def plot_huge_volume_data( self, data_file_path: str, sheet_name: str = "next_periods_statistics", output_folder: str = "./output/huge_volume_statistics/", ): os.makedirs(output_folder, exist_ok=True) huge_volume_data = pd.read_excel(data_file_path, sheet_name=sheet_name) huge_volume_chart = HugeVolumeChart(huge_volume_data) include_heatmap = True include_line = False huge_volume_chart.plot_entrance( include_heatmap=include_heatmap, include_line=include_line ) def batch_initial_detect_volume_spike(threshold: float = 2.0): window_sizes = WINDOW_SIZE.get("window_sizes", None) if ( window_sizes is None or not isinstance(window_sizes, list) or len(window_sizes) == 0 ): window_sizes = [50, 80, 100, 120] huge_volume_main = HugeVolumeMain(threshold) start_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-15 00:00:00" ) for window_size in window_sizes: huge_volume_main.batch_initial_detect_volume_spike( window_size=window_size, start=start_date, ) def batch_update_volume_spike(threshold: float = 2.0, is_us_stock: bool = False): window_sizes = WINDOW_SIZE.get("window_sizes", None) if ( window_sizes is None or not isinstance(window_sizes, list) or len(window_sizes) == 0 ): window_sizes = [50, 80, 100, 120] huge_volume_main = HugeVolumeMain(threshold, is_us_stock, is_binance=False) for window_size in window_sizes: huge_volume_main.batch_update_volume_spike(window_size=window_size) def batch_import_binance_data_by_csv(): huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True) root_path = "./data/binance/spot/" huge_volume_main.batch_import_binance_data_by_csv(root_path) def test_import_binance_data_by_csv(): huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True) file_path = "./data/binance/spot/2020-08-11/SOL-USDT_1h.csv" huge_volume_main.import_binance_data_by_csv( file_path, "SOL-USDT", "1H", [50, 80, 100, 120] ) def test_send_huge_volume_data_to_wechat(): huge_volume_main = HugeVolumeMain(threshold=2.0) # 获得昨天日期 yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") logger.info(f"昨天日期: {yesterday}") # 获得今天日期 today = datetime.now().strftime("%Y-%m-%d") logger.info(f"今天日期: {today}") huge_volume_main.send_huge_volume_data_to_wechat(start=yesterday, end=today) if __name__ == "__main__": test_import_binance_data_by_csv() # batch_import_binance_data_by_csv() # batch_update_volume_spike(threshold=2.0, is_us_stock=True) # test_send_huge_volume_data_to_wechat() # batch_initial_detect_volume_spike(threshold=2.0) # huge_volume_main = HugeVolumeMain(threshold=2.0) # huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True) # data_file_path = "./output/huge_volume_statistics/next_periods_rise_or_fall_stat_20250731200304.xlsx" # sheet_name = "next_periods_statistics" # output_folder = "./output/huge_volume_statistics/" # huge_volume_main.plot_huge_volume_data( # data_file_path=data_file_path, # sheet_name=sheet_name, # output_folder=output_folder, # )