QUANT optimize

This commit is contained in:
blade 2025-10-09 11:29:00 +08:00
parent 11c6e25490
commit 18168010ce
4 changed files with 681 additions and 192 deletions

View File

@ -49,6 +49,7 @@ import core.logger as logging
import pandas as pd
import numpy as np
import talib as tb
from typing import List, Tuple
from talib import MA_Type
logger = logging.logger
@ -306,6 +307,40 @@ class MetricsCalculation:
return data
def calculate_percentile_indicators(
self,
data: pd.DataFrame,
window_size: int = 50,
price_column: str = "close",
percentiles: List[Tuple[float, str]] = [(0.8, "80"), (0.2, "20"), (0.9, "90"), (0.1, "10")]
) -> pd.DataFrame:
"""
计算分位数指标
:param data: 数据DataFrame
:param window_size: 窗口大小
:param percentiles: 分位数配置列表格式为[(分位数, 名称后缀)]
:return: 包含分位数指标的DataFrame
"""
for percentile, suffix in percentiles:
# 计算分位数
data[f"{price_column}_{suffix}_percentile"] = (
data[price_column].rolling(window=window_size, min_periods=1).quantile(percentile)
)
# 判断价格是否达到分位数
if suffix in ["80", "90"]:
# 高点分位数
data[f"{price_column}_{suffix}_high"] = (
data[price_column] >= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
else:
# 低点分位数
data[f"{price_column}_{suffix}_low"] = (
data[price_column] <= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
return data
def update_macd_divergence_column(self, df: pd.DataFrame):
"""
更新整个DataFrame的macd_divergence列
@ -1217,3 +1252,159 @@ class MetricsCalculation:
avg_spacing = (spacing_5_10 + spacing_10_20 + spacing_20_30) / 3
return avg_spacing
def get_peaks_valleys_mean(self, data: pd.DataFrame):
"""计算上涨波峰和下跌波谷的均值与中位数"""
# 确保输入数据包含必要的列
if not all(col in data.columns for col in ["open", "high", "low", "close"]):
raise ValueError(
"DataFrame must contain 'open', 'high', 'low', 'close' columns"
)
if len(data) < 100:
return None, None
window = 5
# 初始化结果列表
peaks_valleys = []
# 检测波峰基于high价格
highs = data["high"]
for i in range(window, len(data) - window):
if i + window >= len(data):
break
# 当前K线的high价格
current_high = highs.iloc[i]
# 窗口内的前后K线的high价格
window_highs = highs.iloc[i - window : i + window + 1]
# 如果当前high是窗口内的最大值标记为波峰
if (
current_high == window_highs.max()
and current_high > highs.iloc[i - 1]
and current_high > highs.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_high,
"type": "peak",
}
)
# 检测波谷基于low价格
lows = data["low"]
for i in range(window, len(data) - window):
if i + window >= len(data):
break
# 当前K线的low价格
current_low = lows.iloc[i]
# 窗口内的前后K线的low价格
window_lows = lows.iloc[i - window : i + window + 1]
# 如果当前low是窗口内的最小值标记为波谷
if (
current_low == window_lows.min()
and current_low < lows.iloc[i - 1]
and current_low < lows.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_low,
"type": "valley",
}
)
# 转换为DataFrame并按时间排序
result_df = pd.DataFrame(peaks_valleys)
if not result_df.empty:
result_df = result_df.sort_values(by="timestamp").reset_index(drop=True)
else:
result_df = pd.DataFrame(
columns=["symbol", "timestamp", "date_time", "bar", "price", "type"]
)
# 检查result_df如果type为peak时下一条数据type依然为peak则删除当前数据
if not result_df.empty:
# 使用布尔索引来标记要删除的行
to_drop_peaks = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "peak":
current_peak_value = result_df.iloc[i]["price"]
current_peak_index = i
# 如type连续为peak只应该保留price最大的行删除其他行
# 如type连续为peak且存在price为8 7 10 9 8 11 10的情况只应该保留price为11的行
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "peak":
next_peak_value = result_df.iloc[j]["price"]
if current_peak_value > next_peak_value:
to_drop_peaks.append(j)
else:
to_drop_peaks.append(current_peak_index)
current_peak_value = next_peak_value
current_peak_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_peaks).reset_index(drop=True)
# 如type连续为valley只应该保留price最小的行删除其他行
# 如type连续为valley且存在price为8 7 10 9 8的情况只应该保留price为7的行
to_drop_valleys = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "valley":
current_valley_value = result_df.iloc[i]["price"]
current_valley_index = i
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "valley":
next_valley_value = result_df.iloc[j]["price"]
if current_valley_value < next_valley_value:
to_drop_valleys.append(j)
else:
to_drop_valleys.append(current_valley_index)
current_valley_value = next_valley_value
current_valley_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_valleys).reset_index(drop=True)
# 初始化价格变化列
result_df["price_change"] = 0.0
result_df["price_change_ratio"] = 0.0
# 计算下一条数据与当前数据之间的价格差,并计算价格差与当前数据价格的比率
peaks_mean = None
valleys_mean = None
if len(result_df) > 1:
for i in range(len(result_df) - 1):
result_df.iloc[i + 1, result_df.columns.get_loc("price_change")] = (
result_df.iloc[i + 1]["price"] - result_df.iloc[i]["price"]
)
result_df.iloc[
i + 1, result_df.columns.get_loc("price_change_ratio")
] = (
result_df.iloc[i + 1]["price_change"] / result_df.iloc[i]["price"]
) * 100
# peaks mean为result_df中price_change_ratio > 0的price_change_ratio的均值与中位数
peaks_mean = abs(float(result_df[result_df["price_change_ratio"] > 0]["price_change_ratio"].mean()))
peaks_median = abs(float(result_df[result_df["price_change_ratio"] > 0]["price_change_ratio"].median()))
# valleys mean为result_df中price_change_ratio < 0的price_change_ratio的均值与中位数
valleys_mean = abs(float(result_df[result_df["price_change_ratio"] < 0]["price_change_ratio"].mean()))
valleys_median = abs(float(result_df[result_df["price_change_ratio"] < 0]["price_change_ratio"].median()))
result = {"peaks_valleys_data": result_df, "peaks_mean": peaks_mean, "peaks_median": peaks_median, "valleys_mean": valleys_mean, "valleys_median": valleys_median}
return result

View File

@ -52,8 +52,18 @@ class MaBreakStatistics:
is_astock: bool = False,
is_aindex: bool = False,
is_binance: bool = True,
buy_by_long_period: dict = {"by_week": False, "by_month": False},
long_period_condition: dict = {"ma5>ma10": True, "ma10>ma20": False, "macd_diff>0": True, "macd>0": True},
buy_by_long_period: dict = {
"by_week": False,
"by_month": False,
"buy_by_10_percentile": False,
},
long_period_condition: dict = {
"ma5>ma10": True,
"ma10>ma20": False,
"macd_diff>0": True,
"macd>0": True,
},
cut_loss_by_valleys_median: bool = False,
commission_per_share: float = 0.0008,
):
if is_astock or is_aindex:
@ -146,6 +156,8 @@ class MaBreakStatistics:
self.main_strategy = self.trade_strategy_config.get("均线系统策略", None)
self.buy_by_long_period = buy_by_long_period
self.long_period_condition = long_period_condition
self.cut_loss_by_valleys_median = cut_loss_by_valleys_median
self.metrics_calculation = MetricsCalculation()
def get_trade_strategy_config(self):
with open("./json/trade_strategy.json", "r", encoding="utf-8") as f:
@ -160,6 +172,8 @@ class MaBreakStatistics:
by_long_period += "1W"
if by_month:
by_long_period += "1M"
if self.buy_by_long_period.get("buy_by_10_percentile", False):
by_long_period += "_10percentile"
if by_long_period == "":
return "no_long_period_judge"
by_condition = ""
@ -172,62 +186,66 @@ class MaBreakStatistics:
if self.long_period_condition.get("macd>0", False):
by_condition += "_macdgt0"
return by_long_period + "_" + by_condition
def batch_statistics(self, strategy_name: str = "全均线策略"):
if self.is_us_stock:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/us_stock/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/us_stock/chart/{strategy_name}/"
)
main_folder = "./output/trade_sandbox/ma_strategy/us_stock/"
if self.cut_loss_by_valleys_median:
main_folder += "cut_loss_by_valleys_median/"
else:
main_folder += "no_cut_loss_by_valleys_median/"
self.stats_output_dir = f"{main_folder}excel/{strategy_name}/"
self.stats_chart_dir = f"{main_folder}chart/{strategy_name}/"
elif self.is_binance:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/binance/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/binance/chart/{strategy_name}/"
)
main_folder = "./output/trade_sandbox/ma_strategy/binance/"
if self.cut_loss_by_valleys_median:
main_folder += "cut_loss_by_valleys_median/"
else:
main_folder += "no_cut_loss_by_valleys_median/"
self.stats_output_dir = f"{main_folder}excel/{strategy_name}/"
self.stats_chart_dir = f"{main_folder}chart/{strategy_name}/"
elif self.is_astock:
long_period_desc = self.get_by_long_period_desc()
main_folder = "./output/trade_sandbox/ma_strategy/astock/"
if self.cut_loss_by_valleys_median:
main_folder += "cut_loss_by_valleys_median/"
else:
main_folder += "no_cut_loss_by_valleys_median/"
if len(long_period_desc) > 0:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/astock/{long_period_desc}/excel/{strategy_name}/"
f"{main_folder}{long_period_desc}/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/astock/{long_period_desc}/chart/{strategy_name}/"
f"{main_folder}{long_period_desc}/chart/{strategy_name}/"
)
else:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/astock/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/astock/chart/{strategy_name}/"
)
self.stats_output_dir = f"{main_folder}excel/{strategy_name}/"
self.stats_chart_dir = f"{main_folder}chart/{strategy_name}/"
elif self.is_aindex:
main_folder = "./output/trade_sandbox/ma_strategy/aindex/"
if self.cut_loss_by_valleys_median:
main_folder += "cut_loss_by_valleys_median/"
else:
main_folder += "no_cut_loss_by_valleys_median/"
long_period_desc = self.get_by_long_period_desc()
if len(long_period_desc) > 0:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/aindex/{long_period_desc}/excel/{strategy_name}/"
f"{main_folder}{long_period_desc}/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/aindex/{long_period_desc}/chart/{strategy_name}/"
f"{main_folder}{long_period_desc}/chart/{strategy_name}/"
)
else:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/aindex/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/aindex/chart/{strategy_name}/"
)
self.stats_output_dir = f"{main_folder}excel/{strategy_name}/"
self.stats_chart_dir = f"{main_folder}chart/{strategy_name}/"
else:
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/okx/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/okx/chart/{strategy_name}/"
)
main_folder = "./output/trade_sandbox/ma_strategy/okx/"
if self.cut_loss_by_valleys_median:
main_folder += "cut_loss_by_valleys_median/"
else:
main_folder += "no_cut_loss_by_valleys_median/"
self.stats_output_dir = f"{main_folder}excel/{strategy_name}/"
self.stats_chart_dir = f"{main_folder}chart/{strategy_name}/"
os.makedirs(self.stats_output_dir, exist_ok=True)
os.makedirs(self.stats_chart_dir, exist_ok=True)
@ -275,39 +293,53 @@ class MaBreakStatistics:
by="end_timestamp", ascending=True, inplace=True
)
symbol_bar_data.reset_index(drop=True, inplace=True)
initial_capital = int(market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"initial_capital",
].values[0])
final_account_value = float(symbol_bar_data["end_account_value"].iloc[-1])
account_value_chg = (final_account_value - initial_capital) / initial_capital * 100
initial_capital = int(
market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"initial_capital",
].values[0]
)
final_account_value = float(
symbol_bar_data["end_account_value"].iloc[-1]
)
account_value_chg = (
(final_account_value - initial_capital)
/ initial_capital
* 100
)
account_value_chg = round(account_value_chg, 4)
market_pct_chg = market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"pct_chg",
].values[0]
total_buy_commission = float(symbol_bar_data["buy_commission"].sum())
total_sell_commission = float(symbol_bar_data["sell_commission"].sum())
total_buy_commission = float(
symbol_bar_data["buy_commission"].sum()
)
total_sell_commission = float(
symbol_bar_data["sell_commission"].sum()
)
total_commission = total_buy_commission + total_sell_commission
total_commission = round(total_commission, 4)
total_buy_commission = round(total_buy_commission, 4)
total_sell_commission = round(total_sell_commission, 4)
symbol_name = str(symbol_bar_data["symbol_name"].iloc[0])
account_value_chg_list.append({
"strategy_name": strategy_name,
"symbol": symbol,
"symbol_name": symbol_name,
"bar": bar,
"total_buy_commission": total_buy_commission,
"total_sell_commission": total_sell_commission,
"total_commission": total_commission,
"initial_account_value": initial_capital,
"final_account_value": final_account_value,
"account_value_chg": account_value_chg,
"market_pct_chg": market_pct_chg,
})
account_value_chg_list.append(
{
"strategy_name": strategy_name,
"symbol": symbol,
"symbol_name": symbol_name,
"bar": bar,
"total_buy_commission": total_buy_commission,
"total_sell_commission": total_sell_commission,
"total_commission": total_commission,
"initial_account_value": initial_capital,
"final_account_value": final_account_value,
"account_value_chg": account_value_chg,
"market_pct_chg": market_pct_chg,
}
)
account_value_chg_df = pd.DataFrame(account_value_chg_list)
account_value_chg_df = account_value_chg_df[
[
@ -326,7 +358,9 @@ class MaBreakStatistics:
]
account_value_statistics_df = (
ma_break_market_data.groupby(["symbol", "symbol_name", "bar"])["end_account_value"]
ma_break_market_data.groupby(["symbol", "symbol_name", "bar"])[
"end_account_value"
]
.agg(
account_value_max="max",
account_value_min="min",
@ -355,7 +389,9 @@ class MaBreakStatistics:
# 依据symbol和bar分组统计每个symbol和bar的interval_minutes的max, min, mean, std, median, count
interval_minutes_df = (
ma_break_market_data.groupby(["symbol", "symbol_name", "bar"])["interval_minutes"]
ma_break_market_data.groupby(["symbol", "symbol_name", "bar"])[
"interval_minutes"
]
.agg(
interval_minutes_max="max",
interval_minutes_min="min",
@ -404,7 +440,9 @@ class MaBreakStatistics:
ma_break_market_data.to_excel(
writer, sheet_name="买卖记录明细", index=False
)
account_value_chg_df.to_excel(writer, sheet_name="资产价值变化", index=False)
account_value_chg_df.to_excel(
writer, sheet_name="资产价值变化", index=False
)
account_value_statistics_df.to_excel(
writer, sheet_name="买卖账户价值统计", index=False
)
@ -412,7 +450,9 @@ class MaBreakStatistics:
writer, sheet_name="买卖时间间隔统计", index=False
)
chart_dict = self.draw_quant_pct_chg_bar_chart(account_value_chg_df, strategy_name)
chart_dict = self.draw_quant_pct_chg_bar_chart(
account_value_chg_df, strategy_name
)
self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_quant_line_chart(
ma_break_market_data, market_data_pct_chg_df, strategy_name
@ -442,7 +482,7 @@ class MaBreakStatistics:
strategy_info["买入策略"] = buy_and_text + " 或者 \n" + buy_or_text
else:
strategy_info["买入策略"] = buy_and_text
# 假如根据长周期判断买入,则需要设置长周期策略
by_week = self.buy_by_long_period.get("by_week", False)
by_month = self.buy_by_long_period.get("by_month", False)
@ -540,6 +580,8 @@ class MaBreakStatistics:
strategy_name=strategy_name,
row=row,
behavior="buy",
buy_price=None,
window_100_valleys_median=None,
)
if buy_condition:
@ -574,10 +616,21 @@ class MaBreakStatistics:
ma_break_market_data_pair["begin_account_value"] = account_value
continue
else:
valleys_median = None
if self.cut_loss_by_valleys_median and index >= 100:
window_100_records = market_data.iloc[index - 100 : index]
peaks_valleys = self.metrics_calculation.get_peaks_valleys_mean(
window_100_records
)
valleys_median = peaks_valleys.get("valleys_median", None)
if valleys_median is not None and valleys_median > 0:
valleys_median = valleys_median / 100
sell_condition = self.fit_strategy(
strategy_name=strategy_name,
row=row,
behavior="sell",
buy_price=ma_break_market_data_pair["begin_close"],
window_100_valleys_median=valleys_median,
)
if sell_condition or index == len(market_data) - 1:
@ -612,6 +665,12 @@ class MaBreakStatistics:
ma_break_market_data_pair["pct_chg"] = round(
ma_break_market_data_pair["pct_chg"] * 100, 4
)
if valleys_median is not None:
ma_break_market_data_pair["valleys_median"] = (
valleys_median * 100
)
else:
ma_break_market_data_pair["valleys_median"] = None
ma_break_market_data_pair["profit_loss"] = profit_loss
ma_break_market_data_pair["sell_commission"] = sell_commission
ma_break_market_data_pair["end_account_value"] = account_value
@ -804,7 +863,12 @@ class MaBreakStatistics:
current_end_date_str = current_end_date.strftime("%Y-%m-%d")
logger.info(f"获取{symbol}数据:{start_date_str}{current_end_date_str}")
current_data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, bar, fields, start=start_date_str, end=current_end_date_str, table_name=table_name
symbol,
bar,
fields,
start=start_date_str,
end=current_end_date_str,
table_name=table_name,
)
if current_data is not None and len(current_data) > 0:
current_data = pd.DataFrame(current_data)
@ -816,7 +880,7 @@ class MaBreakStatistics:
if self.is_astock or self.is_aindex:
data = self.update_data(data)
return data
def get_long_period_data(self, symbol: str, bar: str, end_date: str):
"""
获取长周期数据
@ -843,56 +907,67 @@ class MaBreakStatistics:
if len(end_date) != 10:
end_date = self.change_date_format(end_date)
if bar == "1M":
# 获取上两个月的日期
last_date = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=60)
# 获取上五年的日期
last_date = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(
days=360 * 5
)
last_date = last_date.strftime("%Y-%m-%d")
elif bar == "1W":
# 获取上两周的日期
last_date = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=14)
# 获取上两年的日期
last_date = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(
days=360 * 2
)
last_date = last_date.strftime("%Y-%m-%d")
else:
last_date = None
if len(table_name) == 0 or last_date is None:
return None
fields = [
"a.ts_code as symbol",
"b.name as symbol_name",
f"'{bar}' as bar",
"0 as timestamp",
"trade_date as date_time",
"open",
"high",
"low",
"close",
"vol as volume",
"MA5 as ma5",
"MA10 as ma10",
"MA20 as ma20",
"MA30 as ma30",
"均线交叉 as ma_cross",
"DIF as dif",
"DEA as dea",
"MACD as macd",
]
"a.ts_code as symbol",
"b.name as symbol_name",
f"'{bar}' as bar",
"0 as timestamp",
"trade_date as date_time",
"open",
"high",
"low",
"close",
"vol as volume",
"MA5 as ma5",
"MA10 as ma10",
"MA20 as ma20",
"MA30 as ma30",
"均线交叉 as ma_cross",
"DIF as dif",
"DEA as dea",
"MACD as macd",
]
data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, bar, fields, start=last_date, end=end_date, table_name=table_name
)
if data is not None and len(data) > 0:
data = pd.DataFrame(data)
data.sort_values(by="date_time", inplace=True)
data = self.metrics_calculation.calculate_percentile_indicators(
data=data,
window_size=50,
price_column="close",
percentiles=[(0.1, "10")],
)
latest_row = data.iloc[-1]
if (latest_row["ma5"] is None or
latest_row["ma10"] is None or
latest_row["ma20"] is None or
latest_row["dif"] is None or
latest_row["macd"] is None):
if (
latest_row["ma5"] is None
or latest_row["ma10"] is None
or latest_row["ma20"] is None
or latest_row["dif"] is None
or latest_row["macd"] is None
):
return None
return latest_row
else:
return None
def update_data(self, data: pd.DataFrame):
"""
更新数据
@ -902,12 +977,15 @@ class MaBreakStatistics:
:param data: 数据
:return: 更新后的数据
"""
data["date_time"] = data["date_time"].apply(lambda x: self.change_date_format(x))
data["timestamp"] = data["date_time"].apply(lambda x: transform_date_time_to_timestamp(x))
metrics_calculation = MetricsCalculation()
data = metrics_calculation.ma5102030(data)
data["date_time"] = data["date_time"].apply(
lambda x: self.change_date_format(x)
)
data["timestamp"] = data["date_time"].apply(
lambda x: transform_date_time_to_timestamp(x)
)
data = self.metrics_calculation.ma5102030(data)
return data
def change_date_format(self, date_text: str):
# 将20210104这种格式替换为2021-01-04的格式
if len(date_text) == 8:
@ -920,7 +998,23 @@ class MaBreakStatistics:
strategy_name: str = "全均线策略",
row: pd.Series = None,
behavior: str = "buy",
buy_price: float = None,
window_100_valleys_median: float = None,
):
# 如果行为是卖出,则判断是否根据止损价格卖出
# 止损价格 = 买入价格 * (1 - window_100_valleys_median)
# window_100_valleys_median为100日下跌波谷幅度中位数
# 当前价格 < 止损价格,则卖出
if (
behavior == "sell"
and buy_price is not None
and window_100_valleys_median is not None
):
current_price = float(row["close"])
if current_price < buy_price:
loss_ratio = (buy_price - current_price) / buy_price
if loss_ratio > window_100_valleys_median:
return True
strategy_config = self.main_strategy.get(strategy_name, None)
if strategy_config is None:
logger.error(f"策略{strategy_name}不存在")
@ -953,21 +1047,56 @@ class MaBreakStatistics:
long_period_condition_list.append("macd>0")
if len(long_period_condition_list) > 0:
if self.buy_by_long_period.get("by_week", False):
long_period_data = self.get_long_period_data(row["symbol"], "1W", date_time)
long_period_data = self.get_long_period_data(
row["symbol"], "1W", date_time
)
if long_period_data is not None:
condition = self.get_judge_result(long_period_data, long_period_condition_list, "and", condition)
condition = self.get_judge_result(
long_period_data,
long_period_condition_list,
"and",
condition,
)
if not condition:
logger.info(f"根据周线指标,{row['symbol']}不满足买入条件")
# 如果周线处于空头条件但收盘价位于50窗口的低点10分位数则买入
if self.buy_by_long_period.get("buy_by_10_percentile", False):
if long_period_data["close_10_low"] == 1:
condition = True
if not condition:
logger.info(
f"根据周线指标,{row['symbol']}不满足买入条件"
)
if self.buy_by_long_period.get("by_month", False):
long_period_data = self.get_long_period_data(row["symbol"], "1M", date_time)
long_period_data = self.get_long_period_data(
row["symbol"], "1M", date_time
)
if long_period_data is not None:
condition = self.get_judge_result(long_period_data, long_period_condition_list, "and", condition)
condition = self.get_judge_result(
long_period_data,
long_period_condition_list,
"and",
condition,
)
if not condition:
logger.info(f"根据月线指标,{row['symbol']}不满足买入条件")
# 如果月线处于空头条件但收盘价位于50窗口的低点10分位数则买入
if self.buy_by_long_period.get("buy_by_10_percentile", False):
if long_period_data["close_10_low"] == 1:
condition = True
if not condition:
logger.info(
f"根据月线指标,{row['symbol']}不满足买入条件"
)
return condition
def get_judge_result(self, row: pd.Series, condition_list: list, and_or: str = "and", raw_condition: bool = True):
def get_judge_result(
self,
row: pd.Series,
condition_list: list,
and_or: str = "and",
raw_condition: bool = True,
):
ma_cross = row["ma_cross"]
if pd.isna(ma_cross) or ma_cross is None:
ma_cross = ""
@ -985,51 +1114,53 @@ class MaBreakStatistics:
macd_dea = float(row["dea"])
macd = float(row["macd"])
if and_or == "and":
for and_condition in condition_list:
if and_condition == "5上穿10":
raw_condition = raw_condition and ("5上穿10" in ma_cross)
elif and_condition == "10上穿20":
raw_condition = raw_condition and ("10上穿20" in ma_cross)
elif and_condition == "20上穿30":
raw_condition = raw_condition and ("20上穿30" in ma_cross)
elif and_condition == "ma5>ma10":
raw_condition = raw_condition and (ma5 > ma10)
elif and_condition == "ma10>ma20":
raw_condition = raw_condition and (ma10 > ma20)
elif and_condition == "ma20>ma30":
raw_condition = raw_condition and (ma20 > ma30)
elif and_condition == "close>ma20":
raw_condition = raw_condition and (close > ma20)
elif and_condition == "volume_pct_chg>0.2" and volume_pct_chg is not None:
raw_condition = raw_condition and (volume_pct_chg > 0.2)
elif and_condition == "macd_diff>0":
raw_condition = raw_condition and (macd_diff > 0)
elif and_condition == "macd_dea>0":
raw_condition = raw_condition and (macd_dea > 0)
elif and_condition == "macd>0":
raw_condition = raw_condition and (macd > 0)
elif and_condition == "10下穿5":
raw_condition = raw_condition and ("10下穿5" in ma_cross)
elif and_condition == "20下穿10":
raw_condition = raw_condition and ("20下穿10" in ma_cross)
elif and_condition == "30下穿20":
raw_condition = raw_condition and ("30下穿20" in ma_cross)
elif and_condition == "ma5<ma10":
raw_condition = raw_condition and (ma5 < ma10)
elif and_condition == "ma10<ma20":
raw_condition = raw_condition and (ma10 < ma20)
elif and_condition == "ma20<ma30":
raw_condition = raw_condition and (ma20 < ma30)
elif and_condition == "close<ma20":
raw_condition = raw_condition and (close < ma20)
elif and_condition == "macd_diff<0":
raw_condition = raw_condition and (macd_diff < 0)
elif and_condition == "macd_dea<0":
raw_condition = raw_condition and (macd_dea < 0)
elif and_condition == "macd<0":
raw_condition = raw_condition and (macd < 0)
else:
pass
for and_condition in condition_list:
if and_condition == "5上穿10":
raw_condition = raw_condition and ("5上穿10" in ma_cross)
elif and_condition == "10上穿20":
raw_condition = raw_condition and ("10上穿20" in ma_cross)
elif and_condition == "20上穿30":
raw_condition = raw_condition and ("20上穿30" in ma_cross)
elif and_condition == "ma5>ma10":
raw_condition = raw_condition and (ma5 > ma10)
elif and_condition == "ma10>ma20":
raw_condition = raw_condition and (ma10 > ma20)
elif and_condition == "ma20>ma30":
raw_condition = raw_condition and (ma20 > ma30)
elif and_condition == "close>ma20":
raw_condition = raw_condition and (close > ma20)
elif (
and_condition == "volume_pct_chg>0.2" and volume_pct_chg is not None
):
raw_condition = raw_condition and (volume_pct_chg > 0.2)
elif and_condition == "macd_diff>0":
raw_condition = raw_condition and (macd_diff > 0)
elif and_condition == "macd_dea>0":
raw_condition = raw_condition and (macd_dea > 0)
elif and_condition == "macd>0":
raw_condition = raw_condition and (macd > 0)
elif and_condition == "10下穿5":
raw_condition = raw_condition and ("10下穿5" in ma_cross)
elif and_condition == "20下穿10":
raw_condition = raw_condition and ("20下穿10" in ma_cross)
elif and_condition == "30下穿20":
raw_condition = raw_condition and ("30下穿20" in ma_cross)
elif and_condition == "ma5<ma10":
raw_condition = raw_condition and (ma5 < ma10)
elif and_condition == "ma10<ma20":
raw_condition = raw_condition and (ma10 < ma20)
elif and_condition == "ma20<ma30":
raw_condition = raw_condition and (ma20 < ma30)
elif and_condition == "close<ma20":
raw_condition = raw_condition and (close < ma20)
elif and_condition == "macd_diff<0":
raw_condition = raw_condition and (macd_diff < 0)
elif and_condition == "macd_dea<0":
raw_condition = raw_condition and (macd_dea < 0)
elif and_condition == "macd<0":
raw_condition = raw_condition and (macd < 0)
else:
pass
elif and_or == "or":
for or_condition in condition_list:
if or_condition == "5上穿10":
@ -1046,7 +1177,9 @@ class MaBreakStatistics:
raw_condition = raw_condition or (ma20 > ma30)
elif or_condition == "close>ma20":
raw_condition = raw_condition or (close > ma20)
elif or_condition == "volume_pct_chg>0.2" and volume_pct_chg is not None:
elif (
or_condition == "volume_pct_chg>0.2" and volume_pct_chg is not None
):
raw_condition = raw_condition or (volume_pct_chg > 0.2)
elif or_condition == "macd_diff>0":
raw_condition = raw_condition or (macd_diff > 0)
@ -1078,7 +1211,6 @@ class MaBreakStatistics:
pass
return raw_condition
def draw_quant_pct_chg_bar_chart(
self, data: pd.DataFrame, strategy_name: str = "全均线策略"
):

