diff --git a/core/db/__pycache__/db_manager.cpython-312.pyc b/core/db/__pycache__/db_manager.cpython-312.pyc index 1dabfb7..57b95f5 100644 Binary files a/core/db/__pycache__/db_manager.cpython-312.pyc and b/core/db/__pycache__/db_manager.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_market_data.cpython-312.pyc b/core/db/__pycache__/db_market_data.cpython-312.pyc index 07c0daa..8d27039 100644 Binary files a/core/db/__pycache__/db_market_data.cpython-312.pyc and b/core/db/__pycache__/db_market_data.cpython-312.pyc differ diff --git a/core/db/db_binance_data.py b/core/db/db_binance_data.py index c4ea848..cf967c2 100644 --- a/core/db/db_binance_data.py +++ b/core/db/db_binance_data.py @@ -7,68 +7,65 @@ logger = logging.logger class DBBinanceData: - def __init__( - self, - db_url: str - ): + def __init__(self, db_url: str): self.db_url = db_url self.table_name = "crypto_binance_data" self.columns = [ - "symbol", - "bar", - "timestamp", - "date_time", - "date_time_us", - "open", - "high", - "low", - "close", - "pre_close", - "close_change", - "pct_chg", - "volume", - "volCcy", - "volCCyQuote", - "buy_sz", - "sell_sz", - # 技术指标字段 - "ma1", - "ma2", - "dif", - "dea", - "macd", - "macd_signal", - "macd_divergence", - "kdj_k", - "kdj_d", - "kdj_j", - "kdj_signal", - "kdj_pattern", - "sar", - "sar_signal", - "ma5", - "ma10", - "ma20", - "ma30", - "ma_cross", - "ma5_close_diff", - "ma10_close_diff", - "ma20_close_diff", - "ma30_close_diff", - "ma_close_avg", - "ma_long_short", - "ma_divergence", - "rsi_14", - "rsi_signal", - "boll_upper", - "boll_middle", - "boll_lower", - "boll_signal", - "boll_pattern", - "k_length", - "k_shape", - "k_up_down", - "create_time", + "symbol", + "bar", + "timestamp", + "date_time", + "date_time_us", + "open", + "high", + "low", + "close", + "pre_close", + "close_change", + "pct_chg", + "volume", + "volCcy", + "volCCyQuote", + "buy_sz", + "sell_sz", + # 技术指标字段 + "ma1", + "ma2", + "dif", + "dea", + "macd", + "macd_signal", + "macd_divergence", + "kdj_k", + "kdj_d", + "kdj_j", + "kdj_signal", + "kdj_pattern", + "sar", + "sar_signal", + "ma5", + "ma10", + "ma20", + "ma30", + "ma_cross", + "ma5_close_diff", + "ma10_close_diff", + "ma20_close_diff", + "ma30_close_diff", + "ma_close_avg", + "ma_long_short", + "ma_divergence", + "rsi_14", + "rsi_signal", + "boll_upper", + "boll_middle", + "boll_lower", + "boll_signal", + "boll_pattern", + "k_length", + "k_shape", + "k_up_down", + "create_time", ] self.db_manager = DBData(db_url, self.table_name, self.columns) @@ -85,7 +82,7 @@ class DBBinanceData: return self.db_manager.insert_data_to_mysql(df) - + def insert_data_to_mysql_fast(self, df: pd.DataFrame): """ 快速插入K线行情数据(方案2:使用executemany批量插入) @@ -97,9 +94,9 @@ class DBBinanceData: if df is None or df.empty: logger.warning("DataFrame为空,无需写入数据库。") return - + self.db_manager.insert_data_to_mysql_fast(df) - + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): """ 分块插入K线行情数据(方案3:适合大数据量) @@ -112,9 +109,9 @@ class DBBinanceData: if df is None or df.empty: logger.warning("DataFrame为空,无需写入数据库。") return - + self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) - + def insert_data_to_mysql_simple(self, df: pd.DataFrame): """ 简单插入K线行情数据(方案4:直接使用to_sql,忽略重复) @@ -125,9 +122,9 @@ class DBBinanceData: if df is None or df.empty: logger.warning("DataFrame为空,无需写入数据库。") return - + self.db_manager.insert_data_to_mysql_simple(df) - + def query_latest_data(self, symbol: str, bar: str): """ 查询最新数据 @@ -142,8 +139,10 @@ class DBBinanceData: """ condition_dict = {"symbol": symbol, "bar": bar} return self.db_manager.query_data(sql, condition_dict, return_multi=False) - - def query_data_before_timestamp(self, symbol: str, bar: str, timestamp: int, limit: int = 100): + + def query_data_before_timestamp( + self, symbol: str, bar: str, timestamp: int, limit: int = 100 + ): """ 根据时间戳查询之前的数据 :param symbol: 交易对 @@ -157,20 +156,25 @@ class DBBinanceData: ORDER BY timestamp DESC LIMIT :limit """ - condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp, "limit": limit} + condition_dict = { + "symbol": symbol, + "bar": bar, + "timestamp": timestamp, + "limit": limit, + } return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def query_data_by_technical_indicators( - self, - symbol: str, - bar: str, - start: str = None, + self, + symbol: str, + bar: str, + start: str = None, end: str = None, macd_signal: str = None, kdj_signal: str = None, rsi_signal: str = None, boll_signal: str = None, - ma_cross: str = None + ma_cross: str = None, ): """ 根据技术指标查询数据 @@ -186,7 +190,7 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + if macd_signal: conditions.append("macd_signal = :macd_signal") condition_dict["macd_signal"] = macd_signal @@ -202,7 +206,7 @@ class DBBinanceData: if ma_cross: conditions.append("ma_cross = :ma_cross") condition_dict["ma_cross"] = ma_cross - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -214,23 +218,23 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT * FROM crypto_binance_data WHERE {where_clause} ORDER BY timestamp DESC """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def query_macd_signals( - self, - symbol: str, - bar: str, + self, + symbol: str, + bar: str, signal: str = None, - start: str = None, - end: str = None + start: str = None, + end: str = None, ): """ 查询MACD信号数据 @@ -242,11 +246,11 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + if signal: conditions.append("macd_signal = :signal") condition_dict["signal"] = signal - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -258,24 +262,24 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT * FROM crypto_binance_data WHERE {where_clause} ORDER BY timestamp DESC """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def query_kdj_signals( - self, - symbol: str, - bar: str, + self, + symbol: str, + bar: str, signal: str = None, pattern: str = None, - start: str = None, - end: str = None + start: str = None, + end: str = None, ): """ 查询KDJ信号数据 @@ -288,14 +292,14 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + if signal: conditions.append("kdj_signal = :signal") condition_dict["signal"] = signal if pattern: conditions.append("kdj_pattern = :pattern") condition_dict["pattern"] = pattern - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -307,25 +311,25 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT * FROM crypto_binance_data WHERE {where_clause} ORDER BY timestamp DESC """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def query_ma_signals( - self, - symbol: str, - bar: str, + self, + symbol: str, + bar: str, cross: str = None, long_short: str = None, divergence: str = None, - start: str = None, - end: str = None + start: str = None, + end: str = None, ): """ 查询均线信号数据 @@ -339,7 +343,7 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + if cross: conditions.append("ma_cross = :cross") condition_dict["cross"] = cross @@ -349,7 +353,7 @@ class DBBinanceData: if divergence: conditions.append("ma_divergence = :divergence") condition_dict["divergence"] = divergence - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -361,24 +365,24 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT * FROM crypto_binance_data WHERE {where_clause} ORDER BY timestamp DESC """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def query_bollinger_signals( - self, - symbol: str, - bar: str, + self, + symbol: str, + bar: str, signal: str = None, pattern: str = None, - start: str = None, - end: str = None + start: str = None, + end: str = None, ): """ 查询布林带信号数据 @@ -391,14 +395,14 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + if signal: conditions.append("boll_signal = :signal") condition_dict["signal"] = signal if pattern: conditions.append("boll_pattern = :pattern") condition_dict["pattern"] = pattern - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -410,22 +414,18 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT * FROM crypto_binance_data WHERE {where_clause} ORDER BY timestamp DESC """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=True) - + def get_technical_statistics( - self, - symbol: str, - bar: str, - start: str = None, - end: str = None + self, symbol: str, bar: str, start: str = None, end: str = None ): """ 获取技术指标统计信息 @@ -436,7 +436,7 @@ class DBBinanceData: """ conditions = ["symbol = :symbol", "bar = :bar"] condition_dict = {"symbol": symbol, "bar": bar} - + # 处理时间范围 if start: start_timestamp = transform_date_time_to_timestamp(start) @@ -448,7 +448,7 @@ class DBBinanceData: if end_timestamp: conditions.append("timestamp <= :end") condition_dict["end"] = end_timestamp - + where_clause = " AND ".join(conditions) sql = f""" SELECT @@ -470,20 +470,31 @@ class DBBinanceData: FROM crypto_binance_data WHERE {where_clause} """ - + return self.db_manager.query_data(sql, condition_dict, return_multi=False) - - def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None): + + def query_market_data_by_symbol_bar( + self, + symbol: str, + bar: str, + fields: list = None, + start: str = None, + end: str = None, + ): """ 根据交易对和K线周期查询数据 :param symbol: 交易对 :param bar: K线周期 + :param fields: 字段列表 :param start: 开始时间 :param end: 结束时间 """ + if fields is None: + fields = ["*"] + fields_str = ", ".join(fields) if start is None and end is None: - sql = """ - SELECT * FROM crypto_binance_data + sql = f""" + SELECT {fields_str} FROM crypto_binance_data WHERE symbol = :symbol AND bar = :bar ORDER BY timestamp ASC """ @@ -502,24 +513,29 @@ class DBBinanceData: if start is not None and end is not None: if start > end: start, end = end, start - sql = """ - SELECT * FROM crypto_binance_data + sql = f""" + SELECT {fields_str} FROM crypto_binance_data WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end ORDER BY timestamp ASC """ - condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} + condition_dict = { + "symbol": symbol, + "bar": bar, + "start": start, + "end": end, + } elif start is not None: - sql = """ - SELECT * FROM crypto_binance_data + sql = f""" + SELECT {fields_str} FROM crypto_binance_data WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start ORDER BY timestamp ASC """ condition_dict = {"symbol": symbol, "bar": bar, "start": start} elif end is not None: - sql = """ - SELECT * FROM crypto_binance_data + sql = f""" + SELECT {fields_str} FROM crypto_binance_data WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end ORDER BY timestamp ASC """ condition_dict = {"symbol": symbol, "bar": bar, "end": end} - return self.db_manager.query_data(sql, condition_dict, return_multi=True) \ No newline at end of file + return self.db_manager.query_data(sql, condition_dict, return_multi=True) diff --git a/core/db/db_manager.py b/core/db/db_manager.py index dc05b03..1a64776 100644 --- a/core/db/db_manager.py +++ b/core/db/db_manager.py @@ -218,13 +218,6 @@ class DBData: :param db_url: 数据库连接URL """ try: - engine = create_engine( - self.db_url, - pool_size=5, # 连接池大小 - max_overflow=10, # 允许的最大溢出连接 - pool_timeout=30, # 连接超时时间(秒) - pool_recycle=1800, # 连接回收时间(秒),避免长时间闲置 - ) with self.db_engine.connect() as conn: result = conn.execute(text(sql), condition_dict) if return_multi: diff --git a/core/db/db_market_data.py b/core/db/db_market_data.py index 65c1c17..adff462 100644 --- a/core/db/db_market_data.py +++ b/core/db/db_market_data.py @@ -473,17 +473,21 @@ class DBMarketData: return self.db_manager.query_data(sql, condition_dict, return_multi=False) - def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None): + def query_market_data_by_symbol_bar(self, symbol: str, bar: str, fields: list = None, start: str = None, end: str = None): """ 根据交易对和K线周期查询数据 :param symbol: 交易对 :param bar: K线周期 + :param fields: 字段列表 :param start: 开始时间 :param end: 结束时间 """ + if fields is None: + fields = ["*"] + fields_str = ", ".join(fields) if start is None and end is None: - sql = """ - SELECT * FROM crypto_market_data + sql = f""" + SELECT {fields_str} FROM crypto_market_data WHERE symbol = :symbol AND bar = :bar ORDER BY timestamp ASC """ @@ -502,22 +506,22 @@ class DBMarketData: if start is not None and end is not None: if start > end: start, end = end, start - sql = """ - SELECT * FROM crypto_market_data + sql = f""" + SELECT {fields_str} FROM crypto_market_data WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end ORDER BY timestamp ASC """ condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} elif start is not None: - sql = """ - SELECT * FROM crypto_market_data + sql = f""" + SELECT {fields_str} FROM crypto_market_data WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start ORDER BY timestamp ASC """ condition_dict = {"symbol": symbol, "bar": bar, "start": start} elif end is not None: - sql = """ - SELECT * FROM crypto_market_data + sql = f""" + SELECT {fields_str} FROM crypto_market_data WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end ORDER BY timestamp ASC """ diff --git a/core/trade/__pycache__/orb_trade.cpython-312.pyc b/core/trade/__pycache__/orb_trade.cpython-312.pyc index 1c695d0..1f5b9b1 100644 Binary files a/core/trade/__pycache__/orb_trade.cpython-312.pyc and b/core/trade/__pycache__/orb_trade.cpython-312.pyc differ diff --git a/core/trade/orb_trade.py b/core/trade/orb_trade.py index 2d60af7..fa26987 100644 --- a/core/trade/orb_trade.py +++ b/core/trade/orb_trade.py @@ -9,10 +9,12 @@ from openpyxl.drawing.image import Image import openpyxl from openpyxl.styles import Font from PIL import Image as PILImage +from datetime import datetime, timedelta import core.logger as logging from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE from core.db.db_market_data import DBMarketData +from core.db.db_binance_data import DBBinanceData from core.db.db_huge_volume_data import DBHugeVolumeData from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp @@ -35,11 +37,14 @@ class ORBStrategy: commission_per_share=0.0005, profit_target_multiple=10, is_us_stock=False, + is_binance=False, direction=None, by_sar=False, symbol_bar_data=None, + symbol_1h_data=None, price_range_mean_as_R=False, by_big_k=False, + by_1h_k=False, ): """ 初始化ORB策略参数 @@ -63,10 +68,14 @@ class ORBStrategy: :param commission_per_share: 每股交易佣金(美元,默认0.0005) :param profit_target_multiple: 盈利目标倍数(默认10倍$R,即10R) :param is_us_stock: 是否是美股 + :param is_binance: 是否是Binance :param direction: 方向,None=自动,Long=多头,Short=空头 :param by_sar: 是否根据SAR指标生成信号,True=是,False=否 + :param symbol_bar_data: 5分钟K线数据 + :param symbol_1h_data: 1小时K线数据 :param price_range_mean_as_R: 是否将价格振幅均值作为$R,True=是,False=否 :param by_big_k: 是否根据K线实体部分,亦即abs(open-close)超过high-low的50%,True=是,False=否 + :param by_1h_k: 是否根据1小时K线,True=是,False=否 """ logger.info( f"初始化ORB策略参数:股票代码={symbol},K线周期={bar},开始日期={start_date},结束日期={end_date},初始账户资金={initial_capital},最大杠杆倍数={max_leverage},单次交易风险比例={risk_per_trade},每股交易佣金={commission_per_share}" @@ -90,10 +99,15 @@ class ORBStrategy: mysql_host = MYSQL_CONFIG.get("host", "localhost") mysql_port = MYSQL_CONFIG.get("port", 3306) mysql_database = MYSQL_CONFIG.get("database", "okx") + self.is_us_stock = is_us_stock + self.is_binance = is_binance self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" - self.db_market_data = DBMarketData(self.db_url) - self.is_us_stock = is_us_stock + if self.is_binance: + self.db_market_data = DBBinanceData(self.db_url) + else: + self.db_market_data = DBMarketData(self.db_url) + self.output_chart_folder = r"./output/trade_sandbox/orb_strategy/chart/" self.output_excel_folder = r"./output/trade_sandbox/orb_strategy/excel/" os.makedirs(self.output_chart_folder, exist_ok=True) @@ -110,6 +124,7 @@ class ORBStrategy: if self.by_sar: self.sar_desc = "考虑SAR" self.symbol_bar_data = symbol_bar_data + self.symbol_1h_data = symbol_1h_data self.price_range_mean_as_R = price_range_mean_as_R if self.price_range_mean_as_R: self.price_range_mean_as_R_desc = "R为振幅均值" @@ -121,6 +136,11 @@ class ORBStrategy: self.by_big_k_desc = "K线实体过50%" else: self.by_big_k_desc = "无K线要求" + self.by_1h_k = by_1h_k + if self.by_1h_k: + self.by_1h_k_desc = "参照1小时K线" + else: + self.by_1h_k_desc = "不参照1小时K线" def run(self): """ @@ -132,7 +152,12 @@ class ORBStrategy: if len(self.trades) > 0: self.plot_equity_curve() self.output_trade_summary() - return self.symbol_bar_data, self.trades_df, self.trades_summary_df + return ( + self.symbol_bar_data, + self.symbol_1h_data, + self.trades_df, + self.trades_summary_df, + ) def fetch_intraday_data(self): """ @@ -146,47 +171,10 @@ class ORBStrategy: f"开始获取{self.symbol}数据:{self.start_date}至{self.end_date},间隔{self.bar}" ) if self.symbol_bar_data is None or len(self.symbol_bar_data) == 0: - data = self.db_market_data.query_market_data_by_symbol_bar( - self.symbol, self.bar, start=self.start_date, end=self.end_date - ) - data = pd.DataFrame(data) - data.sort_values(by="date_time", inplace=True) - # 保留核心列:开盘价、最高价、最低价、收盘价、成交量 - data["Open"] = data["open"] - data["High"] = data["high"] - data["Low"] = data["low"] - data["Close"] = data["close"] - data["Volume"] = data["volume"] - if self.is_us_stock: - date_time_field = "date_time_us" - else: - date_time_field = "date_time" - data[date_time_field] = pd.to_datetime(data[date_time_field]) - # data["Date"]为日期,不包括时分秒,即date_time如果是2025-01-01 10:00:00,则Date为2025-01-01 - data["Date"] = data[date_time_field].dt.date - # 将Date转换为datetime64[ns]类型以确保类型一致 - data["Date"] = pd.to_datetime(data["Date"]) - # 最小data["Date"] - self.start_date = data["Date"].min().strftime("%Y-%m-%d") - # 最大data["Date"] - self.end_date = data["Date"].max().strftime("%Y-%m-%d") - self.data = data[ - [ - "symbol", - "bar", - "Date", - date_time_field, - "Open", - "High", - "Low", - "Close", - "Volume", - "sar_signal", - ] - ].copy() - self.data.rename(columns={date_time_field: "date_time"}, inplace=True) + self.data = self.get_full_data(bar=self.bar) self.calculate_price_range_mean() self.symbol_bar_data = self.data.copy() + self.symbol_1h_data = self.get_full_data(bar="1H") else: self.data = self.symbol_bar_data.copy() @@ -210,6 +198,91 @@ class ORBStrategy: f"成功获取{self.symbol}数据:{len(self.data)}根{self.bar}K线,开始日期={self.start_date},结束日期={self.end_date}" ) + def get_full_data(self, bar: str = "5m"): + """ + 分段获取数据,并将数据合并为完整数据 + 分段依据:如果end_date与start_date相差超过一年,则每次取一年数据 + """ + data = pd.DataFrame() + start_date = datetime.strptime(self.start_date, "%Y-%m-%d") + end_date = datetime.strptime(self.end_date, "%Y-%m-%d") + timedelta(days=1) + fields = [ + "symbol", + "bar", + "date_time", + "date_time_us", + "open", + "high", + "low", + "close", + "volume", + "sar_signal", + "ma5", + "ma10", + "ma20", + "ma30", + "dif", + "macd", + ] + while start_date < end_date: + current_end_date = min(start_date + timedelta(days=180), end_date) + start_date_str = start_date.strftime("%Y-%m-%d") + current_end_date_str = current_end_date.strftime("%Y-%m-%d") + logger.info( + f"获取{self.symbol}数据:{start_date_str}至{current_end_date_str}" + ) + current_data = self.db_market_data.query_market_data_by_symbol_bar( + self.symbol, bar, fields, start=start_date_str, end=current_end_date_str + ) + if current_data is not None and len(current_data) > 0: + current_data = pd.DataFrame(current_data) + data = pd.concat([data, current_data]) + start_date = current_end_date + data.drop_duplicates(inplace=True) + if self.is_us_stock: + date_time_field = "date_time_us" + else: + date_time_field = "date_time" + data.sort_values(by=date_time_field, inplace=True) + data.reset_index(drop=True, inplace=True) + # 保留核心列:开盘价、最高价、最低价、收盘价、成交量 + data["Open"] = data["open"] + data["High"] = data["high"] + data["Low"] = data["low"] + data["Close"] = data["close"] + data["Volume"] = data["volume"] + data[date_time_field] = pd.to_datetime(data[date_time_field]) + # data["Date"]为日期,不包括时分秒,即date_time如果是2025-01-01 10:00:00,则Date为2025-01-01 + data["Date"] = data[date_time_field].dt.date + # 将Date转换为datetime64[ns]类型以确保类型一致 + data["Date"] = pd.to_datetime(data["Date"]) + # 最小data["Date"] + self.start_date = data["Date"].min().strftime("%Y-%m-%d") + # 最大data["Date"] + self.end_date = data["Date"].max().strftime("%Y-%m-%d") + data = data[ + [ + "symbol", + "bar", + "Date", + date_time_field, + "Open", + "High", + "Low", + "Close", + "Volume", + "sar_signal", + "ma5", + "ma10", + "ma20", + "ma30", + "dif", + "macd", + ] + ] + data.rename(columns={date_time_field: "date_time"}, inplace=True) + return data + def calculate_shares(self, account_value, entry_price, stop_price, risk_assumed): """ 根据ORB公式计算交易股数 @@ -247,7 +320,7 @@ class ORBStrategy: - 第二根5分钟K线:根据第一根K线方向生成多空信号 """ logger.info( - f"开始生成ORB策略信号:{self.direction_desc},根据SAR指标:{self.by_sar}" + f"开始生成ORB策略信号:{self.direction_desc},根据SAR指标:{self.by_sar},{self.by_1h_k_desc}" ) if self.data is None: raise ValueError("请先调用fetch_intraday_data获取数据") @@ -261,6 +334,7 @@ class ORBStrategy: # 第一根5分钟K线(开盘区间) first_candle = daily_data.iloc[0] + current_date = first_candle["Date"] high1 = first_candle["High"] low1 = first_candle["Low"] open1 = first_candle["Open"] @@ -273,6 +347,25 @@ class ORBStrategy: if (abs(open1 - close1) / (high1 - low1)) < 0.5: continue + ma5_1h = None + ma10_1h = None + dif_1h = None + macd_1h = None + if self.by_1h_k: + if self.symbol_1h_data is None or len(self.symbol_1h_data) == 0: + continue + if len(self.symbol_1h_data) < 2: + continue + symbol_1h_date_data = self.symbol_1h_data[ + self.symbol_1h_data["Date"] == current_date + ] + if len(symbol_1h_date_data) > 0: + first_candle_1h = symbol_1h_date_data.iloc[0] + ma5_1h = first_candle_1h["ma5"] + ma10_1h = first_candle_1h["ma10"] + dif_1h = first_candle_1h["dif"] + macd_1h = first_candle_1h["macd"] + # 第二根5分钟K线(entry信号) second_candle = daily_data.iloc[1] entry_price = second_candle["Open"] # entry价格=第二根K线开盘价 @@ -283,6 +376,22 @@ class ORBStrategy: open1 < close1 and (self.direction == "Long" or self.direction is None) and ((self.by_sar and sar_signal == "SAR多头") or not self.by_sar) + and ( + ( + self.by_1h_k + and ( + ma5_1h is not None + and ma10_1h is not None + and ma5_1h > ma10_1h + ) + and ( + dif_1h is not None + and macd_1h is not None + and (dif_1h > 0 or macd_1h > 0) + ) + ) + or not self.by_1h_k + ) ): # 第一根K线收涨→多头信号 signal = "Long" @@ -291,6 +400,22 @@ class ORBStrategy: open1 > close1 and (self.direction == "Short" or self.direction is None) and ((self.by_sar and sar_signal == "SAR空头") or not self.by_sar) + and ( + ( + self.by_1h_k + and ( + ma5_1h is not None + and ma10_1h is not None + and ma5_1h < ma10_1h + ) + and ( + dif_1h is not None + and macd_1h is not None + and (dif_1h < 0 or macd_1h < 0) + ) + ) + or not self.by_1h_k + ) ): # 第一根K线收跌→空头信号 signal = "Short" @@ -477,6 +602,7 @@ class ORBStrategy: "BySar": self.sar_desc, "PriceRangeMeanAsR": self.price_range_mean_as_R_desc, "ByBigK": self.by_big_k_desc, + "By1hK": self.by_1h_k_desc, "Symbol": self.symbol, "Bar": self.bar, "Date": date, @@ -576,6 +702,7 @@ class ORBStrategy: self.trades_summary["根据SAR"] = self.sar_desc self.trades_summary["R算法"] = self.price_range_mean_as_R_desc self.trades_summary["K线条件"] = self.by_big_k_desc + self.trades_summary["1小时K线条件"] = self.by_1h_k_desc self.trades_summary["股票代码"] = self.symbol self.trades_summary["K线周期"] = self.bar self.trades_summary["开始日期"] = self.start_date @@ -660,7 +787,7 @@ class ORBStrategy: markersize=4, ) plt.title( - f"{symbol} {bar} {self.direction_desc} {self.sar_desc} {self.price_range_mean_as_R_desc} {self.by_big_k_desc}", + f"{symbol} {bar} {self.direction_desc} {self.sar_desc} {self.price_range_mean_as_R_desc} {self.by_big_k_desc} {self.by_1h_k_desc}", fontsize=14, fontweight="bold", ) @@ -704,7 +831,7 @@ class ORBStrategy: fontsize=10, ) plt.tight_layout() - self.chart_save_path = f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_orb.png" + self.chart_save_path = f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_{self.by_1h_k_desc}_orb.png" plt.savefig(self.chart_save_path, dpi=150, bbox_inches="tight") plt.close() @@ -714,7 +841,7 @@ class ORBStrategy: """ start_date = self.start_date.replace("-", "") end_date = self.end_date.replace("-", "") - output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}.xlsx" + output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_{self.by_1h_k_desc}.xlsx" output_file_path = os.path.join(self.output_excel_folder, output_file_name) logger.info(f"导出{output_file_path}") with pd.ExcelWriter(output_file_path) as writer: diff --git a/huge_volume_main.py b/huge_volume_main.py index 790fb0d..b78b537 100644 --- a/huge_volume_main.py +++ b/huge_volume_main.py @@ -678,9 +678,9 @@ def batch_import_binance_data_by_csv(): def test_import_binance_data_by_csv(): huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True) - file_path = "./data/binance/spot/2020-08-11/SOL-USDT_1h.csv" + file_path = "./data/binance/spot/2020-08-12/XRP-USDT_1h.csv" huge_volume_main.import_binance_data_by_csv( - file_path, "SOL-USDT", "1H", [50, 80, 100, 120] + file_path, "XRP-USDT", "1H", [50, 80, 100, 120] ) @@ -696,8 +696,8 @@ def test_send_huge_volume_data_to_wechat(): if __name__ == "__main__": - test_import_binance_data_by_csv() - # batch_import_binance_data_by_csv() + # test_import_binance_data_by_csv() + batch_import_binance_data_by_csv() # batch_update_volume_spike(threshold=2.0, is_us_stock=True) # test_send_huge_volume_data_to_wechat() # batch_initial_detect_volume_spike(threshold=2.0) diff --git a/orb_trade_main.py b/orb_trade_main.py index aa2ea03..9360c37 100644 --- a/orb_trade_main.py +++ b/orb_trade_main.py @@ -1,5 +1,5 @@ from core.trade.orb_trade import ORBStrategy -from config import US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG +from config import US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG, BINANCE_MONITOR_CONFIG import core.logger as logging from datetime import datetime from openpyxl import Workbook @@ -12,13 +12,21 @@ logger = logging.logger def main(): - is_us_stock_list = [True, False] + # is_us_stock_list = [True, False] + is_us_stock_list = [False] + is_binance = True bar = "5m" direction_list = [None, "Long", "Short"] by_sar_list = [False, True] price_range_mean_as_R_list = [False, True] by_big_k_list = [False, True] - start_date = "2024-01-01" + by_1h_k_list = [False, True] + if is_binance: + start_date = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2017-08-16 00:00:00") + if len(start_date) > 10: + start_date = start_date[:10] + else: + start_date = "2024-01-01" end_date = datetime.now().strftime("%Y-%m-%d") profit_target_multiple = 10 initial_capital = 25000 @@ -34,57 +42,68 @@ def main(): for by_sar in by_sar_list: for price_range_mean_as_R in price_range_mean_as_R_list: for by_big_k in by_big_k_list: - if is_us_stock: - symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get( - "symbols", ["QQQ"] - ) - else: - symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( - "symbols", ["BTC-USDT"] - ) - for symbol in symbols: - logger.info( - f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}" - ) - symbol_bar_data = None - found_symbol_bar_data = False - for symbol_data_dict in symbol_data_cache: - if ( - symbol_data_dict["symbol"] == symbol - and symbol_data_dict["bar"] == bar - ): - symbol_bar_data = symbol_data_dict["data"] - found_symbol_bar_data = True - break - - orb_strategy = ORBStrategy( - symbol=symbol, - bar=bar, - start_date=start_date, - end_date=end_date, - is_us_stock=is_us_stock, - direction=direction, - by_sar=by_sar, - profit_target_multiple=profit_target_multiple, - initial_capital=initial_capital, - max_leverage=max_leverage, - risk_per_trade=risk_per_trade, - commission_per_share=commission_per_share, - symbol_bar_data=symbol_bar_data, - price_range_mean_as_R=price_range_mean_as_R, - by_big_k=by_big_k, - ) - symbol_bar_data, trades_df, trades_summary_df = orb_strategy.run() - if symbol_bar_data is None or len(symbol_bar_data) == 0: - continue - if not found_symbol_bar_data: - symbol_data_cache.append( - {"symbol": symbol, "bar": bar, "data": symbol_bar_data} + for by_1h_k in by_1h_k_list: + if is_us_stock: + symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["QQQ"] ) - if trades_summary_df is None or len(trades_summary_df) == 0: - continue - trades_summary_df_list.append(trades_summary_df) - trades_df_list.append(trades_df) + else: + if is_binance: + symbols = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["BTC-USDT"] + ) + else: + symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["BTC-USDT"] + ) + for symbol in symbols: + logger.info( + f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}" + ) + symbol_bar_data = None + symbol_1h_data = None + found_symbol_bar_data = False + for symbol_data_dict in symbol_data_cache: + if ( + symbol_data_dict["symbol"] == symbol + and symbol_data_dict["bar"] == bar + ): + symbol_bar_data = symbol_data_dict["data"] + symbol_1h_data = symbol_data_dict["1h_data"] + found_symbol_bar_data = True + break + + orb_strategy = ORBStrategy( + symbol=symbol, + bar=bar, + start_date=start_date, + end_date=end_date, + is_us_stock=is_us_stock, + is_binance=is_binance, + direction=direction, + by_sar=by_sar, + profit_target_multiple=profit_target_multiple, + initial_capital=initial_capital, + max_leverage=max_leverage, + risk_per_trade=risk_per_trade, + commission_per_share=commission_per_share, + symbol_bar_data=symbol_bar_data, + symbol_1h_data=symbol_1h_data, + price_range_mean_as_R=price_range_mean_as_R, + by_big_k=by_big_k, + by_1h_k=by_1h_k, + ) + symbol_bar_data, symbol_1h_data, trades_df, trades_summary_df = orb_strategy.run() + if symbol_bar_data is None or len(symbol_bar_data) == 0: + continue + if not found_symbol_bar_data: + symbol_data_cache.append( + {"symbol": symbol, "bar": bar, "data": symbol_bar_data, "1h_data": symbol_1h_data} + ) + if trades_summary_df is None or len(trades_summary_df) == 0: + continue + trades_summary_df_list.append(trades_summary_df) + trades_df_list.append(trades_df) total_trades_df = pd.concat(trades_df_list) total_trades_summary_df = pd.concat(trades_summary_df_list) statitics_dict = statistics_summary(total_trades_summary_df) @@ -117,6 +136,9 @@ def main(): statitics_dict["max_total_return_record_df_K_count"].to_excel( writer, sheet_name="最大总收益率_K线条件", index=False ) + statitics_dict["max_total_return_record_df_1hK_count"].to_excel( + writer, sheet_name="最大总收益率_1小时K线条件", index=False + ) chart_path = r"./output/trade_sandbox/orb_strategy/chart/" os.makedirs(chart_path, exist_ok=True) copy_chart_to_excel(chart_path, output_file_path) @@ -170,6 +192,7 @@ def statistics_summary(trades_summary_df: pd.DataFrame): summary["根据SAR"] = max_total_return_record["根据SAR"] summary["R算法"] = max_total_return_record["R算法"] summary["K线条件"] = max_total_return_record["K线条件"] + summary["1小时K线条件"] = max_total_return_record["1小时K线条件"] summary["总收益率%"] = max_total_return_record["总收益率%"] summary["自然收益率%"] = max_total_return_record["自然收益率%"] max_total_return_record_list.append(summary) @@ -192,14 +215,14 @@ def statistics_summary(trades_summary_df: pd.DataFrame): # 其它(如Series、list、dict、ndarray等)转字符串 return str(v) - for key_col in ["方向", "根据SAR", "R算法", "K线条件"]: + for key_col in ["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件"]: if key_col in max_total_return_record_df.columns: max_total_return_record_df[key_col] = max_total_return_record_df[ key_col ].apply(_to_hashable_scalar) # 分组统计 max_total_return_record_df_grouped_count = ( - max_total_return_record_df.groupby(["方向", "根据SAR", "R算法", "K线条件"], dropna=False) + max_total_return_record_df.groupby(["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件"], dropna=False) .size() .reset_index(name="数量") ) @@ -251,18 +274,27 @@ def statistics_summary(trades_summary_df: pd.DataFrame): by="数量", ascending=False, inplace=True ) max_total_return_record_df_K_count.reset_index(drop=True, inplace=True) + + # 统计1小时K线条件的记录数目 + max_total_return_record_df_1hK_count = ( + max_total_return_record_df.groupby(["1小时K线条件"], dropna=False) + .size() + .reset_index(name="数量") + ) + max_total_return_record_df_1hK_count.sort_values( + by="数量", ascending=False, inplace=True + ) + max_total_return_record_df_1hK_count.reset_index(drop=True, inplace=True) else: # 构造空结果,保证下游写入Excel不报错 max_total_return_record_df_grouped_count = pd.DataFrame( - columns=["方向", "根据SAR", "R算法", "K线条件", "数量"] + columns=["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件", "数量"] ) - max_total_return_record_df_direction_count = pd.DataFrame( - columns=["方向", "数量"] - ) + max_total_return_record_df_direction_count = pd.DataFrame(columns=["方向", "数量"]) max_total_return_record_df_sar_count = pd.DataFrame(columns=["根据SAR", "数量"]) max_total_return_record_df_R_count = pd.DataFrame(columns=["R算法", "数量"]) max_total_return_record_df_K_count = pd.DataFrame(columns=["K线条件", "数量"]) - + max_total_return_record_df_1hK_count = pd.DataFrame(columns=["1小时K线条件", "数量"]) result = { "statistics_summary_df": statistics_summary_df, "max_total_return_record_df": max_total_return_record_df, @@ -271,6 +303,7 @@ def statistics_summary(trades_summary_df: pd.DataFrame): "max_total_return_record_df_sar_count": max_total_return_record_df_sar_count, "max_total_return_record_df_R_count": max_total_return_record_df_R_count, "max_total_return_record_df_K_count": max_total_return_record_df_K_count, + "max_total_return_record_df_1hK_count": max_total_return_record_df_1hK_count, } return result diff --git a/play.py b/play.py index 453b77a..621dce7 100644 --- a/play.py +++ b/play.py @@ -2,6 +2,10 @@ import logging from core.biz.quant_trader import QuantTrader from core.biz.strategy import QuantStrategy +from config import MYSQL_CONFIG +from sqlalchemy import create_engine, exc, text +import pandas as pd + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') def main() -> None: @@ -94,5 +98,78 @@ def main() -> None: else: logging.warning("无效选择,请重新输入") + +def test_query(): + mysql_user = MYSQL_CONFIG.get("user", "xch") + mysql_password = MYSQL_CONFIG.get("password", "") + if not mysql_password: + raise ValueError("MySQL password is not set") + mysql_host = MYSQL_CONFIG.get("host", "localhost") + mysql_port = MYSQL_CONFIG.get("port", 3306) + mysql_database = MYSQL_CONFIG.get("database", "okx") + db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" + db_engine = create_engine( + db_url, + pool_size=25, # 连接池大小 + max_overflow=10, # 允许的最大溢出连接 + pool_timeout=30, # 连接超时时间(秒) + pool_recycle=60, # 连接回收时间(秒),避免长时间闲置 + ) + sql = "SELECT symbol, min(date_time), max(date_time) FROM okx.crypto_binance_data where bar='5m' group by symbol;" + condition_dict = {} + return_multi = True + try: + result = query_data(db_engine, sql, condition_dict, return_multi) + if result is not None and len(result) > 0: + data = pd.DataFrame(result) + data.columns = ["symbol", "min_date_time", "max_date_time"] + print(data) + # output to excel + data.to_excel("./data/binance/crypto_binance_data_5m.xlsx", index=False) + except Exception as e: + print(f"查询数据出错: {e}") + return None + +def transform_data_type(data: dict): + """ + 遍历字典,将所有Decimal类型的值转换为float类型 + """ + from decimal import Decimal + for key, value in data.items(): + if isinstance(value, Decimal): + data[key] = float(value) + return data + + +def query_data(db_engine, sql: str, condition_dict: dict, return_multi: bool = True): + """ + 查询数据 + :param sql: 查询SQL + :param db_url: 数据库连接URL + """ + try: + with db_engine.connect() as conn: + result = conn.execute(text(sql), condition_dict) + if return_multi: + result = result.fetchall() + if result: + result_list = [ + transform_data_type(dict(row._mapping)) for row in result + ] + return result_list + else: + return None + else: + result = result.fetchone() + if result: + result_dict = transform_data_type(dict(result._mapping)) + return result_dict + else: + return None + except Exception as e: + print(f"查询数据出错: {e}") + return None + if __name__ == "__main__": - main() + # main() + test_query() diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql index 41b21c5..b63cf22 100644 --- a/sql/query/sql_playground.sql +++ b/sql/query/sql_playground.sql @@ -1,11 +1,24 @@ +use okx; + +select DISTINCT symbol from crypto_huge_volume +where symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO") and bar="30m" +and window_size=120 +group by symbol + select * from crypto_market_data -where symbol = "TQQQ" and bar="5m" -order by date_time_us DESC; +where symbol="PLTR" and bar="5m" +order by date_time_us -select * from crypto_huge_volume -where symbol = "QQQ" and bar="5m" -order by date_time; +select * from crypto_market_data +where symbol = "BTC-USDT" and bar="1H" #and open between 114271 and 114272 # AND date_time > "2025-08-21" +order by date_time desc; + +select count(1) from crypto_market_data where +symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO") + +select count(1) from crypto_huge_volume where +symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO") select symbol, bar, date_time, close, diff --git a/sql/table/crypto_binance_data_essential_indexes.sql b/sql/table/crypto_binance_data_essential_indexes.sql new file mode 100644 index 0000000..3a32c20 --- /dev/null +++ b/sql/table/crypto_binance_data_essential_indexes.sql @@ -0,0 +1,54 @@ +-- crypto_binance_data表核心索引优化脚本 +-- 针对500万+数据量的关键查询性能优化 +-- 基于实际代码中的查询模式分析 + +-- 核心索引1: 主要查询模式索引 +-- 覆盖: WHERE symbol = ? AND bar = ? ORDER BY timestamp +-- 这是代码中最常见的查询模式 +CREATE INDEX idx_symbol_bar_timestamp ON crypto_binance_data (symbol, bar, timestamp); + +-- 核心索引2: 时间范围查询索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND timestamp BETWEEN ? AND ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND timestamp >= ? AND timestamp <= ? +# CREATE INDEX idx_symbol_bar_timestamp_range ON crypto_binance_data (symbol, bar, timestamp); + +-- 核心索引3: 分组查询索引 +-- 覆盖: GROUP BY symbol, bar +-- 覆盖: SELECT symbol, min(date_time), max(date_time) FROM crypto_binance_data WHERE bar='5m' GROUP BY symbol +CREATE INDEX idx_bar_symbol ON crypto_binance_data (bar, symbol); + +-- 核心索引4: 技术指标查询索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND macd_signal = ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND kdj_signal = ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND rsi_signal = ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND boll_signal = ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND ma_cross = ? +CREATE INDEX idx_symbol_bar_indicators ON crypto_binance_data (symbol, bar, macd_signal, kdj_signal, rsi_signal, boll_signal, ma_cross); + +-- 核心索引5: 时间戳排序索引 +-- 覆盖: ORDER BY timestamp DESC/ASC +-- 优化: 查询最新数据 ORDER BY timestamp DESC LIMIT 1 +CREATE INDEX idx_timestamp_desc ON crypto_binance_data (timestamp DESC); + +-- 核心索引6: 日期时间查询索引 +-- 覆盖: WHERE date_time >= ? AND date_time <= ? +-- 覆盖: ORDER BY date_time +CREATE INDEX idx_date_time ON crypto_binance_data (date_time); + +-- 核心索引7: 成交量查询索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND volume > ? +-- 覆盖: ORDER BY volume DESC +CREATE INDEX idx_symbol_bar_volume ON crypto_binance_data (symbol, bar, volume); + +-- 核心索引8: 价格查询索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND close > ? AND close < ? +-- 覆盖: ORDER BY close DESC/ASC +CREATE INDEX idx_symbol_bar_close ON crypto_binance_data (symbol, bar, close); + +-- 执行前检查现有索引 +-- SHOW INDEX FROM crypto_binance_data; + +-- 执行后验证索引效果 +-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' ORDER BY timestamp DESC LIMIT 100; +-- EXPLAIN SELECT symbol, min(date_time), max(date_time) FROM crypto_binance_data WHERE bar='5m' GROUP BY symbol; +-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' AND timestamp BETWEEN 1640995200000 AND 1641081600000; diff --git a/sql/table/crypto_binance_data_indexes.sql b/sql/table/crypto_binance_data_indexes.sql new file mode 100644 index 0000000..2338ac2 --- /dev/null +++ b/sql/table/crypto_binance_data_indexes.sql @@ -0,0 +1,71 @@ +-- crypto_binance_data表索引优化脚本 +-- 针对500万+数据量的查询性能优化 + +-- 1. 主要查询索引 - 覆盖最常见的查询模式 +-- 支持: WHERE symbol = ? AND bar = ? ORDER BY timestamp +CREATE INDEX idx_symbol_bar_timestamp ON crypto_binance_data (symbol, bar, timestamp); + +-- 2. 时间范围查询索引 - 优化时间范围查询 +-- 支持: WHERE symbol = ? AND bar = ? AND timestamp BETWEEN ? AND ? +CREATE INDEX idx_symbol_bar_timestamp_range ON crypto_binance_data (symbol, bar, timestamp); + +-- 3. 按symbol分组查询索引 - 优化GROUP BY symbol查询 +-- 支持: GROUP BY symbol, bar +CREATE INDEX idx_symbol_bar ON crypto_binance_data (symbol, bar); + +-- 4. 时间戳排序索引 - 优化ORDER BY timestamp查询 +-- 支持: ORDER BY timestamp DESC/ASC +CREATE INDEX idx_timestamp ON crypto_binance_data (timestamp); + +-- 5. 技术指标查询索引 - 优化技术指标相关查询 +-- 支持: WHERE symbol = ? AND bar = ? AND macd_signal = ? +CREATE INDEX idx_symbol_bar_macd_signal ON crypto_binance_data (symbol, bar, macd_signal); + +-- 支持: WHERE symbol = ? AND bar = ? AND kdj_signal = ? +CREATE INDEX idx_symbol_bar_kdj_signal ON crypto_binance_data (symbol, bar, kdj_signal); + +-- 支持: WHERE symbol = ? AND bar = ? AND rsi_signal = ? +CREATE INDEX idx_symbol_bar_rsi_signal ON crypto_binance_data (symbol, bar, rsi_signal); + +-- 支持: WHERE symbol = ? AND bar = ? AND boll_signal = ? +CREATE INDEX idx_symbol_bar_boll_signal ON crypto_binance_data (symbol, bar, boll_signal); + +-- 支持: WHERE symbol = ? AND bar = ? AND ma_cross = ? +CREATE INDEX idx_symbol_bar_ma_cross ON crypto_binance_data (symbol, bar, ma_cross); + +-- 6. 复合技术指标查询索引 - 优化多条件技术指标查询 +-- 支持: WHERE symbol = ? AND bar = ? AND macd_signal = ? AND kdj_signal = ? +CREATE INDEX idx_symbol_bar_macd_kdj ON crypto_binance_data (symbol, bar, macd_signal, kdj_signal); + +-- 7. 日期时间查询索引 - 优化按日期时间查询 +-- 支持: WHERE date_time >= ? AND date_time <= ? +CREATE INDEX idx_date_time ON crypto_binance_data (date_time); + +-- 8. 交易对时间索引 - 优化特定交易对的时间查询 +-- 支持: WHERE symbol = ? ORDER BY timestamp +CREATE INDEX idx_symbol_timestamp ON crypto_binance_data (symbol, timestamp); + +-- 9. 周期时间索引 - 优化特定周期的时间查询 +-- 支持: WHERE bar = ? ORDER BY timestamp +CREATE INDEX idx_bar_timestamp ON crypto_binance_data (bar, timestamp); + +-- 10. 统计查询优化索引 - 优化COUNT, AVG等聚合查询 +-- 支持: SELECT COUNT(*) FROM crypto_binance_data WHERE symbol = ? AND bar = ? +CREATE INDEX idx_symbol_bar_volume ON crypto_binance_data (symbol, bar, volume); + +-- 11. 价格相关查询索引 - 优化价格相关查询 +-- 支持: WHERE symbol = ? AND bar = ? AND close > ? AND close < ? +CREATE INDEX idx_symbol_bar_close ON crypto_binance_data (symbol, bar, close); + +-- 12. 成交量相关查询索引 - 优化成交量相关查询 +-- 支持: WHERE symbol = ? AND bar = ? AND volume > ? +CREATE INDEX idx_symbol_bar_volume_high ON crypto_binance_data (symbol, bar, volume); + +-- 删除可能存在的重复或低效索引 +-- 注意:在实际执行前请先检查现有索引,避免删除正在使用的索引 + +-- 查看当前索引状态 +-- SHOW INDEX FROM crypto_binance_data; + +-- 分析查询性能 +-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' ORDER BY timestamp DESC LIMIT 100; diff --git a/sql/table/crypto_binance_huge_volume_essential_indexes.sql b/sql/table/crypto_binance_huge_volume_essential_indexes.sql new file mode 100644 index 0000000..a358f94 --- /dev/null +++ b/sql/table/crypto_binance_huge_volume_essential_indexes.sql @@ -0,0 +1,69 @@ +-- crypto_binance_huge_volume表核心索引优化脚本 +-- 针对2000万+数据量的关键查询性能优化 +-- 基于实际代码中的查询模式分析 + +-- 核心索引1: 主要查询模式索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? ORDER BY timestamp +-- 这是代码中最常见的查询模式,用于获取特定交易对、周期、窗口大小的数据 +CREATE INDEX idx_symbol_bar_window_size_timestamp ON crypto_binance_huge_volume (symbol, bar, window_size, timestamp); + +-- 核心索引2: 时间范围查询索引 +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND timestamp BETWEEN ? AND ? +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND timestamp >= ? AND timestamp <= ? +# CREATE INDEX idx_symbol_bar_window_size_timestamp_range ON crypto_binance_huge_volume (symbol, bar, window_size, timestamp); + +-- 核心索引3: 巨量交易查询索引 +-- 覆盖: WHERE huge_volume = 1 +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND huge_volume = 1 +CREATE INDEX idx_huge_volume_symbol_bar_window_size ON crypto_binance_huge_volume (huge_volume, symbol, bar, window_size); + +-- 核心索引4: 成交量比率排序索引 +-- 覆盖: ORDER BY volume_ratio DESC +-- 覆盖: WHERE huge_volume = 1 ORDER BY volume_ratio DESC +CREATE INDEX idx_volume_ratio_desc ON crypto_binance_huge_volume (volume_ratio DESC); + +-- 核心索引5: 价格分位数高点查询索引 +-- 覆盖: WHERE close_80_high = 1, close_90_high = 1 +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND close_80_high = 1 +CREATE INDEX idx_close_80_90_high ON crypto_binance_huge_volume (close_80_high, close_90_high, symbol, bar, window_size); + +-- 核心索引6: 价格分位数低点查询索引 +-- 覆盖: WHERE close_20_low = 1, close_10_low = 1 +-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND close_20_low = 1 +CREATE INDEX idx_close_20_10_low ON crypto_binance_huge_volume (close_20_low, close_10_low, symbol, bar, window_size); + +-- 核心索引7: 最高价分位数查询索引 +-- 覆盖: WHERE high_80_high = 1, high_90_high = 1, high_20_low = 1, high_10_low = 1 +CREATE INDEX idx_high_percentiles ON crypto_binance_huge_volume (high_80_high, high_90_high, high_20_low, high_10_low, symbol, bar, window_size); + +-- 核心索引8: 最低价分位数查询索引 +-- 覆盖: WHERE low_80_high = 1, low_90_high = 1, low_20_low = 1, low_10_low = 1 +CREATE INDEX idx_low_percentiles ON crypto_binance_huge_volume (low_80_high, low_90_high, low_20_low, low_10_low, symbol, bar, window_size); + +-- 核心索引9: 复合量价尖峰查询索引 +-- 覆盖: WHERE huge_volume = 1 AND (close_80_high = 1 OR close_20_low = 1) +-- 覆盖: WHERE huge_volume = 1 AND (close_90_high = 1 OR close_10_low = 1) +CREATE INDEX idx_huge_volume_price_spike ON crypto_binance_huge_volume (huge_volume, close_80_high, close_20_low, close_90_high, close_10_low, symbol, bar, window_size); + +-- 核心索引10: 分组统计查询索引 +-- 覆盖: GROUP BY symbol, bar, window_size +-- 覆盖: GROUP BY symbol, bar +CREATE INDEX idx_symbol_bar_window_size_group ON crypto_binance_huge_volume (symbol, bar, window_size); + +-- 核心索引11: 时间戳排序索引 +-- 覆盖: ORDER BY timestamp DESC/ASC +-- 优化: 查询最新数据 ORDER BY timestamp DESC LIMIT 1 +CREATE INDEX idx_timestamp_desc ON crypto_binance_huge_volume (timestamp DESC); + +-- 核心索引12: 日期时间查询索引 +-- 覆盖: WHERE date_time >= ? AND date_time <= ? +-- 覆盖: ORDER BY date_time +CREATE INDEX idx_date_time ON crypto_binance_huge_volume (date_time); + +-- 执行前检查现有索引 +-- SHOW INDEX FROM crypto_binance_huge_volume; + +-- 执行后验证索引效果 +-- EXPLAIN SELECT * FROM crypto_binance_huge_volume WHERE symbol = 'BTC-USDT' AND bar = '5m' AND window_size = 50 ORDER BY timestamp DESC LIMIT 100; +-- EXPLAIN SELECT * FROM crypto_binance_huge_volume WHERE huge_volume = 1 AND close_80_high = 1 ORDER BY volume_ratio DESC LIMIT 10; +-- EXPLAIN SELECT COUNT(*) FROM crypto_binance_huge_volume WHERE symbol = 'BTC-USDT' AND bar = '5m' AND window_size = 50;