diff --git a/.gitignore b/.gitignore index e6f8a5f..43f58ef 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ core/db/__pycache__/*.pyc core/biz/__pycache__/*.pyc core/statistics/__pycache__/*.pyc +/data diff --git a/core/biz/market_data_from_csv.py b/core/biz/market_data_from_csv.py new file mode 100644 index 0000000..e69de29 diff --git a/core/trade/__pycache__/orb_trade.cpython-312.pyc b/core/trade/__pycache__/orb_trade.cpython-312.pyc index be7de17..1c695d0 100644 Binary files a/core/trade/__pycache__/orb_trade.cpython-312.pyc and b/core/trade/__pycache__/orb_trade.cpython-312.pyc differ diff --git a/core/trade/orb_trade.py b/core/trade/orb_trade.py index 733e458..2d60af7 100644 --- a/core/trade/orb_trade.py +++ b/core/trade/orb_trade.py @@ -38,6 +38,8 @@ class ORBStrategy: direction=None, by_sar=False, symbol_bar_data=None, + price_range_mean_as_R=False, + by_big_k=False, ): """ 初始化ORB策略参数 @@ -63,6 +65,8 @@ class ORBStrategy: :param is_us_stock: 是否是美股 :param direction: 方向,None=自动,Long=多头,Short=空头 :param by_sar: 是否根据SAR指标生成信号,True=是,False=否 + :param price_range_mean_as_R: 是否将价格振幅均值作为$R,True=是,False=否 + :param by_big_k: 是否根据K线实体部分,亦即abs(open-close)超过high-low的50%,True=是,False=否 """ logger.info( f"初始化ORB策略参数:股票代码={symbol},K线周期={bar},开始日期={start_date},结束日期={end_date},初始账户资金={initial_capital},最大杠杆倍数={max_leverage},单次交易风险比例={risk_per_trade},每股交易佣金={commission_per_share}" @@ -101,12 +105,23 @@ class ORBStrategy: self.direction_desc = "做多" elif self.direction == "Short": self.direction_desc = "做空" - + self.sar_desc = "不考虑SAR" if self.by_sar: self.sar_desc = "考虑SAR" self.symbol_bar_data = symbol_bar_data - + self.price_range_mean_as_R = price_range_mean_as_R + if self.price_range_mean_as_R: + self.price_range_mean_as_R_desc = "R为振幅均值" + else: + self.price_range_mean_as_R_desc = "R为entry减stop" + + self.by_big_k = by_big_k + if self.by_big_k: + self.by_big_k_desc = "K线实体过50%" + else: + self.by_big_k_desc = "无K线要求" + def run(self): """ 运行ORB策略 @@ -127,7 +142,9 @@ class ORBStrategy: :param end_date: 结束日期(格式:YYYY-MM-DD) :param interval: K线周期(默认5分钟) """ - logger.info(f"开始获取{self.symbol}数据:{self.start_date}至{self.end_date},间隔{self.bar}") + logger.info( + f"开始获取{self.symbol}数据:{self.start_date}至{self.end_date},间隔{self.bar}" + ) if self.symbol_bar_data is None or len(self.symbol_bar_data) == 0: data = self.db_market_data.query_market_data_by_symbol_bar( self.symbol, self.bar, start=self.start_date, end=self.end_date @@ -153,7 +170,6 @@ class ORBStrategy: self.start_date = data["Date"].min().strftime("%Y-%m-%d") # 最大data["Date"] self.end_date = data["Date"].max().strftime("%Y-%m-%d") - self.data = data[ [ "symbol", @@ -169,6 +185,7 @@ class ORBStrategy: ] ].copy() self.data.rename(columns={date_time_field: "date_time"}, inplace=True) + self.calculate_price_range_mean() self.symbol_bar_data = self.data.copy() else: self.data = self.symbol_bar_data.copy() @@ -189,21 +206,24 @@ class ORBStrategy: pass logger.info(f"收盘价均值:{self.close_mean}") logger.info(f"初始资金调整为:{self.initial_capital}") - logger.info(f"成功获取{self.symbol}数据:{len(self.data)}根{self.bar}K线,开始日期={self.start_date},结束日期={self.end_date}") + logger.info( + f"成功获取{self.symbol}数据:{len(self.data)}根{self.bar}K线,开始日期={self.start_date},结束日期={self.end_date}" + ) - def calculate_shares(self, account_value, entry_price, stop_price): + def calculate_shares(self, account_value, entry_price, stop_price, risk_assumed): """ 根据ORB公式计算交易股数 :param account_value: 当前账户价值(美元) :param entry_price: 交易entry价格(第二根5分钟K线开盘价) :param stop_price: 止损价格(多头=第一根K线最低价,空头=第一根K线最高价) + :param risk_assumed: 风险金额($R),根据price_range_mean_as_R决定 :return: 整数股数(Shares) """ logger.info( f"开始计算交易股数:账户价值={account_value},entry价格={entry_price},止损价格={stop_price}" ) # 计算单交易风险金额($R) - risk_per_trade_dollar = abs(entry_price - stop_price) # 风险金额取绝对值 + risk_per_trade_dollar = risk_assumed # 风险金额取绝对值 if risk_per_trade_dollar <= 0: return 0 # 无风险时不交易 @@ -247,6 +267,12 @@ class ORBStrategy: close1 = first_candle["Close"] sar_signal = first_candle["sar_signal"] + if high1 == low1: + continue + if self.by_big_k: + if (abs(open1 - close1) / (high1 - low1)) < 0.5: + continue + # 第二根5分钟K线(entry信号) second_candle = daily_data.iloc[1] entry_price = second_candle["Open"] # entry价格=第二根K线开盘价 @@ -303,6 +329,17 @@ class ORBStrategy: f"生成信号完成:共{len(signals_df)}个交易日,其中多头{sum(signals_df['Signal']=='Long')}次,空头{sum(signals_df['Signal']=='Short')}次" ) + def calculate_price_range_mean(self): + """ + 计算价格振幅均值,振幅为最高价与最低价之差 + 计算价格振幅标准差 + 要求用滑动窗口: window_size=100计算均值,每次计算都包含当前行 + 返回一个新列,列名为"PriceRangeMean" + """ + self.data["PriceRange"] = self.data["High"] - self.data["Low"] + self.data["PriceRangeMean"] = self.data["PriceRange"].rolling(window=100).mean() + self.data["PriceRangeStd"] = self.data["PriceRange"].rolling(window=100).std() + def backtest(self): """ 回测ORB策略 @@ -343,7 +380,18 @@ class ORBStrategy: stop_price = signal_row["StopPrice"] high1 = signal_row["High1"] low1 = signal_row["Low1"] - risk_assumed = abs(entry_price - stop_price) # 计算$R + price_range = signal_row["PriceRange"] + price_range_mean = signal_row["PriceRangeMean"] + price_range_std = signal_row["PriceRangeStd"] + # 计算$R + if ( + self.price_range_mean_as_R + and price_range_mean is not None + and price_range_mean > 0 + ): + risk_assumed = price_range_mean + else: + risk_assumed = abs(entry_price - stop_price) profit_target = ( entry_price + (risk_assumed * self.profit_target_multiple) if signal == "Long" @@ -351,7 +399,9 @@ class ORBStrategy: ) # 计算交易股数 - shares = self.calculate_shares(account_value, entry_price, stop_price) + shares = self.calculate_shares( + account_value, entry_price, stop_price, risk_assumed + ) if shares == 0: # 股数为0→不交易 equity_history.append(account_value) @@ -425,6 +475,8 @@ class ORBStrategy: "TradeID": trade_id, "Direction": self.direction_desc, "BySar": self.sar_desc, + "PriceRangeMeanAsR": self.price_range_mean_as_R_desc, + "ByBigK": self.by_big_k_desc, "Symbol": self.symbol, "Bar": self.bar, "Date": date, @@ -467,8 +519,12 @@ class ORBStrategy: else 0 ) # 计算盈亏比 - profit_sum = self.trades_df[self.trades_df["ProfitLoss"] > 0]["ProfitLoss"].sum() - loss_sum = abs(self.trades_df[self.trades_df["ProfitLoss"] < 0]["ProfitLoss"].sum()) + profit_sum = self.trades_df[self.trades_df["ProfitLoss"] > 0][ + "ProfitLoss" + ].sum() + loss_sum = abs( + self.trades_df[self.trades_df["ProfitLoss"] < 0]["ProfitLoss"].sum() + ) if loss_sum == 0: profit_loss_ratio = float("inf") else: @@ -504,11 +560,13 @@ class ORBStrategy: logger.info(f"最大单笔盈利:${self.trades_df['ProfitLoss'].max():.2f}") self.trades_summary["最大单笔盈利$"] = self.trades_df["ProfitLoss"].max() logger.info(f"最大单笔亏损:${abs(self.trades_df['ProfitLoss'].min()):.2f}") - self.trades_summary["最大单笔亏损$"] = abs(self.trades_df["ProfitLoss"].min()) + self.trades_summary["最大单笔亏损$"] = abs( + self.trades_df["ProfitLoss"].min() + ) else: logger.info("没有交易") self.trades_summary_df = pd.DataFrame([self.trades_summary]) - + def initial_trade_summary(self): """ 初始化交易总结 @@ -516,6 +574,8 @@ class ORBStrategy: self.trades_summary = {} self.trades_summary["方向"] = self.direction_desc self.trades_summary["根据SAR"] = self.sar_desc + self.trades_summary["R算法"] = self.price_range_mean_as_R_desc + self.trades_summary["K线条件"] = self.by_big_k_desc self.trades_summary["股票代码"] = self.symbol self.trades_summary["K线周期"] = self.bar self.trades_summary["开始日期"] = self.start_date @@ -532,7 +592,6 @@ class ORBStrategy: self.trades_summary["最大单笔盈利$"] = 0 self.trades_summary["最大单笔亏损$"] = 0 - def create_equity_curve(self): """ 创建账户净值曲线 @@ -600,7 +659,11 @@ class ORBStrategy: marker="s", markersize=4, ) - plt.title(f"ORB曲线 {symbol} {bar} {self.direction_desc} {self.sar_desc}", fontsize=14, fontweight="bold") + plt.title( + f"{symbol} {bar} {self.direction_desc} {self.sar_desc} {self.price_range_mean_as_R_desc} {self.by_big_k_desc}", + fontsize=14, + fontweight="bold", + ) plt.xlabel("时间", fontsize=12) plt.ylabel("涨跌变化", fontsize=12) plt.legend(fontsize=11) @@ -641,30 +704,26 @@ class ORBStrategy: fontsize=10, ) plt.tight_layout() - self.chart_save_path = ( - f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_orb_strategy_equity_curve.png" - ) + self.chart_save_path = f"{self.output_chart_folder}/{symbol}_{bar}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}_orb.png" plt.savefig(self.chart_save_path, dpi=150, bbox_inches="tight") plt.close() - + def output_trade_summary(self): """ 输出交易明细,交易总结与Chart图片到Excel """ start_date = self.start_date.replace("-", "") end_date = self.end_date.replace("-", "") - output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}.xlsx" + output_file_name = f"orb_{self.symbol}_{self.bar}_{start_date}_{end_date}_{self.direction_desc}_{self.sar_desc}_{self.price_range_mean_as_R_desc}_{self.by_big_k_desc}.xlsx" output_file_path = os.path.join(self.output_excel_folder, output_file_name) logger.info(f"导出{output_file_path}") with pd.ExcelWriter(output_file_path) as writer: self.trades_df.to_excel(writer, sheet_name="交易明细", index=False) self.trades_summary_df.to_excel(writer, sheet_name="交易总结", index=False) if os.path.exists(self.chart_save_path): - charts_dict = { - "账户净值曲线": self.chart_save_path - } + charts_dict = {"账户净值曲线": self.chart_save_path} self.output_chart_to_excel(output_file_path, charts_dict) - + def output_chart_to_excel(self, excel_file_path: str, charts_dict: dict): """ 输出Excel文件,包含所有图表 @@ -694,7 +753,7 @@ class ORBStrategy: # Save Excel file wb.save(excel_file_path) logger.info(f"图表已输出到{excel_file_path}") - + # ------------------- 策略示例:回测QQQ的ORB策略(2016-2023) ------------------- if __name__ == "__main__": diff --git a/orb_trade_main.py b/orb_trade_main.py index 0f53e19..aa2ea03 100644 --- a/orb_trade_main.py +++ b/orb_trade_main.py @@ -16,6 +16,8 @@ def main(): bar = "5m" direction_list = [None, "Long", "Short"] by_sar_list = [False, True] + price_range_mean_as_R_list = [False, True] + by_big_k_list = [False, True] start_date = "2024-01-01" end_date = datetime.now().strftime("%Y-%m-%d") profit_target_multiple = 10 @@ -30,55 +32,59 @@ def main(): for is_us_stock in is_us_stock_list: for direction in direction_list: for by_sar in by_sar_list: - if is_us_stock: - symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get( - "symbols", ["QQQ"] - ) - else: - symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( - "symbols", ["BTC-USDT"] - ) - for symbol in symbols: - logger.info( - f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}" - ) - symbol_bar_data = None - found_symbol_bar_data = False - for symbol_data_dict in symbol_data_cache: - if ( - symbol_data_dict["symbol"] == symbol - and symbol_data_dict["bar"] == bar - ): - symbol_bar_data = symbol_data_dict["data"] - found_symbol_bar_data = True - break + for price_range_mean_as_R in price_range_mean_as_R_list: + for by_big_k in by_big_k_list: + if is_us_stock: + symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["QQQ"] + ) + else: + symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["BTC-USDT"] + ) + for symbol in symbols: + logger.info( + f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}" + ) + symbol_bar_data = None + found_symbol_bar_data = False + for symbol_data_dict in symbol_data_cache: + if ( + symbol_data_dict["symbol"] == symbol + and symbol_data_dict["bar"] == bar + ): + symbol_bar_data = symbol_data_dict["data"] + found_symbol_bar_data = True + break - orb_strategy = ORBStrategy( - symbol=symbol, - bar=bar, - start_date=start_date, - end_date=end_date, - is_us_stock=is_us_stock, - direction=direction, - by_sar=by_sar, - profit_target_multiple=profit_target_multiple, - initial_capital=initial_capital, - max_leverage=max_leverage, - risk_per_trade=risk_per_trade, - commission_per_share=commission_per_share, - symbol_bar_data=symbol_bar_data, - ) - symbol_bar_data, trades_df, trades_summary_df = orb_strategy.run() - if symbol_bar_data is None or len(symbol_bar_data) == 0: - continue - if not found_symbol_bar_data: - symbol_data_cache.append( - {"symbol": symbol, "bar": bar, "data": symbol_bar_data} - ) - if trades_summary_df is None or len(trades_summary_df) == 0: - continue - trades_summary_df_list.append(trades_summary_df) - trades_df_list.append(trades_df) + orb_strategy = ORBStrategy( + symbol=symbol, + bar=bar, + start_date=start_date, + end_date=end_date, + is_us_stock=is_us_stock, + direction=direction, + by_sar=by_sar, + profit_target_multiple=profit_target_multiple, + initial_capital=initial_capital, + max_leverage=max_leverage, + risk_per_trade=risk_per_trade, + commission_per_share=commission_per_share, + symbol_bar_data=symbol_bar_data, + price_range_mean_as_R=price_range_mean_as_R, + by_big_k=by_big_k, + ) + symbol_bar_data, trades_df, trades_summary_df = orb_strategy.run() + if symbol_bar_data is None or len(symbol_bar_data) == 0: + continue + if not found_symbol_bar_data: + symbol_data_cache.append( + {"symbol": symbol, "bar": bar, "data": symbol_bar_data} + ) + if trades_summary_df is None or len(trades_summary_df) == 0: + continue + trades_summary_df_list.append(trades_summary_df) + trades_df_list.append(trades_df) total_trades_df = pd.concat(trades_df_list) total_trades_summary_df = pd.concat(trades_summary_df_list) statitics_dict = statistics_summary(total_trades_summary_df) @@ -97,13 +103,19 @@ def main(): writer, sheet_name="最大总收益率记录", index=False ) statitics_dict["max_total_return_record_df_grouped_count"].to_excel( - writer, sheet_name="最大总收益率记录_方向和根据SAR的组合", index=False + writer, sheet_name="最大总收益率_所有条件组合", index=False ) statitics_dict["max_total_return_record_df_direction_count"].to_excel( - writer, sheet_name="最大总收益率记录_方向", index=False + writer, sheet_name="最大总收益率_方向", index=False ) statitics_dict["max_total_return_record_df_sar_count"].to_excel( - writer, sheet_name="最大总收益率记录_根据SAR", index=False + writer, sheet_name="最大总收益率_根据SAR", index=False + ) + statitics_dict["max_total_return_record_df_R_count"].to_excel( + writer, sheet_name="最大总收益率_R算法", index=False + ) + statitics_dict["max_total_return_record_df_K_count"].to_excel( + writer, sheet_name="最大总收益率_K线条件", index=False ) chart_path = r"./output/trade_sandbox/orb_strategy/chart/" os.makedirs(chart_path, exist_ok=True) @@ -156,6 +168,8 @@ def statistics_summary(trades_summary_df: pd.DataFrame): summary["股票代码"] = symbol summary["方向"] = max_total_return_record["方向"] summary["根据SAR"] = max_total_return_record["根据SAR"] + summary["R算法"] = max_total_return_record["R算法"] + summary["K线条件"] = max_total_return_record["K线条件"] summary["总收益率%"] = max_total_return_record["总收益率%"] summary["自然收益率%"] = max_total_return_record["自然收益率%"] max_total_return_record_list.append(summary) @@ -178,14 +192,14 @@ def statistics_summary(trades_summary_df: pd.DataFrame): # 其它(如Series、list、dict、ndarray等)转字符串 return str(v) - for key_col in ["方向", "根据SAR"]: + for key_col in ["方向", "根据SAR", "R算法", "K线条件"]: if key_col in max_total_return_record_df.columns: max_total_return_record_df[key_col] = max_total_return_record_df[ key_col ].apply(_to_hashable_scalar) # 分组统计 max_total_return_record_df_grouped_count = ( - max_total_return_record_df.groupby(["方向", "根据SAR"], dropna=False) + max_total_return_record_df.groupby(["方向", "根据SAR", "R算法", "K线条件"], dropna=False) .size() .reset_index(name="数量") ) @@ -215,15 +229,39 @@ def statistics_summary(trades_summary_df: pd.DataFrame): by="数量", ascending=False, inplace=True ) max_total_return_record_df_sar_count.reset_index(drop=True, inplace=True) + + # 统计R算法的记录数目 + max_total_return_record_df_R_count = ( + max_total_return_record_df.groupby(["R算法"], dropna=False) + .size() + .reset_index(name="数量") + ) + max_total_return_record_df_R_count.sort_values( + by="数量", ascending=False, inplace=True + ) + max_total_return_record_df_R_count.reset_index(drop=True, inplace=True) + + # 统计K线条件的记录数目 + max_total_return_record_df_K_count = ( + max_total_return_record_df.groupby(["K线条件"], dropna=False) + .size() + .reset_index(name="数量") + ) + max_total_return_record_df_K_count.sort_values( + by="数量", ascending=False, inplace=True + ) + max_total_return_record_df_K_count.reset_index(drop=True, inplace=True) else: # 构造空结果,保证下游写入Excel不报错 max_total_return_record_df_grouped_count = pd.DataFrame( - columns=["方向", "根据SAR", "数量"] - ) + columns=["方向", "根据SAR", "R算法", "K线条件", "数量"] + ) max_total_return_record_df_direction_count = pd.DataFrame( columns=["方向", "数量"] ) max_total_return_record_df_sar_count = pd.DataFrame(columns=["根据SAR", "数量"]) + max_total_return_record_df_R_count = pd.DataFrame(columns=["R算法", "数量"]) + max_total_return_record_df_K_count = pd.DataFrame(columns=["K线条件", "数量"]) result = { "statistics_summary_df": statistics_summary_df, @@ -231,6 +269,8 @@ def statistics_summary(trades_summary_df: pd.DataFrame): "max_total_return_record_df_grouped_count": max_total_return_record_df_grouped_count, "max_total_return_record_df_direction_count": max_total_return_record_df_direction_count, "max_total_return_record_df_sar_count": max_total_return_record_df_sar_count, + "max_total_return_record_df_R_count": max_total_return_record_df_R_count, + "max_total_return_record_df_K_count": max_total_return_record_df_K_count, } return result @@ -335,9 +375,5 @@ def test(): if __name__ == "__main__": - # main() - - chart_path = r"./output/trade_sandbox/orb_strategy/chart/" - excel_file_path = r"./output/trade_sandbox/orb_strategy/excel/summary/orb_strategy_summary_20250902174203.xlsx" - copy_chart_to_excel(chart_path, excel_file_path) + main() # test()