View File

@ -2,6 +2,7 @@ import core.logger as logging
from datetime import datetime
from time import sleep
import pandas as pd
import os
from core.biz.market_data import MarketData
from core.trade.ma_break_statistics import MaBreakStatistics
from core.db.db_market_data import DBMarketData
@ -33,7 +34,13 @@ class TradeMaStrategyMain:
is_binance: bool = False,
commission_per_share: float = 0,
buy_by_long_period: dict = {"by_week": False, "by_month": False},
long_period_condition: dict = {"ma5>ma10": False, "ma10>ma20": False, "macd_diff>0": False, "macd>0": False},
long_period_condition: dict = {
"ma5>ma10": False,
"ma10>ma20": False,
"macd_diff>0": False,
"macd>0": False,
},
cut_loss_by_valleys_median: bool = False,
):
self.ma_break_statistics = MaBreakStatistics(
is_us_stock=is_us_stock,
@ -43,6 +50,7 @@ class TradeMaStrategyMain:
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
def batch_ma_break_statistics(self):
@ -77,41 +85,199 @@ def test_single_symbol():
)
symbol = "600111.SH"
bar = "1D"
ma_break_statistics.trade_simulate(symbol=symbol, bar=bar, strategy_name="均线macd结合策略2")
ma_break_statistics.trade_simulate(
symbol=symbol, bar=bar, strategy_name="均线macd结合策略2"
)
def batch_run_strategy():
commission_per_share_list = [0, 0.0008]
# cut_loss_by_valleys_median_list = [True, False]
cut_loss_by_valleys_median_list = [False]
buy_by_long_period_list = [
# {"by_week": True, "by_month": True, "buy_by_10_percentile": True},
{"by_week": False, "by_month": True, "buy_by_10_percentile": True},
{"by_week": True, "by_month": False, "buy_by_10_percentile": True},
{"by_week": False, "by_month": False, "buy_by_10_percentile": False},
]
# buy_by_long_period_list = [{"by_week": False, "by_month": False}]
long_period_condition_list = [
{"ma5>ma10": True, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": True, "ma10>ma20": False, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": False, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True},
]
for commission_per_share in commission_per_share_list:
for cut_loss_by_valleys_median in cut_loss_by_valleys_median_list:
for buy_by_long_period in buy_by_long_period_list:
for long_period_condition in long_period_condition_list:
logger.info(
f"开始计算, 主要参数commission_per_share: {commission_per_share}, buy_by_long_period: {buy_by_long_period}, long_period_condition: {long_period_condition}"
)
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=False,
is_aindex=True,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
trade_ma_strategy_main.batch_ma_break_statistics()
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=True,
is_aindex=False,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
trade_ma_strategy_main.batch_ma_break_statistics()
def pickup_data_from_excel():
main_path = r"./output/trade_sandbox/ma_strategy"
sub_main_paths = ["aindex", "astock"]
fix_sub_path = "no_cut_loss_by_valleys_median"
sub_folder = r"excel/均线macd结合策略2/"
file_feature_name = "with_commission"
original_df_columns = [
"strategy_name",
"symbol",
"symbol_name",
"bar",
"total_buy_commission",
"total_sell_commission",
"total_commission",
"initial_account_value",
"final_account_value",
"account_value_chg",
"market_pct_chg",
]
original_df_list = []
for sub_main_path in sub_main_paths:
logger.info(f"开始读取{sub_main_path}数据")
full_sub_main_path = os.path.join(main_path, sub_main_path, fix_sub_path)
# 读取sub_main_path下的所有文件夹
folder_list = os.listdir(full_sub_main_path)
for folder in folder_list:
logger.info(f"开始读取{folder}数据")
folder_path = os.path.join(full_sub_main_path, folder)
properties = get_properties_by_folder_name(folder, sub_main_path)
logger.info(f"开始读取{folder}数据")
# 读取folder_path的sub_folder下的所有文件
sub_folder_path = os.path.join(folder_path, sub_folder)
file_list = os.listdir(sub_folder_path)
for file in file_list:
logger.info(f"开始读取{file}数据")
if file_feature_name in file:
file_path = os.path.join(sub_folder_path, file)
df = pd.read_excel(file_path, sheet_name="资产价值变化")
logger.info(f"开始读取{file}数据")
# 向df添加properties
df = df.assign(**properties)
df = df[list(properties.keys()) + original_df_columns]
# 将df添加到original_df_list
original_df_list.append(df)
final_df = pd.concat(original_df_list)
excel_folder_path = os.path.join(main_path, "aindex_astock_均线macd结合策略2")
os.makedirs(excel_folder_path, exist_ok=True)
excel_file_path = os.path.join(
excel_folder_path, "all_strategy_with_commission.xlsx"
)
with pd.ExcelWriter(excel_file_path) as writer:
final_df.to_excel(
writer, sheet_name="all_strategy_with_commission", index=False
)
def get_properties_by_folder_name(folder_name: str, symbol_type: str):
properties = {}
sub_properties = folder_name.split("_")
properties["symbol_type"] = symbol_type
properties["buy_by_long_period"] = "no_long_period"
properties["long_period_ma5gtma10"] = False
properties["long_period_ma10gtma20"] = False
properties["long_period_macd_diffgt0"] = False
properties["long_period_macdgt0"] = False
properties["buy_by_long_period_10_percentile"] = False
if "1M" in sub_properties:
properties["buy_by_long_period"] = "1M"
if "1W" in sub_properties:
properties["buy_by_long_period"] = "1W"
if "1W1M" in sub_properties:
properties["buy_by_long_period"] = "1W1M"
if "ma5gtma10" in sub_properties:
properties["long_period_ma5gtma10"] = True
if "ma10gtma20" in sub_properties:
properties["long_period_ma10gtma20"] = True
if "macd_diffgt0" in sub_properties:
properties["long_period_macd_diffgt0"] = True
if "macdgt0" in sub_properties:
properties["long_period_macdgt0"] = True
if "10percentile" in sub_properties:
properties["buy_by_long_period_10_percentile"] = True
return properties
def profit_loss_ratio():
"""
计算利润损失比
公式盈利/盈利交易次数 : 亏损/亏损交易次数
"""
folder = r"./output/trade_sandbox/ma_strategy/binance/excel/均线macd结合策略2/"
prefix = ["无交易费用", "有交易费用"]
for prefix in prefix:
excel_file_path = os.path.join(
folder, f"{prefix}_趋势投资_from_201708181600_to_202509020600.xlsx"
)
df = pd.read_excel(excel_file_path, sheet_name="买卖记录明细")
symbol_list = list(df["symbol"].unique())
bar_list = list(df["bar"].unique())
data_list = []
for symbol in symbol_list:
for bar in bar_list:
df_symbol_bar = df[df["symbol"] == symbol][df["bar"] == bar]
start_date = df_symbol_bar["begin_date_time"].min()
end_date = df_symbol_bar["end_date_time"].max()
profit_df = df_symbol_bar[df_symbol_bar["profit_loss"] > 0]
loss_df = df_symbol_bar[df_symbol_bar["profit_loss"] < 0]
profit_amount = sum(profit_df["profit_loss"])
loss_amount = abs(sum(loss_df["profit_loss"]))
profit_count = len(profit_df)
loss_count = len(loss_df)
if profit_count == 0 or loss_count == 0:
continue
profit_loss_ratio = round(
(profit_amount / profit_count) / (loss_amount / loss_count) * 100, 4
)
data_list.append(
{
"币种": symbol,
"交易周期": bar,
"开始时间": start_date,
"结束时间": end_date,
"盈利金额": profit_amount,
"盈利次数": profit_count,
"亏损金额": loss_amount,
"亏损次数": loss_count,
"盈亏比": profit_loss_ratio,
"盈亏比公式": f"盈利金额/盈利次数 : 亏损金额/亏损次数",
}
)
final_df = pd.DataFrame(data_list)
final_df.to_excel(os.path.join(folder, f"{prefix}时虚拟货币利润损失比.xlsx"), index=False)
if __name__ == "__main__":
commission_per_share_list = [0, 0.0008]
buy_by_long_period_list = [{"by_week": True, "by_month": True},
{"by_week": True, "by_month": False},
{"by_week": False, "by_month": True},
{"by_week": False, "by_month": False}]
long_period_condition_list = [{"ma5>ma10": True, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": True, "ma10>ma20": False, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": False, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True}]
for commission_per_share in commission_per_share_list:
for buy_by_long_period in buy_by_long_period_list:
for long_period_condition in long_period_condition_list:
logger.info(f"开始计算, 主要参数commission_per_share: {commission_per_share}, buy_by_long_period: {buy_by_long_period}, long_period_condition: {long_period_condition}")
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=False,
is_aindex=True,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
)
trade_ma_strategy_main.batch_ma_break_statistics()
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=True,
is_aindex=False,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
)
trade_ma_strategy_main.batch_ma_break_statistics()
# batch_run_strategy()
profit_loss_ratio()