crypto_quant/core/trade/orb_trade.py

504 lines
22 KiB
Python
Raw Normal View History

2025-08-31 03:20:59 +00:00
import yfinance as yf
2025-09-01 10:01:21 +00:00
import os
2025-08-31 03:20:59 +00:00
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import core.logger as logging
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
# seaborn支持中文
plt.rcParams["font.family"] = ["SimHei"]
logger = logging.logger
class ORBStrategy:
def __init__(
self,
initial_capital=25000,
max_leverage=4,
risk_per_trade=0.01,
commission_per_share=0.0005,
2025-09-01 10:01:21 +00:00
is_us_stock=False,
2025-08-31 03:20:59 +00:00
):
"""
初始化ORB策略参数
2025-09-01 10:01:21 +00:00
ORB策略说明
1. 每天仅1次交易机会多头或空头,排除十字星open1 == close1
2. 第一根5分钟K线确定开盘区间High1, Low1
3. 第二根5分钟K线根据第一根K线方向生成多空信号open1<close1为多头open1>close1为空头
entry_price=第二根K线开盘价stop_price=第一根K线最低价多头或第一根K线最高价空头
4. 多头跌破止损止损突破止盈止盈
5. 空头突破止损止损跌破止盈止盈
6. 止损/止盈根据$R计算$R=|entry_price-stop_price|
7. 盈利目标10R即10*$R
8. 账户净值曲线账户价值与市场价格
2025-08-31 03:20:59 +00:00
:param initial_capital: 初始账户资金美元
:param max_leverage: 最大杠杆倍数默认4倍符合FINRA规定
:param risk_per_trade: 单次交易风险比例默认1%
:param commission_per_share: 每股交易佣金美元默认0.0005
"""
2025-09-01 10:01:21 +00:00
logger.info(
f"初始化ORB策略参数初始账户资金={initial_capital},最大杠杆倍数={max_leverage},单次交易风险比例={risk_per_trade},每股交易佣金={commission_per_share}"
)
2025-08-31 03:20:59 +00:00
self.initial_capital = initial_capital
self.max_leverage = max_leverage
self.risk_per_trade = risk_per_trade
self.commission_per_share = commission_per_share
self.data = None # 存储K线数据
self.trades = [] # 存储交易记录
self.equity_curve = None # 存储账户净值曲线
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_data = DBMarketData(self.db_url)
2025-09-01 10:01:21 +00:00
self.is_us_stock = is_us_stock
self.output_chart_folder = r"./output/trade_sandbox/orb_strategy/chart/"
os.makedirs(self.output_chart_folder, exist_ok=True)
2025-08-31 03:20:59 +00:00
def fetch_intraday_data(self, symbol, start_date, end_date, interval="5m"):
"""
获取日内5分钟K线数据需yfinance支持部分数据可能有延迟
:param ticker: 股票代码如QQQTQQQ
:param start_date: 起始日期格式YYYY-MM-DD
:param end_date: 结束日期格式YYYY-MM-DD
:param interval: K线周期默认5分钟
"""
logger.info(f"开始获取{symbol}数据:{start_date}{end_date},间隔{interval}")
# data = yf.download(
# symbol, start=start_date, end=end_date, interval=interval, progress=False
# )
data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, interval, start=start_date, end=end_date
)
data = pd.DataFrame(data)
data.sort_values(by="date_time", inplace=True)
# 保留核心列:开盘价、最高价、最低价、收盘价、成交量
data["Open"] = data["open"]
data["High"] = data["high"]
data["Low"] = data["low"]
data["Close"] = data["close"]
data["Volume"] = data["volume"]
2025-09-01 10:01:21 +00:00
if self.is_us_stock:
date_time_field = "date_time_us"
else:
date_time_field = "date_time"
data[date_time_field] = pd.to_datetime(data[date_time_field])
2025-08-31 03:20:59 +00:00
# data["Date"]为日期不包括时分秒即date_time如果是2025-01-01 10:00:00则Date为2025-01-01
2025-09-01 10:01:21 +00:00
data["Date"] = data[date_time_field].dt.date
2025-08-31 03:20:59 +00:00
# 将Date转换为datetime64[ns]类型以确保类型一致
data["Date"] = pd.to_datetime(data["Date"])
2025-09-01 10:01:21 +00:00
self.data = data[
["symbol", "bar", "Date", date_time_field, "Open", "High", "Low", "Close", "Volume"]
].copy()
self.data.rename(columns={date_time_field: "date_time"}, inplace=True)
2025-08-31 03:20:59 +00:00
logger.info(f"成功获取{symbol}数据:{len(self.data)}{interval}K线")
def calculate_shares(self, account_value, entry_price, stop_price):
"""
根据ORB公式计算交易股数
:param account_value: 当前账户价值美元
:param entry_price: 交易entry价格第二根5分钟K线开盘价
:param stop_price: 止损价格多头=第一根K线最低价空头=第一根K线最高价
:return: 整数股数Shares
"""
2025-09-01 10:01:21 +00:00
logger.info(
f"开始计算交易股数:账户价值={account_value}entry价格={entry_price},止损价格={stop_price}"
)
2025-08-31 03:20:59 +00:00
# 计算单交易风险金额($R
risk_per_trade_dollar = abs(entry_price - stop_price) # 风险金额取绝对值
if risk_per_trade_dollar <= 0:
return 0 # 无风险时不交易
# 公式1基于风险预算的最大股数风险控制优先
shares_risk = (account_value * self.risk_per_trade) / risk_per_trade_dollar
# 公式2基于杠杆限制的最大股数杠杆约束
shares_leverage = (self.max_leverage * account_value) / entry_price
# 取两者最小值(满足风险和杠杆双重约束)
max_shares = min(shares_risk, shares_leverage)
# 扣除佣金影响(简化计算:假设佣金从可用资金中扣除)
commission_cost = max_shares * self.commission_per_share
if (account_value - commission_cost) < 0:
return 0 # 扣除佣金后资金不足,不交易
return int(max_shares) # 股数取整
2025-09-02 04:44:34 +00:00
def generate_orb_signals(self, direction: str = None, by_sar: bool = False):
2025-08-31 03:20:59 +00:00
"""
生成ORB策略信号每日仅1次交易机会
- 第一根5分钟K线确定开盘区间High1, Low1
- 第二根5分钟K线根据第一根K线方向生成多空信号
2025-09-02 04:44:34 +00:00
:param direction: 方向None=自动Long=多头Short=空头
:param by_sar: 是否根据SAR指标生成信号True=False=
2025-08-31 03:20:59 +00:00
"""
2025-09-02 04:44:34 +00:00
direction_desc = "既做多又做空"
if direction == "Long":
direction_desc = "做多"
elif direction == "Short":
direction_desc = "做空"
logger.info(f"开始生成ORB策略信号{direction_desc}根据SAR指标{by_sar}")
2025-08-31 03:20:59 +00:00
if self.data is None:
raise ValueError("请先调用fetch_intraday_data获取数据")
signals = []
# 按日期分组处理每日数据
for date, daily_data in self.data.groupby("Date"):
daily_data = daily_data.sort_index() # 按时间排序
if len(daily_data) < 2:
continue # 当日K线不足2根跳过
# 第一根5分钟K线开盘区间
first_candle = daily_data.iloc[0]
high1 = first_candle["High"]
low1 = first_candle["Low"]
open1 = first_candle["Open"]
close1 = first_candle["Close"]
# 第二根5分钟K线entry信号
second_candle = daily_data.iloc[1]
entry_price = second_candle["Open"] # entry价格=第二根K线开盘价
entry_time = second_candle.date_time # entry时间
# 生成信号第一根K线方向决定多空排除十字星open1 == close1
2025-09-02 04:44:34 +00:00
if open1 < close1 and (direction == "Long" or direction is None):
2025-08-31 03:20:59 +00:00
# 第一根K线收涨→多头信号
signal = "Long"
stop_price = low1 # 多头止损=第一根K线最低价
2025-09-02 04:44:34 +00:00
elif open1 > close1 and (direction == "Short" or direction is None):
2025-08-31 03:20:59 +00:00
# 第一根K线收跌→空头信号
signal = "Short"
stop_price = high1 # 空头止损=第一根K线最高价
else:
2025-09-02 04:44:34 +00:00
# 与direction不一致或十字星→无信号
2025-09-01 10:01:21 +00:00
signal = None
2025-08-31 03:20:59 +00:00
stop_price = None
signals.append(
{
"Date": date,
"EntryTime": entry_time,
"Signal": signal,
"EntryPrice": entry_price,
"StopPrice": stop_price,
"High1": high1,
"Low1": low1,
}
)
# 将信号合并到原始数据
signals_df = pd.DataFrame(signals)
# 确保Date列类型一致将Date转换为datetime64[ns]类型
2025-09-01 10:01:21 +00:00
signals_df["Date"] = pd.to_datetime(signals_df["Date"])
2025-08-31 03:20:59 +00:00
# 使用merge而不是join来合并数据,根据signals_df的EntryTime与self.data的date_time进行匹配
# TODO: 这里需要优化
2025-09-01 10:01:21 +00:00
self.data = self.data.merge(
signals_df, left_on="date_time", right_on="EntryTime", how="left"
)
2025-08-31 03:20:59 +00:00
# 将Date_x和Date_y合并为Date
self.data["Date"] = self.data["Date_x"].combine_first(self.data["Date_y"])
# 删除Date_x和Date_y
self.data.drop(columns=["Date_x", "Date_y"], inplace=True)
logger.info(
f"生成信号完成:共{len(signals_df)}个交易日,其中多头{sum(signals_df['Signal']=='Long')}次,空头{sum(signals_df['Signal']=='Short')}"
)
def backtest(self, profit_target_multiple=10):
"""
回测ORB策略
:param profit_target_multiple: 盈利目标倍数默认10倍$R即10R
"""
logger.info(f"开始回测ORB策略盈利目标倍数={profit_target_multiple}")
if "Signal" not in self.data.columns:
raise ValueError("请先调用generate_orb_signals生成策略信号")
account_value = self.initial_capital # 初始账户价值
current_position = None # 当前持仓None=空仓Long/Short=持仓)
equity_history = [account_value] # 净值历史
trade_id = 0 # 交易ID
# 按时间遍历数据每日仅处理第二根K线后的信号
for date, daily_data in self.data.groupby("Date"):
daily_data = daily_data.sort_index()
if len(daily_data) < 2:
continue
# 获取当日信号第二根K线的信号
signal_row = (
daily_data[~pd.isna(daily_data["Signal"])].iloc[0]
if sum(~pd.isna(daily_data["Signal"])) > 0
else None
)
if signal_row is None:
# 无信号→当日不交易,净值保持不变
equity_history.append(account_value)
continue
# 提取信号参数
signal = signal_row["Signal"]
if pd.isna(signal):
continue
2025-09-01 10:01:21 +00:00
2025-08-31 03:20:59 +00:00
entry_price = signal_row["EntryPrice"]
stop_price = signal_row["StopPrice"]
high1 = signal_row["High1"]
low1 = signal_row["Low1"]
risk_assumed = abs(entry_price - stop_price) # 计算$R
profit_target = (
entry_price + (risk_assumed * profit_target_multiple)
if signal == "Long"
else entry_price - (risk_assumed * profit_target_multiple)
)
# 计算交易股数
shares = self.calculate_shares(account_value, entry_price, stop_price)
if shares == 0:
# 股数为0→不交易
equity_history.append(account_value)
continue
# 计算佣金(买入/卖出各收一次)
total_commission = shares * self.commission_per_share * 2 # 往返佣金
# 模拟日内持仓:寻找止损/止盈触发点,或当日收盘平仓
daily_prices = daily_data[
daily_data.date_time > signal_row.date_time
] # 从entry时间开始遍历
exit_price = None
exit_time = None
exit_reason = None
for idx, (time, row) in enumerate(daily_prices.iterrows()):
high = row["High"]
low = row["Low"]
close = row["Close"]
# 检查止损/止盈条件
if signal == "Long":
# 多头:跌破止损→止损;突破止盈→止盈
if low <= stop_price:
exit_price = stop_price
exit_reason = "Stop Loss"
2025-09-01 10:01:21 +00:00
exit_time = row["date_time"]
2025-08-31 03:20:59 +00:00
break
elif high >= profit_target:
exit_price = profit_target
exit_reason = "Profit Target (10R)"
2025-09-01 10:01:21 +00:00
exit_time = row["date_time"]
2025-08-31 03:20:59 +00:00
break
elif signal == "Short":
# 空头:突破止损→止损;跌破止盈→止盈
if high >= stop_price:
exit_price = stop_price
exit_reason = "Stop Loss"
2025-09-01 10:01:21 +00:00
exit_time = row["date_time"]
2025-08-31 03:20:59 +00:00
break
elif low <= profit_target:
exit_price = profit_target
exit_reason = "Profit Target (10R)"
2025-09-01 10:01:21 +00:00
exit_time = row["date_time"]
2025-08-31 03:20:59 +00:00
break
# 若未触发止损/止盈,当日收盘平仓
if exit_price is None:
exit_price = daily_prices.iloc[-1]["Close"]
exit_reason = "End of Day (EoD)"
exit_time = daily_prices.iloc[-1].date_time
2025-09-01 10:01:21 +00:00
initial_account_value = account_value
2025-08-31 03:20:59 +00:00
# 计算盈亏
if signal == "Long":
profit_loss = (exit_price - entry_price) * shares - total_commission
else: # Short
profit_loss = (entry_price - exit_price) * shares - total_commission
2025-09-01 10:01:21 +00:00
# 计算盈亏百分比profit_loss除以当期初始资金
profit_loss_percentage = (profit_loss / initial_account_value) * 100
2025-08-31 03:20:59 +00:00
# 更新账户价值
account_value += profit_loss
account_value = max(account_value, 0) # 账户价值不能为负
# 记录交易
self.trades.append(
{
"TradeID": trade_id,
"Date": date,
"Signal": signal,
2025-09-01 10:01:21 +00:00
"EntryTime": signal_row.date_time.strftime("%Y-%m-%d %H:%M:%S"),
2025-08-31 03:20:59 +00:00
"EntryPrice": entry_price,
2025-09-01 10:01:21 +00:00
"ExitTime": exit_time.strftime("%Y-%m-%d %H:%M:%S"),
2025-08-31 03:20:59 +00:00
"ExitPrice": exit_price,
"Shares": shares,
"RiskAssumed": risk_assumed,
"ProfitLoss": profit_loss,
2025-09-01 10:01:21 +00:00
"ProfitLossPercentage": profit_loss_percentage,
2025-08-31 03:20:59 +00:00
"ExitReason": exit_reason,
2025-09-01 10:01:21 +00:00
"AccountValueInitial": initial_account_value,
2025-08-31 03:20:59 +00:00
"AccountValueAfter": account_value,
}
)
# 记录净值
equity_history.append(account_value)
trade_id += 1
# 生成净值曲线
2025-09-01 10:01:21 +00:00
self.create_equity_curve()
2025-08-31 03:20:59 +00:00
# 输出回测结果
trades_df = pd.DataFrame(self.trades)
total_return = (
(account_value - self.initial_capital) / self.initial_capital * 100
)
win_rate = (
(trades_df["ProfitLoss"] > 0).sum() / len(trades_df) * 100
if len(trades_df) > 0
else 0
)
2025-09-01 10:01:21 +00:00
# 计算盈亏比
profit_sum = trades_df[trades_df["ProfitLoss"] > 0]["ProfitLoss"].sum()
loss_sum = abs(trades_df[trades_df["ProfitLoss"] < 0]["ProfitLoss"].sum())
if loss_sum == 0:
profit_loss_ratio = float('inf')
else:
profit_loss_ratio = (profit_sum / loss_sum) * 100
2025-08-31 03:20:59 +00:00
logger.info("\n" + "=" * 50)
logger.info("ORB策略回测结果")
logger.info("=" * 50)
logger.info(f"初始资金:${self.initial_capital:,.2f}")
logger.info(f"最终资金:${account_value:,.2f}")
logger.info(f"总收益率:{total_return:.2f}%")
logger.info(f"总交易次数:{len(trades_df)}")
2025-09-01 10:01:21 +00:00
logger.info(f"盈亏比:{profit_loss_ratio:.2f}%")
2025-08-31 03:20:59 +00:00
logger.info(f"胜率:{win_rate:.2f}%")
if len(trades_df) > 0:
logger.info(f"平均每笔盈亏:${trades_df['ProfitLoss'].mean():.2f}")
logger.info(f"最大单笔盈利:${trades_df['ProfitLoss'].max():.2f}")
logger.info(f"最大单笔亏损:${trades_df['ProfitLoss'].min():.2f}")
2025-09-01 10:01:21 +00:00
def create_equity_curve(self):
"""
创建账户净值曲线
"""
equity_curve_list = []
# 将self.data.index[0].Date的值转换为字符串且格式为YYYY-MM-DD
first_date = self.data.iloc[0].date_time.strftime("%Y-%m-%d %H:%M:%S")
first_open = float(self.data.iloc[0].Open)
equity_curve_list.append(
{
"DateTime": first_date,
"AccountValue": self.initial_capital,
"MarketPrice": first_open,
}
)
for trade in self.trades:
equity_curve_list.append(
{
"DateTime": trade["ExitTime"],
"AccountValue": trade["AccountValueAfter"],
"MarketPrice": trade["ExitPrice"],
}
)
self.equity_curve = pd.DataFrame(equity_curve_list)
self.equity_curve.sort_values(by="DateTime", inplace=True)
self.equity_curve.reset_index(drop=True, inplace=True)
2025-08-31 03:20:59 +00:00
def plot_equity_curve(self):
"""
绘制账户净值曲线
"""
logger.info("开始绘制账户净值曲线")
if self.equity_curve is None:
raise ValueError("请先调用backtest进行回测")
2025-09-01 10:01:21 +00:00
2025-08-31 03:20:59 +00:00
# seaborn风格设置
sns.set_theme(style="whitegrid")
# plt.rcParams['font.family'] = "SimHei"
plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名
plt.rcParams["font.size"] = 11 # 设置字体大小
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
2025-09-01 10:01:21 +00:00
symbol = self.data.iloc[0].symbol
bar = self.data.iloc[0].bar
first_account_value = self.equity_curve.iloc[0]["AccountValue"]
first_market_price = self.equity_curve.iloc[0]["MarketPrice"]
account_value_to_1 = self.equity_curve["AccountValue"] / first_account_value
market_price_to_1 = self.equity_curve["MarketPrice"] / first_market_price
2025-08-31 03:20:59 +00:00
plt.figure(figsize=(12, 6))
2025-09-01 10:01:21 +00:00
plt.plot(self.equity_curve["DateTime"], account_value_to_1, label="账户价值", color='blue', linewidth=2, marker='o', markersize=4)
plt.plot(self.equity_curve["DateTime"], market_price_to_1, label="市场价格", color='green', linewidth=2, marker='s', markersize=4)
plt.title(f"ORB策略账户净值曲线 {symbol} {bar}", fontsize=14, fontweight='bold')
plt.xlabel("时间", fontsize=12)
plt.ylabel("涨跌变化", fontsize=12)
plt.legend(fontsize=11)
2025-08-31 03:20:59 +00:00
plt.grid(True, alpha=0.3)
2025-09-01 10:01:21 +00:00
# 设置x轴标签避免matplotlib警告
# 选择合适的时间间隔显示标签,避免过于密集
if len(self.equity_curve) > 30:
# 如果数据点较多,选择间隔显示,但确保第一条和最后一条始终显示
step = max(1, len(self.equity_curve) // 30)
# 创建标签索引列表,确保包含首尾数据
label_indices = [0] # 第一条
# 添加中间间隔的标签
for i in range(step, len(self.equity_curve) - 1, step):
label_indices.append(i)
# 添加最后一条(如果还没有包含的话)
if len(self.equity_curve) - 1 not in label_indices:
label_indices.append(len(self.equity_curve) - 1)
# 设置x轴标签
plt.xticks(self.equity_curve["DateTime"].iloc[label_indices],
self.equity_curve["DateTime"].iloc[label_indices],
rotation=45, ha='right', fontsize=10)
else:
# 如果数据点较少,全部显示
plt.xticks(self.equity_curve["DateTime"],
self.equity_curve["DateTime"],
rotation=45, ha='right', fontsize=10)
plt.tight_layout()
save_path = f"{self.output_chart_folder}/{symbol}_{bar}_orb_strategy_equity_curve.png"
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
2025-08-31 03:20:59 +00:00
# ------------------- 策略示例回测QQQ的ORB策略2016-2023 -------------------
if __name__ == "__main__":
# 初始化ORB策略
orb_strategy = ORBStrategy(
initial_capital=25000,
max_leverage=4,
risk_per_trade=0.01,
commission_per_share=0.0005,
)
# 1. 获取QQQ的5分钟日内数据2024-2025注意yfinance免费版可能限制历史日内数据建议用专业数据源
orb_strategy.fetch_intraday_data(
symbol="ETH-USDT", start_date="2025-05-15", end_date="2025-08-20", interval="5m"
)
# 2. 生成ORB策略信号
orb_strategy.generate_orb_signals()
# 3. 回测策略盈利目标10R
orb_strategy.backtest(profit_target_multiple=10)
# 4. 绘制净值曲线
orb_strategy.plot_equity_curve()