From aee344b0db8c799f4060ca1733acda4deb5345dc Mon Sep 17 00:00:00 2001 From: blade <8019068@qq.com> Date: Thu, 4 Sep 2025 18:15:30 +0800 Subject: [PATCH] support import binance data --- config.py | 113 ++-- core/biz/market_data_from_csv.py | 0 core/db/db_binance_data.py | 525 +++++++++++++++ core/db/db_binance_huge_volume_data.py | 804 +++++++++++++++++++++++ core/utils.py | 9 +- huge_volume_main.py | 169 ++++- market_data_main.py | 317 ++++++--- sql/table/crypto_binance_data.sql | 63 ++ sql/table/crypto_binance_huge_volume.sql | 59 ++ 9 files changed, 1884 insertions(+), 175 deletions(-) delete mode 100644 core/biz/market_data_from_csv.py create mode 100644 core/db/db_binance_data.py create mode 100644 core/db/db_binance_huge_volume_data.py create mode 100644 sql/table/crypto_binance_data.sql create mode 100644 sql/table/crypto_binance_huge_volume.sql diff --git a/config.py b/config.py index 667c6cf..bfa2942 100644 --- a/config.py +++ b/config.py @@ -20,65 +20,94 @@ SANDBOX = False # 交易配置 TRADING_CONFIG = { - "symbol": "BTC-USDT", # 交易对 - "position_size": 0.001, # 每次交易数量(BTC) - + "symbol": "BTC-USDT", # 交易对 + "position_size": 0.001, # 每次交易数量(BTC) # 策略参数 - "sma_short_period": 5, # 短期移动平均线周期 - "sma_long_period": 20, # 长期移动平均线周期 - - "rsi_period": 14, # RSI计算周期 - "rsi_oversold": 30, # RSI超卖阈值 - "rsi_overbought": 70, # RSI超买阈值 - + "sma_short_period": 5, # 短期移动平均线周期 + "sma_long_period": 20, # 长期移动平均线周期 + "rsi_period": 14, # RSI计算周期 + "rsi_oversold": 30, # RSI超卖阈值 + "rsi_overbought": 70, # RSI超买阈值 # 网格交易参数 - "grid_levels": 5, # 网格数量 - "grid_range": 0.005, # 网格范围(0.5%) - + "grid_levels": 5, # 网格数量 + "grid_range": 0.005, # 网格范围(0.5%) # 风险控制 - "max_position": 0.01, # 最大持仓量(BTC) - "stop_loss_pct": 0.005, # 止损百分比(0.5%) - "take_profit_pct": 0.01, # 止盈百分比(1%) + "max_position": 0.01, # 最大持仓量(BTC) + "stop_loss_pct": 0.005, # 止损百分比(0.5%) + "take_profit_pct": 0.01, # 止盈百分比(1%) } # 时间间隔配置 TIME_CONFIG = { - "strategy_interval": 30, # 策略执行间隔(秒) - "kline_interval": "5m", # K线数据间隔 - "kline_limit": 100, # K线数据条数 + "strategy_interval": 30, # 策略执行间隔(秒) + "kline_interval": "5m", # K线数据间隔 + "kline_limit": 100, # K线数据条数 } OKX_MONITOR_CONFIG = { - "volume_monitor":{ - "symbols": ["XCH-USDT","SOL-USDT", - "BTC-USDT", "ETH-USDT", "DOGE-USDT"], + "volume_monitor": { + "symbols": [ + "XCH-USDT", + "BTC-USDT", + "SOL-USDT", + "ETH-USDT", + "DOGE-USDT", + ], "bars": ["5m", "15m", "30m", "1H"], - "initial_date": "2025-05-15 00:00:00" + "initial_date": "2025-05-15 00:00:00", }, - # "volume_monitor":{ - # "symbols": ["BTC-USDT"], - # "bars": ["5m"], - # "initial_date": "2025-05-01 00:00:00" - # }, - "price_monitor":{ + "price_monitor": { "symbols": ["XCH-USDT"], "bats": [ {"bar": "5m", "threshold": 0.025}, - {"bar": "15m", "threshold": 0.5}, - {"bar": "1H", "threshold": 0.1} - ] - } + {"bar": "15m", "threshold": 0.5}, + {"bar": "1H", "threshold": 0.1}, + ], + }, +} + +BINANCE_MONITOR_CONFIG = { + "volume_monitor": { + "symbols": [ + "BTC-USDT", + "ETH-USDT", + "SOL-USDT", + "DOGE-USDT", + "XRP-USDT", + "BNB-USDT", + ], + "bars": ["5m", "30m", "1H"], + "initial_date": "2017-08-16 00:00:00", + }, } US_STOCK_MONITOR_CONFIG = { - "volume_monitor":{ - "symbols": ["QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO"], + "volume_monitor": { + "symbols": [ + "QQQ", + "TQQQ", + "MSFT", + "AAPL", + "GOOG", + "NVDA", + "META", + "AMZN", + "AVGO", + "TSLA", + "PLTR", + "COIN", + "MSTR", + "HOOD", + "MARA", + "CVNA", + "XYZ", + ], "bars": ["5m", "15m", "30m", "1H"], - "initial_date": "2015-08-31 00:00:00" + "initial_date": "2014-11-30 00:00:00", } } -WINDOW_SIZE = {"window_sizes":[50, 80, 100, 120]} +WINDOW_SIZE = {"window_sizes": [50, 80, 100, 120]} BAR_THRESHOLD = { "5m": 1000 * 60 * 5, @@ -86,7 +115,7 @@ BAR_THRESHOLD = { "30m": 1000 * 60 * 30, "1H": 1000 * 60 * 60, "4H": 1000 * 60 * 60 * 4, - "1D": 1000 * 60 * 60 * 24 + "1D": 1000 * 60 * 60 * 24, } MYSQL_CONFIG = { @@ -94,11 +123,9 @@ MYSQL_CONFIG = { "port": 3306, "user": "xch", "password": "xch_okx_2025", - "database": "okx" + "database": "okx", } -WECHAT_CONFIG = { - "key": "11e6f7ac-efa9-418a-904c-9325a9f5d324" -} +WECHAT_CONFIG = {"key": "11e6f7ac-efa9-418a-904c-9325a9f5d324"} -ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2b" \ No newline at end of file +ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2b" diff --git a/core/biz/market_data_from_csv.py b/core/biz/market_data_from_csv.py deleted file mode 100644 index e69de29..0000000 diff --git a/core/db/db_binance_data.py b/core/db/db_binance_data.py new file mode 100644 index 0000000..c4ea848 --- /dev/null +++ b/core/db/db_binance_data.py @@ -0,0 +1,525 @@ +import pandas as pd +import core.logger as logging +from core.db.db_manager import DBData +from core.utils import transform_date_time_to_timestamp + +logger = logging.logger + + +class DBBinanceData: + def __init__( + self, + db_url: str + ): + self.db_url = db_url + self.table_name = "crypto_binance_data" + self.columns = [ + "symbol", + "bar", + "timestamp", + "date_time", + "date_time_us", + "open", + "high", + "low", + "close", + "pre_close", + "close_change", + "pct_chg", + "volume", + "volCcy", + "volCCyQuote", + "buy_sz", + "sell_sz", + # 技术指标字段 + "ma1", + "ma2", + "dif", + "dea", + "macd", + "macd_signal", + "macd_divergence", + "kdj_k", + "kdj_d", + "kdj_j", + "kdj_signal", + "kdj_pattern", + "sar", + "sar_signal", + "ma5", + "ma10", + "ma20", + "ma30", + "ma_cross", + "ma5_close_diff", + "ma10_close_diff", + "ma20_close_diff", + "ma30_close_diff", + "ma_close_avg", + "ma_long_short", + "ma_divergence", + "rsi_14", + "rsi_signal", + "boll_upper", + "boll_middle", + "boll_lower", + "boll_signal", + "boll_pattern", + "k_length", + "k_shape", + "k_up_down", + "create_time", + ] + self.db_manager = DBData(db_url, self.table_name, self.columns) + + def insert_data_to_mysql(self, df: pd.DataFrame): + """ + 将K线行情数据保存到MySQL的crypto_binance_data表 + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 适用场景:中小数据量(<10万条) + :param df: K线数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql(df) + + def insert_data_to_mysql_fast(self, df: pd.DataFrame): + """ + 快速插入K线行情数据(方案2:使用executemany批量插入) + 速度:⭐⭐⭐⭐ 很快 + 内存:⭐⭐⭐⭐⭐ 低 + 适用场景:中等数据量 + :param df: K线数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_fast(df) + + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): + """ + 分块插入K线行情数据(方案3:适合大数据量) + 速度:⭐⭐⭐ 中等 + 内存:⭐⭐⭐⭐⭐ 最低 + 适用场景:大数据量(>10万条) + :param df: K线数据DataFrame + :param chunk_size: 分块大小 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) + + def insert_data_to_mysql_simple(self, df: pd.DataFrame): + """ + 简单插入K线行情数据(方案4:直接使用to_sql,忽略重复) + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 注意:会抛出重复键错误,需要额外处理 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_simple(df) + + def query_latest_data(self, symbol: str, bar: str): + """ + 查询最新数据 + :param symbol: 交易对 + :param bar: K线周期 + """ + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"symbol": symbol, "bar": bar} + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_data_before_timestamp(self, symbol: str, bar: str, timestamp: int, limit: int = 100): + """ + 根据时间戳查询之前的数据 + :param symbol: 交易对 + :param bar: K线周期 + :param timestamp: 时间戳 + :param limit: 查询数量 + """ + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar AND timestamp < :timestamp + ORDER BY timestamp DESC + LIMIT :limit + """ + condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp, "limit": limit} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_data_by_technical_indicators( + self, + symbol: str, + bar: str, + start: str = None, + end: str = None, + macd_signal: str = None, + kdj_signal: str = None, + rsi_signal: str = None, + boll_signal: str = None, + ma_cross: str = None + ): + """ + 根据技术指标查询数据 + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :param macd_signal: MACD信号 + :param kdj_signal: KDJ信号 + :param rsi_signal: RSI信号 + :param boll_signal: 布林带信号 + :param ma_cross: 均线交叉信号 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + if macd_signal: + conditions.append("macd_signal = :macd_signal") + condition_dict["macd_signal"] = macd_signal + if kdj_signal: + conditions.append("kdj_signal = :kdj_signal") + condition_dict["kdj_signal"] = kdj_signal + if rsi_signal: + conditions.append("rsi_signal = :rsi_signal") + condition_dict["rsi_signal"] = rsi_signal + if boll_signal: + conditions.append("boll_signal = :boll_signal") + condition_dict["boll_signal"] = boll_signal + if ma_cross: + conditions.append("ma_cross = :ma_cross") + condition_dict["ma_cross"] = ma_cross + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_data + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_macd_signals( + self, + symbol: str, + bar: str, + signal: str = None, + start: str = None, + end: str = None + ): + """ + 查询MACD信号数据 + :param symbol: 交易对 + :param bar: K线周期 + :param signal: MACD信号类型 + :param start: 开始时间 + :param end: 结束时间 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + if signal: + conditions.append("macd_signal = :signal") + condition_dict["signal"] = signal + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_data + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_kdj_signals( + self, + symbol: str, + bar: str, + signal: str = None, + pattern: str = None, + start: str = None, + end: str = None + ): + """ + 查询KDJ信号数据 + :param symbol: 交易对 + :param bar: K线周期 + :param signal: KDJ信号类型 + :param pattern: KDJ模式 + :param start: 开始时间 + :param end: 结束时间 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + if signal: + conditions.append("kdj_signal = :signal") + condition_dict["signal"] = signal + if pattern: + conditions.append("kdj_pattern = :pattern") + condition_dict["pattern"] = pattern + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_data + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_ma_signals( + self, + symbol: str, + bar: str, + cross: str = None, + long_short: str = None, + divergence: str = None, + start: str = None, + end: str = None + ): + """ + 查询均线信号数据 + :param symbol: 交易对 + :param bar: K线周期 + :param cross: 均线交叉信号 + :param long_short: 均线多空 + :param divergence: 均线发散 + :param start: 开始时间 + :param end: 结束时间 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + if cross: + conditions.append("ma_cross = :cross") + condition_dict["cross"] = cross + if long_short: + conditions.append("ma_long_short = :long_short") + condition_dict["long_short"] = long_short + if divergence: + conditions.append("ma_divergence = :divergence") + condition_dict["divergence"] = divergence + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_data + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_bollinger_signals( + self, + symbol: str, + bar: str, + signal: str = None, + pattern: str = None, + start: str = None, + end: str = None + ): + """ + 查询布林带信号数据 + :param symbol: 交易对 + :param bar: K线周期 + :param signal: 布林带信号 + :param pattern: 布林带模式 + :param start: 开始时间 + :param end: 结束时间 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + if signal: + conditions.append("boll_signal = :signal") + condition_dict["signal"] = signal + if pattern: + conditions.append("boll_pattern = :pattern") + condition_dict["pattern"] = pattern + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_data + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_technical_statistics( + self, + symbol: str, + bar: str, + start: str = None, + end: str = None + ): + """ + 获取技术指标统计信息 + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + """ + conditions = ["symbol = :symbol", "bar = :bar"] + condition_dict = {"symbol": symbol, "bar": bar} + + # 处理时间范围 + if start: + start_timestamp = transform_date_time_to_timestamp(start) + if start_timestamp: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end: + end_timestamp = transform_date_time_to_timestamp(end) + if end_timestamp: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT + COUNT(*) as total_records, + COUNT(CASE WHEN macd_signal IS NOT NULL THEN 1 END) as macd_signal_count, + COUNT(CASE WHEN kdj_signal IS NOT NULL THEN 1 END) as kdj_signal_count, + COUNT(CASE WHEN rsi_signal IS NOT NULL THEN 1 END) as rsi_signal_count, + COUNT(CASE WHEN boll_signal IS NOT NULL THEN 1 END) as boll_signal_count, + COUNT(CASE WHEN ma_cross IS NOT NULL THEN 1 END) as ma_cross_count, + AVG(ma5_close_diff) as avg_ma5_close_diff, + AVG(ma10_close_diff) as avg_ma10_close_diff, + AVG(ma20_close_diff) as avg_ma20_close_diff, + AVG(ma30_close_diff) as avg_ma30_close_diff, + AVG(ma_close_avg) as avg_ma_close_avg, + AVG(rsi_14) as avg_rsi_14, + AVG(boll_upper) as avg_boll_upper, + AVG(boll_middle) as avg_boll_middle, + AVG(boll_lower) as avg_boll_lower + FROM crypto_binance_data + WHERE {where_clause} + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None): + """ + 根据交易对和K线周期查询数据 + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + """ + if start is None and end is None: + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar} + else: + if start is not None: + start = transform_date_time_to_timestamp(start) + if start is None: + logger.warning(f"开始时间格式错误: {start}") + return None + if end is not None: + end = transform_date_time_to_timestamp(end) + if end is None: + logger.warning(f"结束时间格式错误: {end}") + return None + if start is not None and end is not None: + if start > end: + start, end = end, start + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} + elif start is not None: + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start} + elif end is not None: + sql = """ + SELECT * FROM crypto_binance_data + WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "end": end} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) \ No newline at end of file diff --git a/core/db/db_binance_huge_volume_data.py b/core/db/db_binance_huge_volume_data.py new file mode 100644 index 0000000..3e8a72b --- /dev/null +++ b/core/db/db_binance_huge_volume_data.py @@ -0,0 +1,804 @@ +import pandas as pd +import core.logger as logging +from typing import Optional, List, Dict, Any, Union +from core.db.db_manager import DBData +from core.utils import transform_date_time_to_timestamp + +logger = logging.logger + +class DBBinanceHugeVolumeData: + def __init__( + self, + db_url: str + ): + self.db_url = db_url + self.table_name = "crypto_binance_huge_volume" + self.columns = [ + "symbol", + "bar", + "window_size", + "timestamp", + "date_time", + "open", + "high", + "low", + "close", + "volume", + "volCcy", + "volCCyQuote", + "volume_ma", + "volume_std", + "volume_threshold", + "huge_volume", + "volume_ratio", + "spike_intensity", + "close_80_percentile", + "close_20_percentile", + "close_80_high", + "close_20_low", + "close_90_percentile", + "close_10_percentile", + "close_90_high", + "close_10_low", + "high_80_percentile", + "high_20_percentile", + "high_80_high", + "high_20_low", + "high_90_percentile", + "high_10_percentile", + "high_90_high", + "high_10_low", + "low_80_percentile", + "low_20_percentile", + "low_80_high", + "low_20_low", + "low_90_percentile", + "low_10_percentile", + "low_90_high", + "low_10_low", + "create_time", + ] + self.db_manager = DBData(db_url, self.table_name, self.columns) + + def _process_time_parameter(self, time_param: Optional[Union[str, int]]) -> Optional[int]: + """ + 处理时间参数,统一转换为时间戳 + :param time_param: 时间参数(字符串或整数) + :return: 时间戳或None + """ + if time_param is None: + return None + time_param = transform_date_time_to_timestamp(time_param) + if time_param is None: + return None + return time_param + + def _build_query_conditions( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None, + additional_conditions: Optional[List[str]] = None + ) -> tuple[List[str], Dict[str, Any]]: + """ + 构建查询条件 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :param additional_conditions: 额外的查询条件 + :return: (条件列表, 参数字典) + """ + conditions = additional_conditions or [] + condition_dict = {} + + if symbol: + conditions.append("symbol = :symbol") + condition_dict["symbol"] = symbol + if bar: + conditions.append("bar = :bar") + condition_dict["bar"] = bar + if window_size: + conditions.append("window_size = :window_size") + condition_dict["window_size"] = window_size + # 处理时间参数 + start_timestamp = self._process_time_parameter(start) + end_timestamp = self._process_time_parameter(end) + + if start_timestamp is not None: + conditions.append("timestamp >= :start") + condition_dict["start"] = start_timestamp + if end_timestamp is not None: + conditions.append("timestamp <= :end") + condition_dict["end"] = end_timestamp + + return conditions, condition_dict + + def insert_data_to_mysql(self, df: pd.DataFrame) -> None: + """ + 将巨量交易数据保存到MySQL的crypto_binance_huge_volume表 + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 适用场景:中小数据量(<10万条) + :param df: 巨量交易数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql(df) + + def insert_data_to_mysql_fast(self, df: pd.DataFrame) -> None: + """ + 快速插入巨量交易数据(方案2:使用executemany批量插入) + 速度:⭐⭐⭐⭐ 很快 + 内存:⭐⭐⭐⭐⭐ 低 + 适用场景:中等数据量 + :param df: 巨量交易数据DataFrame + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_fast(df) + + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000) -> None: + """ + 分块插入巨量交易数据(方案3:适合大数据量) + 速度:⭐⭐⭐ 中等 + 内存:⭐⭐⭐⭐⭐ 最低 + 适用场景:大数据量(>10万条) + :param df: 巨量交易数据DataFrame + :param chunk_size: 分块大小 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) + + def insert_data_to_mysql_simple(self, df: pd.DataFrame) -> None: + """ + 简单插入巨量交易数据(方案4:直接使用to_sql,忽略重复) + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 注意:会抛出重复键错误,需要额外处理 + """ + if df is None or df.empty: + logger.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_simple(df) + + def query_latest_data(self, symbol: str, bar: str, window_size: int) -> Optional[Dict[str, Any]]: + """ + 查询最新巨量交易数据 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :return: 最新数据记录或None + """ + sql = """ + SELECT * FROM crypto_binance_huge_volume + WHERE symbol = :symbol AND bar = :bar AND window_size = :window_size + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"symbol": symbol, "bar": bar, "window_size": window_size} + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_data_by_symbol_bar_window_size_timestamp(self, symbol: str, bar: str, window_size: int, timestamp: int) -> Optional[Dict[str, Any]]: + """ + 根据交易对、K线周期, 窗口大小和时间戳查询巨量交易数据 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param timestamp: 时间戳 + :return: 数据记录或None + """ + sql = """ + SELECT * FROM crypto_binance_huge_volume + WHERE symbol = :symbol AND bar = :bar AND window_size = :window_size AND timestamp = :timestamp + """ + condition_dict = {"symbol": symbol, "bar": bar, "window_size": window_size, "timestamp": timestamp} + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_huge_volume_data_by_symbol_bar_window_size( + self, + symbol: str, + bar: str, + window_size: int, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 根据交易对、K线周期和窗口大小查询巨量交易数据 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 数据记录列表或None + """ + conditions, condition_dict = self._build_query_conditions(symbol, bar, window_size, start, end) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp ASC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_huge_volume_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询巨量交易记录(只返回huge_volume=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 巨量交易记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["huge_volume = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_volume_80_20_price_spike_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询80/20量价尖峰记录(只返回close_80_high=1或close_20_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 80/20量价尖峰记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["(close_80_high = 1 OR close_20_low = 1)"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_volume_90_10_price_spike_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询90/10量价尖峰记录(只返回close_90_high=1或close_10_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 90/10量价尖峰记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["(close_90_high = 1 OR close_10_low = 1)"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_close_80_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询价格达到80%分位数高点的记录(只返回close_80_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 价格80%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["close_80_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_close_20_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询价格达到20%分位数低点的记录(只返回close_20_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 价格20%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["close_20_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_close_90_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询价格达到90%分位数高点的记录(只返回close_90_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 价格90%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["close_90_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_close_10_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询价格达到10%分位数低点的记录(只返回close_10_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 价格10%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["close_10_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_high_80_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最高价达到80%分位数高点的记录(只返回high_80_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 最高价80%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["high_80_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_high_20_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最高价达到20%分位数低点的记录(只返回high_20_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最高价20%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["high_20_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_high_90_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最高价达到90%分位数高点的记录(只返回high_90_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最高价90%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["high_90_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_high_10_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最高价达到10%分位数低点的记录(只返回high_10_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最高价10%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["high_10_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_low_80_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最低价达到80%分位数高点的记录(只返回low_80_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 最低价80%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["low_80_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_low_20_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最低价达到20%分位数低点的记录(只返回low_20_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最低价20%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["low_20_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_low_90_high_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最低价达到90%分位数高点的记录(只返回low_90_high=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最低价90%分位数高点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["low_90_high = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def query_low_10_low_records( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + 查询最低价达到10%分位数低点的记录(只返回low_10_low=1的记录) + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + :return: 最低价10%分位数低点记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, start, end, additional_conditions=["low_10_low = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY timestamp DESC + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_statistics_summary( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[Dict[str, Any]]: + """ + 获取巨量交易统计摘要 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 统计摘要或None + """ + conditions, condition_dict = self._build_query_conditions(symbol, bar, window_size, start, end) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + sql = f""" + SELECT + COUNT(*) as total_records, + SUM(huge_volume) as huge_volume_count, + SUM(close_80_high) as close_80_high_count, + SUM(close_20_low) as close_20_low_count, + SUM(close_90_high) as close_90_high_count, + SUM(close_10_low) as close_10_low_count, + SUM(high_80_high) as high_80_high_count, + SUM(high_20_low) as high_20_low_count, + SUM(high_90_high) as high_90_high_count, + SUM(high_10_low) as high_10_low_count, + SUM(low_80_high) as low_80_high_count, + SUM(low_20_low) as low_20_low_count, + SUM(low_90_high) as low_90_high_count, + SUM(low_10_low) as low_10_low_count, + AVG(volume_ratio) as avg_volume_ratio, + MAX(volume_ratio) as max_volume_ratio, + AVG(spike_intensity) as avg_spike_intensity, + MAX(spike_intensity) as max_spike_intensity + FROM crypto_binance_huge_volume + WHERE {where_clause} + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def get_top_volume_spikes( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + limit: int = 10 + ) -> Optional[List[Dict[str, Any]]]: + """ + 获取成交量尖峰最高的记录 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param limit: 返回记录数量 + :return: 成交量尖峰记录列表或None + """ + conditions, condition_dict = self._build_query_conditions( + symbol, bar, window_size, additional_conditions=["huge_volume = 1"] + ) + + where_clause = " AND ".join(conditions) + sql = f""" + SELECT * FROM crypto_binance_huge_volume + WHERE {where_clause} + ORDER BY volume_ratio DESC + LIMIT :limit + """ + condition_dict["limit"] = limit + + return self.db_manager.query_data(sql, condition_dict, return_multi=True) + + def get_percentile_statistics( + self, + symbol: Optional[str] = None, + bar: Optional[str] = None, + window_size: Optional[int] = None, + start: Optional[Union[str, int]] = None, + end: Optional[Union[str, int]] = None + ) -> Optional[Dict[str, Any]]: + """ + 获取分位数统计信息 + :param symbol: 交易对 + :param bar: K线周期 + :param window_size: 窗口大小 + :param start: 开始时间 + :param end: 结束时间 + :return: 分位数统计信息或None + """ + conditions, condition_dict = self._build_query_conditions(symbol, bar, window_size, start, end) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + sql = f""" + SELECT + AVG(close_80_percentile) as avg_close_80_percentile, + AVG(close_20_percentile) as avg_close_20_percentile, + AVG(close_90_percentile) as avg_close_90_percentile, + AVG(close_10_percentile) as avg_close_10_percentile, + MAX(close_80_percentile) as max_close_80_percentile, + MIN(close_20_percentile) as min_close_20_percentile, + MAX(close_90_percentile) as max_close_90_percentile, + MIN(close_10_percentile) as min_close_10_percentile, + AVG(high_80_percentile) as avg_high_80_percentile, + AVG(high_20_percentile) as avg_high_20_percentile, + AVG(high_90_percentile) as avg_high_90_percentile, + AVG(high_10_percentile) as avg_high_10_percentile, + MAX(high_80_percentile) as max_high_80_percentile, + MIN(high_20_percentile) as min_high_20_percentile, + MAX(high_90_percentile) as max_high_90_percentile, + MIN(high_10_percentile) as min_high_10_percentile, + AVG(low_80_percentile) as avg_low_80_percentile, + AVG(low_20_percentile) as avg_low_20_percentile, + AVG(low_90_percentile) as avg_low_90_percentile, + AVG(low_10_percentile) as avg_low_10_percentile, + MAX(low_80_percentile) as max_low_80_percentile, + MIN(low_20_percentile) as min_low_20_percentile, + MAX(low_90_percentile) as max_low_90_percentile, + MIN(low_10_percentile) as min_low_10_percentile + FROM crypto_binance_huge_volume + WHERE {where_clause} + """ + + return self.db_manager.query_data(sql, condition_dict, return_multi=False) diff --git a/core/utils.py b/core/utils.py index 8aeecf4..b9bd101 100644 --- a/core/utils.py +++ b/core/utils.py @@ -5,13 +5,18 @@ import core.logger as logging logger = logging.logger -def datetime_to_timestamp(date_str: str) -> int: +def datetime_to_timestamp(date_str: str, is_utc: bool = False) -> int: """ 将日期字符串(如 '2023-01-01 12:00:00')直接转换为毫秒级时间戳 :param date_str: 形如 '2023-01-01 12:00:00' 的日期时间字符串 + :param is_utc: 是否为 UTC 时间;False 时按北京时间(UTC+8)解析 :return: 毫秒级时间戳 """ - dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + if is_utc: + dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc) + else: + # 默认按北京时间(UTC+8)解析 + dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone(timedelta(hours=8))) return int(dt.timestamp() * 1000) def timestamp_to_datetime(timestamp: int) -> str: diff --git a/huge_volume_main.py b/huge_volume_main.py index 789c58b..8ad15b7 100644 --- a/huge_volume_main.py +++ b/huge_volume_main.py @@ -2,11 +2,19 @@ from core.biz.huge_volume import HugeVolume from core.biz.huge_volume_chart import HugeVolumeChart from core.db.db_market_data import DBMarketData from core.db.db_huge_volume_data import DBHugeVolumeData +from core.db.db_binance_data import DBBinanceData +from core.db.db_binance_huge_volume_data import DBBinanceHugeVolumeData from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp from market_data_main import MarketDataMain from core.wechat import Wechat import core.logger as logging -from config import OKX_MONITOR_CONFIG, US_STOCK_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE +from config import ( + OKX_MONITOR_CONFIG, + US_STOCK_MONITOR_CONFIG, + MYSQL_CONFIG, + WINDOW_SIZE, + BINANCE_MONITOR_CONFIG, +) from datetime import datetime, timedelta import pandas as pd import os @@ -16,7 +24,12 @@ logger = logging.logger class HugeVolumeMain: - def __init__(self, threshold: float = 2.0, is_us_stock: bool = False): + def __init__( + self, + threshold: float = 2.0, + is_us_stock: bool = False, + is_binance: bool = False, + ): mysql_user = MYSQL_CONFIG.get("user", "xch") mysql_password = MYSQL_CONFIG.get("password", "") if not mysql_password: @@ -27,11 +40,18 @@ class HugeVolumeMain: self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" self.huge_volume = HugeVolume() - self.db_market_data = DBMarketData(self.db_url) - self.db_huge_volume_data = DBHugeVolumeData(self.db_url) - self.market_data_main = MarketDataMain(is_us_stock=is_us_stock) + if is_binance: + self.db_market_data = DBBinanceData(self.db_url) + self.db_huge_volume_data = DBBinanceHugeVolumeData(self.db_url) + else: + self.db_market_data = DBMarketData(self.db_url) + self.db_huge_volume_data = DBHugeVolumeData(self.db_url) + self.market_data_main = MarketDataMain( + is_us_stock=is_us_stock, is_binance=is_binance + ) self.threshold = threshold - + self.is_us_stock = is_us_stock + self.is_binance = is_binance self.output_folder = "./output/huge_volume_statistics/" os.makedirs(self.output_folder, exist_ok=True) @@ -67,7 +87,7 @@ class HugeVolumeMain: symbol: str = "XCH-USDT", bar: str = "5m", window_size: int = 50, - start: str = "2025-05-01 00:00:00", + start: str = None, end: str = None, only_output_huge_volume: bool = False, is_update: bool = False, @@ -78,9 +98,14 @@ class HugeVolumeMain: "initial_date", "2015-08-31 00:00:00" ) else: - start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( - "initial_date", "2025-05-01 00:00:00" - ) + if self.is_binance: + start = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2017-08-16 00:00:00" + ) + else: + start = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2025-05-01 00:00:00" + ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -116,17 +141,19 @@ class HugeVolumeMain: ) if data is not None: if is_update: - for index, row in data.iterrows(): - exist_huge_volume_data = self.db_huge_volume_data.query_data_by_symbol_bar_window_size_timestamp( - symbol, bar, window_size, row["timestamp"] - ) - if exist_huge_volume_data is not None: - # remove the exist_huge_volume_data from data - data = data[ - data["timestamp"] != exist_huge_volume_data["timestamp"] - ] + min_timestamp = int(data["timestamp"].min()) + max_timestamp = int(data["timestamp"].max()) + exist_data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size( + symbol, bar, window_size, min_timestamp, max_timestamp + ) + if exist_data is not None and len(exist_data) > 0: + exist_data = pd.DataFrame(exist_data) + data = data[~data["timestamp"].isin(exist_data["timestamp"])] + if data is not None and len(data) > 0: + data = data[self.db_huge_volume_data.columns] self.db_huge_volume_data.insert_data_to_mysql(data) + logger.info(f"此次处理巨量交易数据: {len(data)}条") else: logger.warning( f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}" @@ -138,11 +165,101 @@ class HugeVolumeMain: def batch_update_volume_spike(self, window_size: int = 50): for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: - self.update_volume_spike(symbol, bar, window_size) + try: + self.market_data_main.update_data(symbol, bar) + self.update_volume_spike(symbol, bar, window_size) + except Exception as e: + logger.error( + f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size}: {e}" + ) + + def batch_import_binance_data_by_csv(self, root_path: str): + """ + 批量从binance csv文件导入数据 + 文件名示例:BTC-USDT_5m.csv,BTC-USDT_30m.csv,BTC-USDT_1h.csv + :param root_path: 根路径 + """ + if root_path is None: + logger.error("root_path is None") + return + if not os.path.exists(root_path): + logger.error(f"root_path: {root_path} 不存在") + return + window_sizes = WINDOW_SIZE.get("window_sizes", None) + if ( + window_sizes is None + or not isinstance(window_sizes, list) + or len(window_sizes) == 0 + ): + window_sizes = [50, 80, 100, 120] + + folders = os.listdir(root_path) + logger.info(f"共有{len(folders)}个文件夹") + symbols = self.market_data_main.symbols + bars = self.market_data_main.bars + for folder in folders: + if not os.path.isdir(os.path.join(root_path, folder)): + continue + logger.info(f"开始处理文件夹: {folder}") + files = os.listdir(os.path.join(root_path, folder)) + for file in files: + if not os.path.isfile(os.path.join(root_path, folder, file)): + continue + if not file.endswith(".csv"): + continue + full_file_path = os.path.join(root_path, folder, file) + file_pure_name = file.split(".")[0] + symbol = file_pure_name.split("_")[0] + bar = file_pure_name.split("_")[1] + if bar == "1h": + bar = "1H" + if symbol not in symbols or bar not in bars: + continue + logger.info(f"开始处理文件: {file} {symbol} {bar}") + self.import_binance_data_by_csv( + full_file_path, symbol, bar, window_sizes + ) + + def import_binance_data_by_csv( + self, full_file_path: str, symbol: str, bar: str, window_sizes: list + ): + """ + 从binance csv文件导入数据 + :param full_file_path: 文件路径 + :param symbol: 虚拟货币名称 + :param bar: 时间周期 + """ + if full_file_path is None or symbol is None or bar is None: + logger.error("信息不完整") + return + if not os.path.exists(full_file_path): + logger.error(f"文件不存在: {full_file_path}") + return + df = pd.read_csv(full_file_path, encoding="GBK") + if df is None or len(df) == 0: + logger.error(f"文件为空: {full_file_path}") + return + columns = list(df) + if "邢不行" in columns[0] or "Unnamed" in columns[1]: + # 将第一行作为列名 + df.columns = df.iloc[0] + df = df.iloc[1:] + df.reset_index(drop=True, inplace=True) + + df = self.market_data_main.adjust_binance_csv_data(symbol, bar, df) + df = self.market_data_main.post_save_data(df) + min_start_time_ts = int(df["timestamp"].min()) + max_start_time_ts = int(df["timestamp"].max()) + df = self.market_data_main.post_calculate_metrics( + symbol, bar, min_start_time_ts, max_start_time_ts + ) + df = df.sort_values(by="timestamp", ascending=True) + df = df.reset_index(drop=True) + for window_size in window_sizes: + self.update_volume_spike(symbol, bar, window_size) def update_volume_spike(self, symbol: str, bar: str, window_size: int = 50): try: - self.market_data_main.update_data(symbol, bar) latest_huge_volume_data = self.db_huge_volume_data.query_latest_data( symbol, bar, window_size ) @@ -498,6 +615,12 @@ def batch_update_volume_spike(threshold: float = 2.0, is_us_stock: bool = False) huge_volume_main.batch_update_volume_spike(window_size=window_size) +def batch_import_binance_data_by_csv(): + huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True) + root_path = "./data/binance/spot/" + huge_volume_main.batch_import_binance_data_by_csv(root_path) + + def test_send_huge_volume_data_to_wechat(): huge_volume_main = HugeVolumeMain(threshold=2.0) # 获得昨天日期 @@ -510,9 +633,11 @@ def test_send_huge_volume_data_to_wechat(): if __name__ == "__main__": + batch_import_binance_data_by_csv() + # batch_update_volume_spike(threshold=2.0, is_us_stock=False) # test_send_huge_volume_data_to_wechat() # batch_initial_detect_volume_spike(threshold=2.0) - batch_update_volume_spike(threshold=2.0, is_us_stock=False) + # huge_volume_main = HugeVolumeMain(threshold=2.0) # huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True) # data_file_path = "./output/huge_volume_statistics/next_periods_rise_or_fall_stat_20250731200304.xlsx" diff --git a/market_data_main.py b/market_data_main.py index f485827..00c8543 100644 --- a/market_data_main.py +++ b/market_data_main.py @@ -4,6 +4,7 @@ from time import sleep import pandas as pd from core.biz.market_data import MarketData from core.db.db_market_data import DBMarketData +from core.db.db_binance_data import DBBinanceData from core.biz.metrics_calculation import MetricsCalculation from core.utils import ( datetime_to_timestamp, @@ -17,6 +18,7 @@ from config import ( PASSPHRASE, SANDBOX, OKX_MONITOR_CONFIG, + BINANCE_MONITOR_CONFIG, US_STOCK_MONITOR_CONFIG, MYSQL_CONFIG, BAR_THRESHOLD, @@ -26,7 +28,7 @@ logger = logging.logger class MarketDataMain: - def __init__(self, is_us_stock: bool = False): + def __init__(self, is_us_stock: bool = False, is_binance: bool = False): self.market_data = MarketData( api_key=API_KEY, secret_key=SECRET_KEY, @@ -44,6 +46,16 @@ class MarketDataMain: self.initial_date = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2015-08-30 00:00:00" ) + elif is_binance: + self.symbols = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["BTC-USDT"] + ) + self.bars = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get( + "bars", ["5m", "30m", "1H"] + ) + self.initial_date = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2017-08-17 00:00:00" + ) else: self.symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "symbols", ["XCH-USDT"] @@ -63,7 +75,11 @@ class MarketDataMain: mysql_database = MYSQL_CONFIG.get("database", "okx") self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" - self.db_market_data = DBMarketData(self.db_url) + if is_binance: + self.db_market_data = DBBinanceData(self.db_url) + else: + self.db_market_data = DBMarketData(self.db_url) + self.is_binance = is_binance self.trade_data_main = TradeDataMain() self.is_us_stock = is_us_stock @@ -128,6 +144,7 @@ class MarketDataMain: get_data = False min_start_time_ts = start_time_ts + max_start_time_ts = None while start_time_ts < end_time_ts: current_start_time_ts = int(end_time_ts - threshold) if current_start_time_ts < start_time_ts: @@ -149,133 +166,215 @@ class MarketDataMain: limit=limit, ) if data is not None and len(data) > 0: - data["buy_sz"] = -1 - data["sell_sz"] = -1 - - if data is not None and len(data) > 0: - data = data[ - [ - "symbol", - "bar", - "timestamp", - "date_time", - "date_time_us", - "open", - "high", - "low", - "close", - "volume", - "volCcy", - "volCCyQuote", - "buy_sz", - "sell_sz", - "create_time", - ] - ] - data = self.add_new_columns(data) - self.db_market_data.insert_data_to_mysql(data) + data = self.post_save_data(data) current_min_start_time_ts = int(data["timestamp"].min()) if current_min_start_time_ts < min_start_time_ts: min_start_time_ts = current_min_start_time_ts - + current_max_start_time_ts = int(data["timestamp"].max()) + if max_start_time_ts is None: + max_start_time_ts = current_max_start_time_ts + else: + if current_max_start_time_ts > max_start_time_ts: + max_start_time_ts = current_max_start_time_ts get_data = True else: - logger.warning(f"获取行情数据为空: {symbol} {bar} 从 {start_date_time} 到 {end_date_time}") + logger.warning( + f"获取行情数据为空: {symbol} {bar} 从 {start_date_time} 到 {end_date_time}" + ) break if current_start_time_ts == start_time_ts: break - + if current_min_start_time_ts < current_start_time_ts: end_time_ts = current_min_start_time_ts else: end_time_ts = current_start_time_ts - if min_start_time_ts is not None and get_data: + if get_data: # 补充技术指标数据 # 获得min_start_time_ts之前30条数据 logger.info(f"开始补充技术指标数据: {symbol} {bar}") - before_data = self.db_market_data.query_data_before_timestamp( - symbol, bar, min_start_time_ts, 30 + data = self.post_calculate_metrics( + symbol, bar, min_start_time_ts, max_start_time_ts ) - latest_before_timestamp = None - if before_data is not None and len(before_data) > 0: - earliest_timestamp = before_data[-1]["timestamp"] - latest_before_timestamp = before_data[0]["timestamp"] - else: - earliest_timestamp = min_start_time_ts - handle_data = self.db_market_data.query_market_data_by_symbol_bar( - symbol=symbol, bar=bar, start=earliest_timestamp, end=None - ) - if handle_data is not None: - if before_data is not None and len(handle_data) <= len(before_data): - logger.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}") - return None - if isinstance(handle_data, list): - handle_data = pd.DataFrame(handle_data) - elif isinstance(handle_data, dict): - handle_data = pd.DataFrame([handle_data]) - elif isinstance(handle_data, pd.DataFrame): - pass - else: - logger.error(f"handle_data类型错误: {type(handle_data)}") - return None - - handle_data = self.calculate_metrics(handle_data) - if latest_before_timestamp is not None: - handle_data = handle_data[handle_data["timestamp"] > latest_before_timestamp] - handle_data.reset_index(drop=True, inplace=True) - logger.info(f"开始保存技术指标数据: {symbol} {bar}") - self.db_market_data.insert_data_to_mysql(handle_data) return data + def adjust_binance_csv_data(self, symbol: str, bar: str, data: pd.DataFrame): + """ + 调整binance csv数据 + """ + data["symbol"] = symbol + data["bar"] = bar + data["timestamp"] = None + data["date_time"] = None + data["date_time_us"] = None + data["volCcy"] = None + data["volCCyQuote"] = None + data["create_time"] = None + + for index, row in data.iterrows(): + candle_begin_time = row["candle_begin_time"] + timestamp = datetime_to_timestamp(candle_begin_time, is_utc=True) + data.loc[index, "timestamp"] = timestamp + data.loc[index, "volCcy"] = row["quote_volume"] + data.loc[index, "volCCyQuote"] = row["quote_volume"] + data["timestamp"] = data["timestamp"].astype(int) + dt_series = pd.to_datetime(data['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('Asia/Shanghai') + data['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S') + dt_us_series = pd.to_datetime(data['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('America/New_York') + data['date_time_us'] = dt_us_series.dt.strftime('%Y-%m-%d %H:%M:%S') + data['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + data["date_time"] = data["date_time"].astype(str) + data["date_time_us"] = data["date_time_us"].astype(str) + data["open"] = data["open"].astype(float) + data["high"] = data["high"].astype(float) + data["low"] = data["low"].astype(float) + data["close"] = data["close"].astype(float) + data["volume"] = data["volume"].astype(float) + data["volCcy"] = data["volCcy"].astype(float) + data["volCCyQuote"] = data["volCCyQuote"].astype(float) + data["create_time"] = data["create_time"].astype(str) + data = data[ + [ + "symbol", + "bar", + "timestamp", + "date_time", + "date_time_us", + "open", + "high", + "low", + "close", + "volume", + "volCcy", + "volCCyQuote", + "create_time", + ] + ] + data = data.sort_values(by="timestamp", ascending=True) + data = data.reset_index(drop=True) + return data + + def post_save_data(self, data: pd.DataFrame): + if data is not None and len(data) > 0: + data["buy_sz"] = -1 + data["sell_sz"] = -1 + + data = data[ + [ + "symbol", + "bar", + "timestamp", + "date_time", + "date_time_us", + "open", + "high", + "low", + "close", + "volume", + "volCcy", + "volCCyQuote", + "buy_sz", + "sell_sz", + "create_time", + ] + ] + data = self.add_new_columns(data) + self.db_market_data.insert_data_to_mysql(data) + return data + + def post_calculate_metrics( + self, symbol: str, bar: str, min_start_time_ts: int, max_start_time_ts: int + ): + logger.info(f"开始补充技术指标数据: {symbol} {bar}") + before_data = self.db_market_data.query_data_before_timestamp( + symbol, bar, min_start_time_ts, 31 + ) + if before_data is not None and len(before_data) > 0: + earliest_timestamp = before_data[-1]["timestamp"] + else: + earliest_timestamp = min_start_time_ts + handle_data = self.db_market_data.query_market_data_by_symbol_bar( + symbol=symbol, bar=bar, start=earliest_timestamp, end=max_start_time_ts + ) + if handle_data is not None: + if before_data is not None and len(handle_data) <= len(before_data): + logger.error( + f"handle_data数据条数小于before_data数据条数: {symbol} {bar}" + ) + return None + if isinstance(handle_data, list): + handle_data = pd.DataFrame(handle_data) + elif isinstance(handle_data, dict): + handle_data = pd.DataFrame([handle_data]) + elif isinstance(handle_data, pd.DataFrame): + pass + else: + logger.error(f"handle_data类型错误: {type(handle_data)}") + return None + + handle_data = self.calculate_metrics(handle_data) + handle_data = handle_data[handle_data["timestamp"] >= min_start_time_ts] + handle_data.reset_index(drop=True, inplace=True) + logger.info(f"开始保存技术指标数据: {symbol} {bar}") + self.db_market_data.insert_data_to_mysql(handle_data) + return handle_data + def add_new_columns(self, data: pd.DataFrame): """ 添加新列 """ + data = data.copy() columns = data.columns.tolist() if "buy_sz" not in columns: - data["buy_sz"] = -1 + data.loc[:, "buy_sz"] = -1 if "sell_sz" not in columns: - data["sell_sz"] = -1 - data["pre_close"] = None - data["close_change"] = None - data["pct_chg"] = None - data["ma1"] = None - data["ma2"] = None - data["dif"] = None - data["dea"] = None - data["macd"] = None - data["macd_signal"] = None - data["macd_divergence"] = None - data["kdj_k"] = None - data["kdj_d"] = None - data["kdj_j"] = None - data["kdj_signal"] = None - data["kdj_pattern"] = None - data["sar"] = None - data["sar_signal"] = None - data["ma5"] = None - data["ma10"] = None - data["ma20"] = None - data["ma30"] = None - data["ma_cross"] = None - data["ma5_close_diff"] = None - data["ma10_close_diff"] = None - data["ma20_close_diff"] = None - data["ma30_close_diff"] = None - data["ma_close_avg"] = None - data["ma_long_short"] = None - data["ma_divergence"] = None - data["rsi_14"] = None - data["rsi_signal"] = None - data["boll_upper"] = None - data["boll_middle"] = None - data["boll_lower"] = None - data["boll_signal"] = None - data["boll_pattern"] = None - data["k_length"] = None - data["k_shape"] = None - data["k_up_down"] = None + data.loc[:, "sell_sz"] = -1 + + new_cols = [ + "pre_close", + "close_change", + "pct_chg", + "ma1", + "ma2", + "dif", + "dea", + "macd", + "macd_signal", + "macd_divergence", + "kdj_k", + "kdj_d", + "kdj_j", + "kdj_signal", + "kdj_pattern", + "sar", + "sar_signal", + "ma5", + "ma10", + "ma20", + "ma30", + "ma_cross", + "ma5_close_diff", + "ma10_close_diff", + "ma20_close_diff", + "ma30_close_diff", + "ma_close_avg", + "ma_long_short", + "ma_divergence", + "rsi_14", + "rsi_signal", + "boll_upper", + "boll_middle", + "boll_lower", + "boll_signal", + "boll_pattern", + "k_length", + "k_shape", + "k_up_down", + ] + for col in new_cols: + data.loc[:, col] = pd.NA return data def calculate_metrics(self, data: pd.DataFrame): @@ -382,7 +481,7 @@ class MarketDataMain: return data = self.fetch_save_data(symbol, bar, latest_timestamp + 1) return data - + def batch_calculate_metrics(self): """ 批量计算技术指标 @@ -398,14 +497,17 @@ class MarketDataMain: for bar in self.bars: logger.info(f"开始计算技术指标: {symbol} {bar}") data = self.db_market_data.query_market_data_by_symbol_bar( - symbol=symbol, bar=bar, start=start_timestamp - 1, end=current_timestamp + symbol=symbol, + bar=bar, + start=start_timestamp - 1, + end=current_timestamp, ) if data is not None and len(data) > 0: data = pd.DataFrame(data) data = self.calculate_metrics(data) logger.info(f"开始保存技术指标数据: {symbol} {bar}") self.db_market_data.insert_data_to_mysql(data) - + def batch_ma_break_statistics(self): """ 批量计算MA突破统计 @@ -413,7 +515,6 @@ class MarketDataMain: logger.info("开始批量计算MA突破统计") self.ma_break_statistics.batch_statistics(all_change=False) self.ma_break_statistics.batch_statistics(all_change=True) - if __name__ == "__main__": @@ -421,4 +522,4 @@ if __name__ == "__main__": # market_data_main.batch_update_data() # market_data_main.initial_data() market_data_main.batch_calculate_metrics() - # market_data_main.batch_ma_break_statistics() \ No newline at end of file + # market_data_main.batch_ma_break_statistics() diff --git a/sql/table/crypto_binance_data.sql b/sql/table/crypto_binance_data.sql new file mode 100644 index 0000000..9f0e42c --- /dev/null +++ b/sql/table/crypto_binance_data.sql @@ -0,0 +1,63 @@ +-- 临时禁用安全模式 +SET SQL_SAFE_UPDATES = 0; + +CREATE TABLE IF NOT EXISTS crypto_binance_data ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + symbol VARCHAR(50) NOT NULL, + bar VARCHAR(20) NOT NULL, + timestamp BIGINT NOT NULL, + date_time VARCHAR(50) NOT NULL, + date_time_us VARCHAR(50) NULL COMMENT '美国时间格式的日期时间', + open DECIMAL(20,10) NOT NULL, + high DECIMAL(20,10) NOT NULL, + low DECIMAL(20,10) NOT NULL, + close DECIMAL(20,10) NOT NULL, + pre_close DECIMAL(20,10) NULL, + close_change DECIMAL(20,10) NULL, + pct_chg DECIMAL(20,10) NULL, + volume DECIMAL(30,10) NOT NULL, + volCcy DECIMAL(30,10) NOT NULL, + volCCyQuote DECIMAL(30,10) NOT NULL, + buy_sz DECIMAL(20, 10) NOT NULL, + sell_sz DECIMAL(20, 10) NOT NULL, + -- 技术指标字段 + ma1 DOUBLE DEFAULT NULL COMMENT '移动平均线1', + ma2 DOUBLE DEFAULT NULL COMMENT '移动平均线2', + dif DOUBLE DEFAULT NULL COMMENT 'MACD指标DIF线', + dea DOUBLE DEFAULT NULL COMMENT 'MACD指标DEA线', + macd DOUBLE DEFAULT NULL COMMENT 'MACD指标值', + macd_signal VARCHAR(15) DEFAULT NULL COMMENT 'MACD金叉死叉信号', + macd_divergence varchar(25) DEFAULT NULL COMMENT 'MACD背离,顶背离或底背离', + kdj_k DOUBLE DEFAULT NULL COMMENT 'KDJ指标K值', + kdj_d DOUBLE DEFAULT NULL COMMENT 'KDJ指标D值', + kdj_j DOUBLE DEFAULT NULL COMMENT 'KDJ指标J值', + kdj_signal VARCHAR(15) DEFAULT NULL COMMENT 'KDJ金叉死叉信号', + kdj_pattern varchar(25) DEFAULT NULL COMMENT 'KDJ超买,超卖,徘徊', + sar DOUBLE DEFAULT NULL COMMENT 'SAR指标值', + sar_signal VARCHAR(15) DEFAULT NULL COMMENT 'SAR多头空头信号', + ma5 DOUBLE DEFAULT NULL COMMENT '5移动平均线', + ma10 DOUBLE DEFAULT NULL COMMENT '10移动平均线', + ma20 DOUBLE DEFAULT NULL COMMENT '20移动平均线', + ma30 DOUBLE DEFAULT NULL COMMENT '30移动平均线', + ma_cross VARCHAR(150) DEFAULT NULL COMMENT '均线交叉信号', + ma5_close_diff double DEFAULT NULL COMMENT '5移动平均线与收盘价差值', + ma10_close_diff double DEFAULT NULL COMMENT '10移动平均线与收盘价差值', + ma20_close_diff double DEFAULT NULL COMMENT '20移动平均线与收盘价差值', + ma30_close_diff double DEFAULT NULL COMMENT '30移动平均线与收盘价差值', + ma_close_avg double DEFAULT NULL COMMENT '收盘价移动平均值', + ma_long_short varchar(25) DEFAULT NULL COMMENT '均线多空', + ma_divergence varchar(25) DEFAULT NULL COMMENT '均线发散,均线粘合,均线适中,均线发散,均线超发散', + rsi_14 DOUBLE DEFAULT NULL COMMENT '14RSI指标', + rsi_signal VARCHAR(15) DEFAULT NULL COMMENT 'RSI强弱信号', + boll_upper DOUBLE DEFAULT NULL COMMENT '布林带上轨', + boll_middle DOUBLE DEFAULT NULL COMMENT '布林带中轨', + boll_lower DOUBLE DEFAULT NULL COMMENT '布林带下轨', + boll_signal VARCHAR(15) DEFAULT NULL COMMENT '布林带强弱信号', + boll_pattern varchar(25) DEFAULT NULL COMMENT 'BOLL超买,超卖,震荡', + k_length varchar(25) DEFAULT NULL COMMENT 'K线长度', + k_shape varchar(25) DEFAULT NULL COMMENT 'K线形状', + k_up_down varchar(25) DEFAULT NULL COMMENT 'K线方向', + create_time VARCHAR(50) NOT NULL, + UNIQUE KEY uniq_symbol_bar_timestamp (symbol, bar, timestamp) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + diff --git a/sql/table/crypto_binance_huge_volume.sql b/sql/table/crypto_binance_huge_volume.sql new file mode 100644 index 0000000..018e3c7 --- /dev/null +++ b/sql/table/crypto_binance_huge_volume.sql @@ -0,0 +1,59 @@ +CREATE TABLE IF NOT EXISTS crypto_binance_huge_volume ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + symbol VARCHAR(50) NOT NULL COMMENT '交易对', + bar VARCHAR(20) NOT NULL COMMENT 'K线周期', + window_size INT NOT NULL COMMENT '窗口大小, 50, 80, 100, 120', + timestamp BIGINT NOT NULL COMMENT '时间戳', + date_time VARCHAR(50) NOT NULL COMMENT '日期时间', + open DECIMAL(20,10) NOT NULL COMMENT '开盘价', + high DECIMAL(20,10) NOT NULL COMMENT '最高价', + low DECIMAL(20,10) NOT NULL COMMENT '最低价', + close DECIMAL(20,10) NOT NULL COMMENT '收盘价', + volume DECIMAL(30,10) NOT NULL COMMENT '交易量', + volCcy DECIMAL(30,10) NOT NULL COMMENT '交易量(基础货币)', + volCCyQuote DECIMAL(30,10) NOT NULL COMMENT '交易量(计价货币)', + volume_ma DECIMAL(30,10) NULL COMMENT '交易量移动平均', + volume_std DECIMAL(30,10) NULL COMMENT '交易量标准差', + volume_threshold DECIMAL(30,10) NULL COMMENT '交易量阈值', + huge_volume TINYINT NOT NULL DEFAULT 0 COMMENT '是否为巨量(0:否,1:是)', + volume_ratio DECIMAL(20,10) NULL COMMENT '交易量比率', + spike_intensity DECIMAL(20,10) NULL COMMENT '尖峰强度', + close_80_percentile DECIMAL(20,10) NOT NULL COMMENT '收盘价80%分位数', + close_20_percentile DECIMAL(20,10) NOT NULL COMMENT '收盘价20%分位数', + close_80_high TINYINT NOT NULL DEFAULT 0 COMMENT '收盘价是否达到80%分位数高点(0:否,1:是)', + close_20_low TINYINT NOT NULL DEFAULT 0 COMMENT '收盘价是否达到20%分位数低点(0:否,1:是)', + close_90_percentile DECIMAL(20,10) NOT NULL COMMENT '收盘价90%分位数', + close_10_percentile DECIMAL(20,10) NOT NULL COMMENT '收盘价10%分位数', + close_90_high TINYINT NOT NULL DEFAULT 0 COMMENT '收盘价是否达到90%分位数高点(0:否,1:是)', + close_10_low TINYINT NOT NULL DEFAULT 0 COMMENT '收盘价是否达到10%分位数低点(0:否,1:是)', + high_80_percentile DECIMAL(20,10) NOT NULL COMMENT '最高价80%分位数', + high_20_percentile DECIMAL(20,10) NOT NULL COMMENT '最高价20%分位数', + high_80_high TINYINT NOT NULL DEFAULT 0 COMMENT '最高价是否达到80%分位数高点(0:否,1:是)', + high_20_low TINYINT NOT NULL DEFAULT 0 COMMENT '最高价是否达到20%分位数低点(0:否,1:是)', + high_90_percentile DECIMAL(20,10) NOT NULL COMMENT '最高价90%分位数', + high_10_percentile DECIMAL(20,10) NOT NULL COMMENT '最高价10%分位数', + high_90_high TINYINT NOT NULL DEFAULT 0 COMMENT '最高价是否达到90%分位数高点(0:否,1:是)', + high_10_low TINYINT NOT NULL DEFAULT 0 COMMENT '最高价是否达到10%分位数低点(0:否,1:是)', + low_80_percentile DECIMAL(20,10) NOT NULL COMMENT '最低价80%分位数', + low_20_percentile DECIMAL(20,10) NOT NULL COMMENT '最低价20%分位数', + low_80_high TINYINT NOT NULL DEFAULT 0 COMMENT '最低价是否达到80%分位数高点(0:否,1:是)', + low_20_low TINYINT NOT NULL DEFAULT 0 COMMENT '最低价是否达到20%分位数低点(0:否,1:是)', + low_90_percentile DECIMAL(20,10) NOT NULL COMMENT '最低价90%分位数', + low_10_percentile DECIMAL(20,10) NOT NULL COMMENT '最低价10%分位数', + low_90_high TINYINT NOT NULL DEFAULT 0 COMMENT '最低价是否达到90%分位数高点(0:否,1:是)', + low_10_low TINYINT NOT NULL DEFAULT 0 COMMENT '最低价是否达到10%分位数低点(0:否,1:是)', + create_time VARCHAR(50) NOT NULL COMMENT '创建时间', + UNIQUE KEY uniq_symbol_bar_window_size_timestamp (symbol, bar, window_size, timestamp), + INDEX idx_symbol_bar_window_size (symbol, bar, window_size), + INDEX idx_timestamp (timestamp), + INDEX idx_huge_volume (huge_volume), + INDEX idx_date_time (date_time) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='加密货币巨量交易数据表'; + +-- 添加注释说明 +-- 该表用于存储加密货币市场K线数据以及相关的巨量交易分析指标 +-- 主要功能: +-- 1. 存储基础K线数据(价格、成交量等) +-- 2. 计算并存储巨量交易相关指标 +-- 3. 识别价格和成交量的异常波动 +-- 4. 为交易策略提供数据支持