support import binance data

optimize orb strategy
This commit is contained in:
blade 2025-09-09 14:15:29 +08:00
parent 019b894c9a
commit 5b7e95f4d9
14 changed files with 729 additions and 272 deletions

View File

@ -7,68 +7,65 @@ logger = logging.logger
class DBBinanceData:
def __init__(
self,
db_url: str
):
def __init__(self, db_url: str):
self.db_url = db_url
self.table_name = "crypto_binance_data"
self.columns = [
"symbol",
"bar",
"timestamp",
"date_time",
"date_time_us",
"open",
"high",
"low",
"close",
"pre_close",
"close_change",
"pct_chg",
"volume",
"volCcy",
"volCCyQuote",
"buy_sz",
"sell_sz",
# 技术指标字段
"ma1",
"ma2",
"dif",
"dea",
"macd",
"macd_signal",
"macd_divergence",
"kdj_k",
"kdj_d",
"kdj_j",
"kdj_signal",
"kdj_pattern",
"sar",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"ma_cross",
"ma5_close_diff",
"ma10_close_diff",
"ma20_close_diff",
"ma30_close_diff",
"ma_close_avg",
"ma_long_short",
"ma_divergence",
"rsi_14",
"rsi_signal",
"boll_upper",
"boll_middle",
"boll_lower",
"boll_signal",
"boll_pattern",
"k_length",
"k_shape",
"k_up_down",
"create_time",
"symbol",
"bar",
"timestamp",
"date_time",
"date_time_us",
"open",
"high",
"low",
"close",
"pre_close",
"close_change",
"pct_chg",
"volume",
"volCcy",
"volCCyQuote",
"buy_sz",
"sell_sz",
# 技术指标字段
"ma1",
"ma2",
"dif",
"dea",
"macd",
"macd_signal",
"macd_divergence",
"kdj_k",
"kdj_d",
"kdj_j",
"kdj_signal",
"kdj_pattern",
"sar",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"ma_cross",
"ma5_close_diff",
"ma10_close_diff",
"ma20_close_diff",
"ma30_close_diff",
"ma_close_avg",
"ma_long_short",
"ma_divergence",
"rsi_14",
"rsi_signal",
"boll_upper",
"boll_middle",
"boll_lower",
"boll_signal",
"boll_pattern",
"k_length",
"k_shape",
"k_up_down",
"create_time",
]
self.db_manager = DBData(db_url, self.table_name, self.columns)
@ -85,7 +82,7 @@ class DBBinanceData:
return
self.db_manager.insert_data_to_mysql(df)
def insert_data_to_mysql_fast(self, df: pd.DataFrame):
"""
快速插入K线行情数据方案2使用executemany批量插入
@ -97,9 +94,9 @@ class DBBinanceData:
if df is None or df.empty:
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
"""
分块插入K线行情数据方案3适合大数据量
@ -112,9 +109,9 @@ class DBBinanceData:
if df is None or df.empty:
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
def insert_data_to_mysql_simple(self, df: pd.DataFrame):
"""
简单插入K线行情数据方案4直接使用to_sql忽略重复
@ -125,9 +122,9 @@ class DBBinanceData:
if df is None or df.empty:
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)
def query_latest_data(self, symbol: str, bar: str):
"""
查询最新数据
@ -142,8 +139,10 @@ class DBBinanceData:
"""
condition_dict = {"symbol": symbol, "bar": bar}
return self.db_manager.query_data(sql, condition_dict, return_multi=False)
def query_data_before_timestamp(self, symbol: str, bar: str, timestamp: int, limit: int = 100):
def query_data_before_timestamp(
self, symbol: str, bar: str, timestamp: int, limit: int = 100
):
"""
根据时间戳查询之前的数据
:param symbol: 交易对
@ -157,20 +156,25 @@ class DBBinanceData:
ORDER BY timestamp DESC
LIMIT :limit
"""
condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp, "limit": limit}
condition_dict = {
"symbol": symbol,
"bar": bar,
"timestamp": timestamp,
"limit": limit,
}
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_data_by_technical_indicators(
self,
symbol: str,
bar: str,
start: str = None,
self,
symbol: str,
bar: str,
start: str = None,
end: str = None,
macd_signal: str = None,
kdj_signal: str = None,
rsi_signal: str = None,
boll_signal: str = None,
ma_cross: str = None
ma_cross: str = None,
):
"""
根据技术指标查询数据
@ -186,7 +190,7 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
if macd_signal:
conditions.append("macd_signal = :macd_signal")
condition_dict["macd_signal"] = macd_signal
@ -202,7 +206,7 @@ class DBBinanceData:
if ma_cross:
conditions.append("ma_cross = :ma_cross")
condition_dict["ma_cross"] = ma_cross
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -214,23 +218,23 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT * FROM crypto_binance_data
WHERE {where_clause}
ORDER BY timestamp DESC
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_macd_signals(
self,
symbol: str,
bar: str,
self,
symbol: str,
bar: str,
signal: str = None,
start: str = None,
end: str = None
start: str = None,
end: str = None,
):
"""
查询MACD信号数据
@ -242,11 +246,11 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
if signal:
conditions.append("macd_signal = :signal")
condition_dict["signal"] = signal
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -258,24 +262,24 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT * FROM crypto_binance_data
WHERE {where_clause}
ORDER BY timestamp DESC
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_kdj_signals(
self,
symbol: str,
bar: str,
self,
symbol: str,
bar: str,
signal: str = None,
pattern: str = None,
start: str = None,
end: str = None
start: str = None,
end: str = None,
):
"""
查询KDJ信号数据
@ -288,14 +292,14 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
if signal:
conditions.append("kdj_signal = :signal")
condition_dict["signal"] = signal
if pattern:
conditions.append("kdj_pattern = :pattern")
condition_dict["pattern"] = pattern
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -307,25 +311,25 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT * FROM crypto_binance_data
WHERE {where_clause}
ORDER BY timestamp DESC
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_ma_signals(
self,
symbol: str,
bar: str,
self,
symbol: str,
bar: str,
cross: str = None,
long_short: str = None,
divergence: str = None,
start: str = None,
end: str = None
start: str = None,
end: str = None,
):
"""
查询均线信号数据
@ -339,7 +343,7 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
if cross:
conditions.append("ma_cross = :cross")
condition_dict["cross"] = cross
@ -349,7 +353,7 @@ class DBBinanceData:
if divergence:
conditions.append("ma_divergence = :divergence")
condition_dict["divergence"] = divergence
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -361,24 +365,24 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT * FROM crypto_binance_data
WHERE {where_clause}
ORDER BY timestamp DESC
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_bollinger_signals(
self,
symbol: str,
bar: str,
self,
symbol: str,
bar: str,
signal: str = None,
pattern: str = None,
start: str = None,
end: str = None
start: str = None,
end: str = None,
):
"""
查询布林带信号数据
@ -391,14 +395,14 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
if signal:
conditions.append("boll_signal = :signal")
condition_dict["signal"] = signal
if pattern:
conditions.append("boll_pattern = :pattern")
condition_dict["pattern"] = pattern
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -410,22 +414,18 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT * FROM crypto_binance_data
WHERE {where_clause}
ORDER BY timestamp DESC
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def get_technical_statistics(
self,
symbol: str,
bar: str,
start: str = None,
end: str = None
self, symbol: str, bar: str, start: str = None, end: str = None
):
"""
获取技术指标统计信息
@ -436,7 +436,7 @@ class DBBinanceData:
"""
conditions = ["symbol = :symbol", "bar = :bar"]
condition_dict = {"symbol": symbol, "bar": bar}
# 处理时间范围
if start:
start_timestamp = transform_date_time_to_timestamp(start)
@ -448,7 +448,7 @@ class DBBinanceData:
if end_timestamp:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
where_clause = " AND ".join(conditions)
sql = f"""
SELECT
@ -470,20 +470,31 @@ class DBBinanceData:
FROM crypto_binance_data
WHERE {where_clause}
"""
return self.db_manager.query_data(sql, condition_dict, return_multi=False)
def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None):
def query_market_data_by_symbol_bar(
self,
symbol: str,
bar: str,
fields: list = None,
start: str = None,
end: str = None,
):
"""
根据交易对和K线周期查询数据
:param symbol: 交易对
:param bar: K线周期
:param fields: 字段列表
:param start: 开始时间
:param end: 结束时间
"""
if fields is None:
fields = ["*"]
fields_str = ", ".join(fields)
if start is None and end is None:
sql = """
SELECT * FROM crypto_binance_data
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp ASC
"""
@ -502,24 +513,29 @@ class DBBinanceData:
if start is not None and end is not None:
if start > end:
start, end = end, start
sql = """
SELECT * FROM crypto_binance_data
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end}
condition_dict = {
"symbol": symbol,
"bar": bar,
"start": start,
"end": end,
}
elif start is not None:
sql = """
SELECT * FROM crypto_binance_data
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start}
elif end is not None:
sql = """
SELECT * FROM crypto_binance_data
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "end": end}
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
return self.db_manager.query_data(sql, condition_dict, return_multi=True)

View File

@ -218,13 +218,6 @@ class DBData:
:param db_url: 数据库连接URL
"""
try:
engine = create_engine(
self.db_url,
pool_size=5, # 连接池大小
max_overflow=10, # 允许的最大溢出连接
pool_timeout=30, # 连接超时时间(秒)
pool_recycle=1800, # 连接回收时间(秒),避免长时间闲置
)
with self.db_engine.connect() as conn:
result = conn.execute(text(sql), condition_dict)
if return_multi:

View File

@ -473,17 +473,21 @@ class DBMarketData:
return self.db_manager.query_data(sql, condition_dict, return_multi=False)
def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None):
def query_market_data_by_symbol_bar(self, symbol: str, bar: str, fields: list = None, start: str = None, end: str = None):
"""
根据交易对和K线周期查询数据
:param symbol: 交易对
:param bar: K线周期
:param fields: 字段列表
:param start: 开始时间
:param end: 结束时间
"""
if fields is None:
fields = ["*"]
fields_str = ", ".join(fields)
if start is None and end is None:
sql = """
SELECT * FROM crypto_market_data
sql = f"""
SELECT {fields_str} FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp ASC
"""
@ -502,22 +506,22 @@ class DBMarketData:
if start is not None and end is not None:
if start > end:
start, end = end, start
sql = """
SELECT * FROM crypto_market_data
sql = f"""
SELECT {fields_str} FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end}
elif start is not None:
sql = """
SELECT * FROM crypto_market_data
sql = f"""
SELECT {fields_str} FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start}
elif end is not None:
sql = """
SELECT * FROM crypto_market_data
sql = f"""
SELECT {fields_str} FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end
ORDER BY timestamp ASC
"""

View File

@ -9,10 +9,12 @@ from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from datetime import datetime, timedelta
import core.logger as logging
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_binance_data import DBBinanceData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
@ -35,11 +37,14 @@ class ORBStrategy:
commission_per_share=0.0005,
profit_target_multiple=10,
is_us_stock=False,
is_binance=False,
direction=None,
by_sar=False,
symbol_bar_data=None,
symbol_1h_data=None,
price_range_mean_as_R=False,
by_big_k=False,
by_1h_k=False,
):
"""
初始化ORB策略参数
@ -63,10 +68,14 @@ class ORBStrategy:
:param commission_per_share: 每股交易佣金美元默认0.0005
:param profit_target_multiple: 盈利目标倍数默认10倍$R即10R
:param is_us_stock: 是否是美股
:param is_binance: 是否是Binance
:param direction: 方向None=自动Long=多头Short=空头
:param by_sar: 是否根据SAR指标生成信号True=False=
:param symbol_bar_data: 5分钟K线数据
:param symbol_1h_data: 1小时K线数据
:param price_range_mean_as_R: 是否将价格振幅均值作为$RTrue=False=
:param by_big_k: 是否根据K线实体部分亦即abs(open-close)超过high-low的50%True=False=
:param by_1h_k: 是否根据1小时K线True=False=
"""
logger.info(
f"初始化ORB策略参数股票代码={symbol}K线周期={bar},开始日期={start_date},结束日期={end_date},初始账户资金={initial_capital},最大杠杆倍数={max_leverage},单次交易风险比例={risk_per_trade},每股交易佣金={commission_per_share}"
@ -90,10 +99,15 @@ class ORBStrategy:
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
self.is_us_stock = is_us_stock
self.is_binance = is_binance
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_data = DBMarketData(self.db_url)
self.is_us_stock = is_us_stock
if self.is_binance:
self.db_market_data = DBBinanceData(self.db_url)
else:
self.db_market_data = DBMarketData(self.db_url)
self.output_chart_folder = r"./output/trade_sandbox/orb_strategy/chart/"
self.output_excel_folder = r"./output/trade_sandbox/orb_strategy/excel/"
os.makedirs(self.output_chart_folder, exist_ok=True)
@ -110,6 +124,7 @@ class ORBStrategy:
if self.by_sar:
self.sar_desc = "考虑SAR"
self.symbol_bar_data = symbol_bar_data
self.symbol_1h_data = symbol_1h_data
self.price_range_mean_as_R = price_range_mean_as_R
if self.price_range_mean_as_R:
self.price_range_mean_as_R_desc = "R为振幅均值"
@ -121,6 +136,11 @@ class ORBStrategy:
self.by_big_k_desc = "K线实体过50%"
else:
self.by_big_k_desc = "无K线要求"
self.by_1h_k = by_1h_k
if self.by_1h_k:
self.by_1h_k_desc = "参照1小时K线"
else:
self.by_1h_k_desc = "不参照1小时K线"
def run(self):
"""
@ -132,7 +152,12 @@ class ORBStrategy:
if len(self.trades) > 0:
self.plot_equity_curve()
self.output_trade_summary()
return self.symbol_bar_data, self.trades_df, self.trades_summary_df
return (
self.symbol_bar_data,
self.symbol_1h_data,
self.trades_df,
self.trades_summary_df,
)
def fetch_intraday_data(self):
"""
@ -146,47 +171,10 @@ class ORBStrategy:
f"开始获取{self.symbol}数据:{self.start_date}{self.end_date},间隔{self.bar}"
)
if self.symbol_bar_data is None or len(self.symbol_bar_data) == 0:
data = self.db_market_data.query_market_data_by_symbol_bar(
self.symbol, self.bar, start=self.start_date, end=self.end_date
)
data = pd.DataFrame(data)
data.sort_values(by="date_time", inplace=True)
# 保留核心列:开盘价、最高价、最低价、收盘价、成交量
data["Open"] = data["open"]
data["High"] = data["high"]
data["Low"] = data["low"]
data["Close"] = data["close"]
data["Volume"] = data["volume"]
if self.is_us_stock:
date_time_field = "date_time_us"
else:
date_time_field = "date_time"
data[date_time_field] = pd.to_datetime(data[date_time_field])
# data["Date"]为日期不包括时分秒即date_time如果是2025-01-01 10:00:00则Date为2025-01-01
data["Date"] = data[date_time_field].dt.date
# 将Date转换为datetime64[ns]类型以确保类型一致
data["Date"] = pd.to_datetime(data["Date"])
# 最小data["Date"]
self.start_date = data["Date"].min().strftime("%Y-%m-%d")
# 最大data["Date"]
self.end_date = data["Date"].max().strftime("%Y-%m-%d")
self.data = data[
[
"symbol",
"bar",
"Date",
date_time_field,
"Open",
"High",
"Low",
"Close",
"Volume",
"sar_signal",
]
].copy()
self.data.rename(columns={date_time_field: "date_time"}, inplace=True)
self.data = self.get_full_data(bar=self.bar)
self.calculate_price_range_mean()
self.symbol_bar_data = self.data.copy()
self.symbol_1h_data = self.get_full_data(bar="1H")
else:
self.data = self.symbol_bar_data.copy()
@ -210,6 +198,91 @@ class ORBStrategy:
f"成功获取{self.symbol}数据:{len(self.data)}{self.bar}K线,开始日期={self.start_date},结束日期={self.end_date}"
)
def get_full_data(self, bar: str = "5m"):
"""
分段获取数据并将数据合并为完整数据
分段依据如果end_date与start_date相差超过一年则每次取一年数据
"""
data = pd.DataFrame()
start_date = datetime.strptime(self.start_date, "%Y-%m-%d")
end_date = datetime.strptime(self.end_date, "%Y-%m-%d") + timedelta(days=1)
fields = [
"symbol",
"bar",
"date_time",
"date_time_us",
"open",
"high",
"low",
"close",
"volume",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"dif",
"macd",
]
while start_date < end_date:
current_end_date = min(start_date + timedelta(days=180), end_date)
start_date_str = start_date.strftime("%Y-%m-%d")
current_end_date_str = current_end_date.strftime("%Y-%m-%d")
logger.info(
f"获取{self.symbol}数据:{start_date_str}{current_end_date_str}"
)
current_data = self.db_market_data.query_market_data_by_symbol_bar(
self.symbol, bar, fields, start=start_date_str, end=current_end_date_str
)
if current_data is not None and len(current_data) > 0:
current_data = pd.DataFrame(current_data)
data = pd.concat([data, current_data])
start_date = current_end_date
data.drop_duplicates(inplace=True)
if self.is_us_stock:
date_time_field = "date_time_us"
else:
date_time_field = "date_time"
data.sort_values(by=date_time_field, inplace=True)
data.reset_index(drop=True, inplace=True)
# 保留核心列:开盘价、最高价、最低价、收盘价、成交量
data["Open"] = data["open"]
data["High"] = data["high"]
data["Low"] = data["low"]
data["Close"] = data["close"]
data["Volume"] = data["volume"]
data[date_time_field] = pd.to_datetime(data[date_time_field])
# data["Date"]为日期不包括时分秒即date_time如果是2025-01-01 10:00:00则Date为2025-01-01
data["Date"] = data[date_time_field].dt.date
# 将Date转换为datetime64[ns]类型以确保类型一致
data["Date"] = pd.to_datetime(data["Date"])
# 最小data["Date"]
self.start_date = data["Date"].min().strftime("%Y-%m-%d")
# 最大data["Date"]
self.end_date = data["Date"].max().strftime("%Y-%m-%d")
data = data[
[
"symbol",
"bar",
"Date",
date_time_field,
"Open",
"High",
"Low",
"Close",
"Volume",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"dif",
"macd",
]
]
data.rename(columns={date_time_field: "date_time"}, inplace=True)
return data
def calculate_shares(self, account_value, entry_price, stop_price, risk_assumed):
"""
根据ORB公式计算交易股数
@ -247,7 +320,7 @@ class ORBStrategy:
- 第二根5分钟K线根据第一根K线方向生成多空信号
"""
logger.info(
f"开始生成ORB策略信号{self.direction_desc}根据SAR指标{self.by_sar}"
f"开始生成ORB策略信号{self.direction_desc}根据SAR指标{self.by_sar}{self.by_1h_k_desc}"
)
if self.data is None:
raise ValueError("请先调用fetch_intraday_data获取数据")
@ -261,6 +334,7 @@ class ORBStrategy:
# 第一根5分钟K线开盘区间
first_candle = daily_data.iloc[0]
current_date = first_candle["Date"]
high1 = first_candle["High"]
low1 = first_candle["Low"]
open1 = first_candle["Open"]
@ -273,6 +347,25 @@ class ORBStrategy:
if (abs(open1 - close1) / (high1 - low1)) < 0.5:
continue
ma5_1h = None
ma10_1h = None
dif_1h = None
macd_1h = None
if self.by_1h_k:
if self.symbol_1h_data is None or len(self.symbol_1h_data) == 0:
continue
if len(self.symbol_1h_data) < 2:
continue
symbol_1h_date_data = self.symbol_1h_data[
self.symbol_1h_data["Date"] == current_date
]
if len(symbol_1h_date_data) > 0:
first_candle_1h = symbol_1h_date_data.iloc[0]
ma5_1h = first_candle_1h["ma5"]
ma10_1h = first_candle_1h["ma10"]
dif_1h = first_candle_1h["dif"]
macd_1h = first_candle_1h["macd"]
# 第二根5分钟K线entry信号
second_candle = daily_data.iloc[1]
entry_price = second_candle["Open"] # entry价格=第二根K线开盘价
@ -283,6 +376,22 @@ class ORBStrategy:
open1 < close1
and (self.direction == "Long" or self.direction is None)
and ((self.by_sar and sar_signal == "SAR多头") or not self.by_sar)
and (
(
self.by_1h_k
and (
ma5_1h is not None
and ma10_1h is not None
and ma5_1h > ma10_1h
)
and (
dif_1h is not None
and macd_1h is not None
and (dif_1h > 0 or macd_1h > 0)
)
)
or not self.by_1h_k
)
):
# 第一根K线收涨→多头信号
signal = "Long"
@ -291,6 +400,22 @@ class ORBStrategy:
open1 > close1
and (self.direction == "Short" or self.direction is None)
and ((self.by_sar and sar_signal == "SAR空头") or not self.by_sar)
and (
(
self.by_1h_k
and (
ma5_1h is not None
and ma10_1h is not None
and ma5_1h < ma10_1h
)
and (
dif_1h is not None
and macd_1h is not None
and (dif_1h < 0 or macd_1h < 0)
)
)
or not self.by_1h_k
)
):
# 第一根K线收跌→空头信号
signal = "Short"
@ -477,6 +602,7 @@ class ORBStrategy:
"BySar": self.sar_desc,
"PriceRangeMeanAsR": self.price_range_mean_as_R_desc,
"ByBigK": self.by_big_k_desc,
"By1hK": self.by_1h_k_desc,
"Symbol": self.symbol,
"Bar": self.bar,
"Date": date,
@ -576,6 +702,7 @@ class ORBStrategy:
self.trades_summary["根据SAR"] = self.sar_desc
self.trades_summary["R算法"] = self.price_range_mean_as_R_desc
self.trades_summary["K线条件"] = self.by_big_k_desc
self.trades_summary["1小时K线条件"] = self.by_1h_k_desc
self.trades_summary["股票代码"] = self.symbol
self.trades_summary["K线周期"] = self.bar
self.trades_summary["开始日期"] = self.start_date
@ -660,7 +787,7 @@ class ORBStrategy:
markersize=4,
)
plt.title(
f"{symbol} {bar} {self.direction_desc} {self.sar_desc} {self.price_range_mean_as_R_desc} {self.by_big_k_desc}",
f"{symbol} {bar} {self.direction_desc} {self.sar_desc} {self.price_range_mean_as_R_desc} {self.by_big_k_desc} {self.by_1h_k_desc}",
fontsize=14,
fontweight="bold",
)
@ -704,7 +831,7 @@ class ORBStrategy:
fontsize=10,
)
plt.tight_layout()
self.chart_save_path = f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_orb.png"
self.chart_save_path = f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_{self.by_1h_k_desc}_orb.png"
plt.savefig(self.chart_save_path, dpi=150, bbox_inches="tight")
plt.close()
@ -714,7 +841,7 @@ class ORBStrategy:
"""
start_date = self.start_date.replace("-", "")
end_date = self.end_date.replace("-", "")
output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}.xlsx"
output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_{self.by_1h_k_desc}.xlsx"
output_file_path = os.path.join(self.output_excel_folder, output_file_name)
logger.info(f"导出{output_file_path}")
with pd.ExcelWriter(output_file_path) as writer:

View File

@ -678,9 +678,9 @@ def batch_import_binance_data_by_csv():
def test_import_binance_data_by_csv():
huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True)
file_path = "./data/binance/spot/2020-08-11/SOL-USDT_1h.csv"
file_path = "./data/binance/spot/2020-08-12/XRP-USDT_1h.csv"
huge_volume_main.import_binance_data_by_csv(
file_path, "SOL-USDT", "1H", [50, 80, 100, 120]
file_path, "XRP-USDT", "1H", [50, 80, 100, 120]
)
@ -696,8 +696,8 @@ def test_send_huge_volume_data_to_wechat():
if __name__ == "__main__":
test_import_binance_data_by_csv()
# batch_import_binance_data_by_csv()
# test_import_binance_data_by_csv()
batch_import_binance_data_by_csv()
# batch_update_volume_spike(threshold=2.0, is_us_stock=True)
# test_send_huge_volume_data_to_wechat()
# batch_initial_detect_volume_spike(threshold=2.0)

View File

@ -1,5 +1,5 @@
from core.trade.orb_trade import ORBStrategy
from config import US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG
from config import US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG, BINANCE_MONITOR_CONFIG
import core.logger as logging
from datetime import datetime
from openpyxl import Workbook
@ -12,13 +12,21 @@ logger = logging.logger
def main():
is_us_stock_list = [True, False]
# is_us_stock_list = [True, False]
is_us_stock_list = [False]
is_binance = True
bar = "5m"
direction_list = [None, "Long", "Short"]
by_sar_list = [False, True]
price_range_mean_as_R_list = [False, True]
by_big_k_list = [False, True]
start_date = "2024-01-01"
by_1h_k_list = [False, True]
if is_binance:
start_date = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2017-08-16 00:00:00")
if len(start_date) > 10:
start_date = start_date[:10]
else:
start_date = "2024-01-01"
end_date = datetime.now().strftime("%Y-%m-%d")
profit_target_multiple = 10
initial_capital = 25000
@ -34,57 +42,68 @@ def main():
for by_sar in by_sar_list:
for price_range_mean_as_R in price_range_mean_as_R_list:
for by_big_k in by_big_k_list:
if is_us_stock:
symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["QQQ"]
)
else:
symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["BTC-USDT"]
)
for symbol in symbols:
logger.info(
f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}"
)
symbol_bar_data = None
found_symbol_bar_data = False
for symbol_data_dict in symbol_data_cache:
if (
symbol_data_dict["symbol"] == symbol
and symbol_data_dict["bar"] == bar
):
symbol_bar_data = symbol_data_dict["data"]
found_symbol_bar_data = True
break
orb_strategy = ORBStrategy(
symbol=symbol,
bar=bar,
start_date=start_date,
end_date=end_date,
is_us_stock=is_us_stock,
direction=direction,
by_sar=by_sar,
profit_target_multiple=profit_target_multiple,
initial_capital=initial_capital,
max_leverage=max_leverage,
risk_per_trade=risk_per_trade,
commission_per_share=commission_per_share,
symbol_bar_data=symbol_bar_data,
price_range_mean_as_R=price_range_mean_as_R,
by_big_k=by_big_k,
)
symbol_bar_data, trades_df, trades_summary_df = orb_strategy.run()
if symbol_bar_data is None or len(symbol_bar_data) == 0:
continue
if not found_symbol_bar_data:
symbol_data_cache.append(
{"symbol": symbol, "bar": bar, "data": symbol_bar_data}
for by_1h_k in by_1h_k_list:
if is_us_stock:
symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["QQQ"]
)
if trades_summary_df is None or len(trades_summary_df) == 0:
continue
trades_summary_df_list.append(trades_summary_df)
trades_df_list.append(trades_df)
else:
if is_binance:
symbols = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["BTC-USDT"]
)
else:
symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["BTC-USDT"]
)
for symbol in symbols:
logger.info(
f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}"
)
symbol_bar_data = None
symbol_1h_data = None
found_symbol_bar_data = False
for symbol_data_dict in symbol_data_cache:
if (
symbol_data_dict["symbol"] == symbol
and symbol_data_dict["bar"] == bar
):
symbol_bar_data = symbol_data_dict["data"]
symbol_1h_data = symbol_data_dict["1h_data"]
found_symbol_bar_data = True
break
orb_strategy = ORBStrategy(
symbol=symbol,
bar=bar,
start_date=start_date,
end_date=end_date,
is_us_stock=is_us_stock,
is_binance=is_binance,
direction=direction,
by_sar=by_sar,
profit_target_multiple=profit_target_multiple,
initial_capital=initial_capital,
max_leverage=max_leverage,
risk_per_trade=risk_per_trade,
commission_per_share=commission_per_share,
symbol_bar_data=symbol_bar_data,
symbol_1h_data=symbol_1h_data,
price_range_mean_as_R=price_range_mean_as_R,
by_big_k=by_big_k,
by_1h_k=by_1h_k,
)
symbol_bar_data, symbol_1h_data, trades_df, trades_summary_df = orb_strategy.run()
if symbol_bar_data is None or len(symbol_bar_data) == 0:
continue
if not found_symbol_bar_data:
symbol_data_cache.append(
{"symbol": symbol, "bar": bar, "data": symbol_bar_data, "1h_data": symbol_1h_data}
)
if trades_summary_df is None or len(trades_summary_df) == 0:
continue
trades_summary_df_list.append(trades_summary_df)
trades_df_list.append(trades_df)
total_trades_df = pd.concat(trades_df_list)
total_trades_summary_df = pd.concat(trades_summary_df_list)
statitics_dict = statistics_summary(total_trades_summary_df)
@ -117,6 +136,9 @@ def main():
statitics_dict["max_total_return_record_df_K_count"].to_excel(
writer, sheet_name="最大总收益率_K线条件", index=False
)
statitics_dict["max_total_return_record_df_1hK_count"].to_excel(
writer, sheet_name="最大总收益率_1小时K线条件", index=False
)
chart_path = r"./output/trade_sandbox/orb_strategy/chart/"
os.makedirs(chart_path, exist_ok=True)
copy_chart_to_excel(chart_path, output_file_path)
@ -170,6 +192,7 @@ def statistics_summary(trades_summary_df: pd.DataFrame):
summary["根据SAR"] = max_total_return_record["根据SAR"]
summary["R算法"] = max_total_return_record["R算法"]
summary["K线条件"] = max_total_return_record["K线条件"]
summary["1小时K线条件"] = max_total_return_record["1小时K线条件"]
summary["总收益率%"] = max_total_return_record["总收益率%"]
summary["自然收益率%"] = max_total_return_record["自然收益率%"]
max_total_return_record_list.append(summary)
@ -192,14 +215,14 @@ def statistics_summary(trades_summary_df: pd.DataFrame):
# 其它如Series、list、dict、ndarray等转字符串
return str(v)
for key_col in ["方向", "根据SAR", "R算法", "K线条件"]:
for key_col in ["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件"]:
if key_col in max_total_return_record_df.columns:
max_total_return_record_df[key_col] = max_total_return_record_df[
key_col
].apply(_to_hashable_scalar)
# 分组统计
max_total_return_record_df_grouped_count = (
max_total_return_record_df.groupby(["方向", "根据SAR", "R算法", "K线条件"], dropna=False)
max_total_return_record_df.groupby(["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件"], dropna=False)
.size()
.reset_index(name="数量")
)
@ -251,18 +274,27 @@ def statistics_summary(trades_summary_df: pd.DataFrame):
by="数量", ascending=False, inplace=True
)
max_total_return_record_df_K_count.reset_index(drop=True, inplace=True)
# 统计1小时K线条件的记录数目
max_total_return_record_df_1hK_count = (
max_total_return_record_df.groupby(["1小时K线条件"], dropna=False)
.size()
.reset_index(name="数量")
)
max_total_return_record_df_1hK_count.sort_values(
by="数量", ascending=False, inplace=True
)
max_total_return_record_df_1hK_count.reset_index(drop=True, inplace=True)
else:
# 构造空结果保证下游写入Excel不报错
max_total_return_record_df_grouped_count = pd.DataFrame(
columns=["方向", "根据SAR", "R算法", "K线条件", "数量"]
columns=["方向", "根据SAR", "R算法", "K线条件", "1小时K线条件", "数量"]
)
max_total_return_record_df_direction_count = pd.DataFrame(
columns=["方向", "数量"]
)
max_total_return_record_df_direction_count = pd.DataFrame(columns=["方向", "数量"])
max_total_return_record_df_sar_count = pd.DataFrame(columns=["根据SAR", "数量"])
max_total_return_record_df_R_count = pd.DataFrame(columns=["R算法", "数量"])
max_total_return_record_df_K_count = pd.DataFrame(columns=["K线条件", "数量"])
max_total_return_record_df_1hK_count = pd.DataFrame(columns=["1小时K线条件", "数量"])
result = {
"statistics_summary_df": statistics_summary_df,
"max_total_return_record_df": max_total_return_record_df,
@ -271,6 +303,7 @@ def statistics_summary(trades_summary_df: pd.DataFrame):
"max_total_return_record_df_sar_count": max_total_return_record_df_sar_count,
"max_total_return_record_df_R_count": max_total_return_record_df_R_count,
"max_total_return_record_df_K_count": max_total_return_record_df_K_count,
"max_total_return_record_df_1hK_count": max_total_return_record_df_1hK_count,
}
return result

79
play.py
View File

@ -2,6 +2,10 @@ import logging
from core.biz.quant_trader import QuantTrader
from core.biz.strategy import QuantStrategy
from config import MYSQL_CONFIG
from sqlalchemy import create_engine, exc, text
import pandas as pd
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
def main() -> None:
@ -94,5 +98,78 @@ def main() -> None:
else:
logging.warning("无效选择,请重新输入")
def test_query():
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
db_engine = create_engine(
db_url,
pool_size=25, # 连接池大小
max_overflow=10, # 允许的最大溢出连接
pool_timeout=30, # 连接超时时间(秒)
pool_recycle=60, # 连接回收时间(秒),避免长时间闲置
)
sql = "SELECT symbol, min(date_time), max(date_time) FROM okx.crypto_binance_data where bar='5m' group by symbol;"
condition_dict = {}
return_multi = True
try:
result = query_data(db_engine, sql, condition_dict, return_multi)
if result is not None and len(result) > 0:
data = pd.DataFrame(result)
data.columns = ["symbol", "min_date_time", "max_date_time"]
print(data)
# output to excel
data.to_excel("./data/binance/crypto_binance_data_5m.xlsx", index=False)
except Exception as e:
print(f"查询数据出错: {e}")
return None
def transform_data_type(data: dict):
"""
遍历字典将所有Decimal类型的值转换为float类型
"""
from decimal import Decimal
for key, value in data.items():
if isinstance(value, Decimal):
data[key] = float(value)
return data
def query_data(db_engine, sql: str, condition_dict: dict, return_multi: bool = True):
"""
查询数据
:param sql: 查询SQL
:param db_url: 数据库连接URL
"""
try:
with db_engine.connect() as conn:
result = conn.execute(text(sql), condition_dict)
if return_multi:
result = result.fetchall()
if result:
result_list = [
transform_data_type(dict(row._mapping)) for row in result
]
return result_list
else:
return None
else:
result = result.fetchone()
if result:
result_dict = transform_data_type(dict(result._mapping))
return result_dict
else:
return None
except Exception as e:
print(f"查询数据出错: {e}")
return None
if __name__ == "__main__":
main()
# main()
test_query()

View File

@ -1,11 +1,24 @@
use okx;
select DISTINCT symbol from crypto_huge_volume
where symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO") and bar="30m"
and window_size=120
group by symbol
select * from crypto_market_data
where symbol = "TQQQ" and bar="5m"
order by date_time_us DESC;
where symbol="PLTR" and bar="5m"
order by date_time_us
select * from crypto_huge_volume
where symbol = "QQQ" and bar="5m"
order by date_time;
select * from crypto_market_data
where symbol = "BTC-USDT" and bar="1H" #and open between 114271 and 114272 # AND date_time > "2025-08-21"
order by date_time desc;
select count(1) from crypto_market_data where
symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO")
select count(1) from crypto_huge_volume where
symbol in ("QQQ", "TQQQ", "MSFT", "AAPL", "GOOG", "NVDA", "META", "AMZN", "TSLA", "AVGO")
select symbol, bar, date_time, close,

View File

@ -0,0 +1,54 @@
-- crypto_binance_data表核心索引优化脚本
-- 针对500万+数据量的关键查询性能优化
-- 基于实际代码中的查询模式分析
-- 核心索引1: 主要查询模式索引
-- 覆盖: WHERE symbol = ? AND bar = ? ORDER BY timestamp
-- 这是代码中最常见的查询模式
CREATE INDEX idx_symbol_bar_timestamp ON crypto_binance_data (symbol, bar, timestamp);
-- 核心索引2: 时间范围查询索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND timestamp BETWEEN ? AND ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND timestamp >= ? AND timestamp <= ?
# CREATE INDEX idx_symbol_bar_timestamp_range ON crypto_binance_data (symbol, bar, timestamp);
-- 核心索引3: 分组查询索引
-- 覆盖: GROUP BY symbol, bar
-- 覆盖: SELECT symbol, min(date_time), max(date_time) FROM crypto_binance_data WHERE bar='5m' GROUP BY symbol
CREATE INDEX idx_bar_symbol ON crypto_binance_data (bar, symbol);
-- 核心索引4: 技术指标查询索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND macd_signal = ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND kdj_signal = ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND rsi_signal = ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND boll_signal = ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND ma_cross = ?
CREATE INDEX idx_symbol_bar_indicators ON crypto_binance_data (symbol, bar, macd_signal, kdj_signal, rsi_signal, boll_signal, ma_cross);
-- 核心索引5: 时间戳排序索引
-- 覆盖: ORDER BY timestamp DESC/ASC
-- 优化: 查询最新数据 ORDER BY timestamp DESC LIMIT 1
CREATE INDEX idx_timestamp_desc ON crypto_binance_data (timestamp DESC);
-- 核心索引6: 日期时间查询索引
-- 覆盖: WHERE date_time >= ? AND date_time <= ?
-- 覆盖: ORDER BY date_time
CREATE INDEX idx_date_time ON crypto_binance_data (date_time);
-- 核心索引7: 成交量查询索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND volume > ?
-- 覆盖: ORDER BY volume DESC
CREATE INDEX idx_symbol_bar_volume ON crypto_binance_data (symbol, bar, volume);
-- 核心索引8: 价格查询索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND close > ? AND close < ?
-- 覆盖: ORDER BY close DESC/ASC
CREATE INDEX idx_symbol_bar_close ON crypto_binance_data (symbol, bar, close);
-- 执行前检查现有索引
-- SHOW INDEX FROM crypto_binance_data;
-- 执行后验证索引效果
-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' ORDER BY timestamp DESC LIMIT 100;
-- EXPLAIN SELECT symbol, min(date_time), max(date_time) FROM crypto_binance_data WHERE bar='5m' GROUP BY symbol;
-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' AND timestamp BETWEEN 1640995200000 AND 1641081600000;

View File

@ -0,0 +1,71 @@
-- crypto_binance_data表索引优化脚本
-- 针对500万+数据量的查询性能优化
-- 1. 主要查询索引 - 覆盖最常见的查询模式
-- 支持: WHERE symbol = ? AND bar = ? ORDER BY timestamp
CREATE INDEX idx_symbol_bar_timestamp ON crypto_binance_data (symbol, bar, timestamp);
-- 2. 时间范围查询索引 - 优化时间范围查询
-- 支持: WHERE symbol = ? AND bar = ? AND timestamp BETWEEN ? AND ?
CREATE INDEX idx_symbol_bar_timestamp_range ON crypto_binance_data (symbol, bar, timestamp);
-- 3. 按symbol分组查询索引 - 优化GROUP BY symbol查询
-- 支持: GROUP BY symbol, bar
CREATE INDEX idx_symbol_bar ON crypto_binance_data (symbol, bar);
-- 4. 时间戳排序索引 - 优化ORDER BY timestamp查询
-- 支持: ORDER BY timestamp DESC/ASC
CREATE INDEX idx_timestamp ON crypto_binance_data (timestamp);
-- 5. 技术指标查询索引 - 优化技术指标相关查询
-- 支持: WHERE symbol = ? AND bar = ? AND macd_signal = ?
CREATE INDEX idx_symbol_bar_macd_signal ON crypto_binance_data (symbol, bar, macd_signal);
-- 支持: WHERE symbol = ? AND bar = ? AND kdj_signal = ?
CREATE INDEX idx_symbol_bar_kdj_signal ON crypto_binance_data (symbol, bar, kdj_signal);
-- 支持: WHERE symbol = ? AND bar = ? AND rsi_signal = ?
CREATE INDEX idx_symbol_bar_rsi_signal ON crypto_binance_data (symbol, bar, rsi_signal);
-- 支持: WHERE symbol = ? AND bar = ? AND boll_signal = ?
CREATE INDEX idx_symbol_bar_boll_signal ON crypto_binance_data (symbol, bar, boll_signal);
-- 支持: WHERE symbol = ? AND bar = ? AND ma_cross = ?
CREATE INDEX idx_symbol_bar_ma_cross ON crypto_binance_data (symbol, bar, ma_cross);
-- 6. 复合技术指标查询索引 - 优化多条件技术指标查询
-- 支持: WHERE symbol = ? AND bar = ? AND macd_signal = ? AND kdj_signal = ?
CREATE INDEX idx_symbol_bar_macd_kdj ON crypto_binance_data (symbol, bar, macd_signal, kdj_signal);
-- 7. 日期时间查询索引 - 优化按日期时间查询
-- 支持: WHERE date_time >= ? AND date_time <= ?
CREATE INDEX idx_date_time ON crypto_binance_data (date_time);
-- 8. 交易对时间索引 - 优化特定交易对的时间查询
-- 支持: WHERE symbol = ? ORDER BY timestamp
CREATE INDEX idx_symbol_timestamp ON crypto_binance_data (symbol, timestamp);
-- 9. 周期时间索引 - 优化特定周期的时间查询
-- 支持: WHERE bar = ? ORDER BY timestamp
CREATE INDEX idx_bar_timestamp ON crypto_binance_data (bar, timestamp);
-- 10. 统计查询优化索引 - 优化COUNT, AVG等聚合查询
-- 支持: SELECT COUNT(*) FROM crypto_binance_data WHERE symbol = ? AND bar = ?
CREATE INDEX idx_symbol_bar_volume ON crypto_binance_data (symbol, bar, volume);
-- 11. 价格相关查询索引 - 优化价格相关查询
-- 支持: WHERE symbol = ? AND bar = ? AND close > ? AND close < ?
CREATE INDEX idx_symbol_bar_close ON crypto_binance_data (symbol, bar, close);
-- 12. 成交量相关查询索引 - 优化成交量相关查询
-- 支持: WHERE symbol = ? AND bar = ? AND volume > ?
CREATE INDEX idx_symbol_bar_volume_high ON crypto_binance_data (symbol, bar, volume);
-- 删除可能存在的重复或低效索引
-- 注意:在实际执行前请先检查现有索引,避免删除正在使用的索引
-- 查看当前索引状态
-- SHOW INDEX FROM crypto_binance_data;
-- 分析查询性能
-- EXPLAIN SELECT * FROM crypto_binance_data WHERE symbol = 'BTC-USDT' AND bar = '5m' ORDER BY timestamp DESC LIMIT 100;

View File

@ -0,0 +1,69 @@
-- crypto_binance_huge_volume表核心索引优化脚本
-- 针对2000万+数据量的关键查询性能优化
-- 基于实际代码中的查询模式分析
-- 核心索引1: 主要查询模式索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? ORDER BY timestamp
-- 这是代码中最常见的查询模式,用于获取特定交易对、周期、窗口大小的数据
CREATE INDEX idx_symbol_bar_window_size_timestamp ON crypto_binance_huge_volume (symbol, bar, window_size, timestamp);
-- 核心索引2: 时间范围查询索引
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND timestamp BETWEEN ? AND ?
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND timestamp >= ? AND timestamp <= ?
# CREATE INDEX idx_symbol_bar_window_size_timestamp_range ON crypto_binance_huge_volume (symbol, bar, window_size, timestamp);
-- 核心索引3: 巨量交易查询索引
-- 覆盖: WHERE huge_volume = 1
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND huge_volume = 1
CREATE INDEX idx_huge_volume_symbol_bar_window_size ON crypto_binance_huge_volume (huge_volume, symbol, bar, window_size);
-- 核心索引4: 成交量比率排序索引
-- 覆盖: ORDER BY volume_ratio DESC
-- 覆盖: WHERE huge_volume = 1 ORDER BY volume_ratio DESC
CREATE INDEX idx_volume_ratio_desc ON crypto_binance_huge_volume (volume_ratio DESC);
-- 核心索引5: 价格分位数高点查询索引
-- 覆盖: WHERE close_80_high = 1, close_90_high = 1
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND close_80_high = 1
CREATE INDEX idx_close_80_90_high ON crypto_binance_huge_volume (close_80_high, close_90_high, symbol, bar, window_size);
-- 核心索引6: 价格分位数低点查询索引
-- 覆盖: WHERE close_20_low = 1, close_10_low = 1
-- 覆盖: WHERE symbol = ? AND bar = ? AND window_size = ? AND close_20_low = 1
CREATE INDEX idx_close_20_10_low ON crypto_binance_huge_volume (close_20_low, close_10_low, symbol, bar, window_size);
-- 核心索引7: 最高价分位数查询索引
-- 覆盖: WHERE high_80_high = 1, high_90_high = 1, high_20_low = 1, high_10_low = 1
CREATE INDEX idx_high_percentiles ON crypto_binance_huge_volume (high_80_high, high_90_high, high_20_low, high_10_low, symbol, bar, window_size);
-- 核心索引8: 最低价分位数查询索引
-- 覆盖: WHERE low_80_high = 1, low_90_high = 1, low_20_low = 1, low_10_low = 1
CREATE INDEX idx_low_percentiles ON crypto_binance_huge_volume (low_80_high, low_90_high, low_20_low, low_10_low, symbol, bar, window_size);
-- 核心索引9: 复合量价尖峰查询索引
-- 覆盖: WHERE huge_volume = 1 AND (close_80_high = 1 OR close_20_low = 1)
-- 覆盖: WHERE huge_volume = 1 AND (close_90_high = 1 OR close_10_low = 1)
CREATE INDEX idx_huge_volume_price_spike ON crypto_binance_huge_volume (huge_volume, close_80_high, close_20_low, close_90_high, close_10_low, symbol, bar, window_size);
-- 核心索引10: 分组统计查询索引
-- 覆盖: GROUP BY symbol, bar, window_size
-- 覆盖: GROUP BY symbol, bar
CREATE INDEX idx_symbol_bar_window_size_group ON crypto_binance_huge_volume (symbol, bar, window_size);
-- 核心索引11: 时间戳排序索引
-- 覆盖: ORDER BY timestamp DESC/ASC
-- 优化: 查询最新数据 ORDER BY timestamp DESC LIMIT 1
CREATE INDEX idx_timestamp_desc ON crypto_binance_huge_volume (timestamp DESC);
-- 核心索引12: 日期时间查询索引
-- 覆盖: WHERE date_time >= ? AND date_time <= ?
-- 覆盖: ORDER BY date_time
CREATE INDEX idx_date_time ON crypto_binance_huge_volume (date_time);
-- 执行前检查现有索引
-- SHOW INDEX FROM crypto_binance_huge_volume;
-- 执行后验证索引效果
-- EXPLAIN SELECT * FROM crypto_binance_huge_volume WHERE symbol = 'BTC-USDT' AND bar = '5m' AND window_size = 50 ORDER BY timestamp DESC LIMIT 100;
-- EXPLAIN SELECT * FROM crypto_binance_huge_volume WHERE huge_volume = 1 AND close_80_high = 1 ORDER BY volume_ratio DESC LIMIT 10;
-- EXPLAIN SELECT COUNT(*) FROM crypto_binance_huge_volume WHERE symbol = 'BTC-USDT' AND bar = '5m' AND window_size = 50;