1. optimize binance ma quant strategy

2. support monitor 1m volume
This commit is contained in:
blade 2025-09-17 19:33:37 +08:00
parent 23e533ddf3
commit adba888bb2
7 changed files with 515 additions and 239 deletions

View File

@ -71,6 +71,28 @@ OKX_MONITOR_CONFIG = {
},
}
OKX_REALTIME_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [
"XCH-USDT",
"BTC-USDT",
"SOL-USDT",
"ETH-USDT",
"DOGE-USDT",
],
"bars": ["1m", "5m", "15m", "30m", "1H"],
"initial_date": "2025-05-15 00:00:00",
},
"price_monitor": {
"symbols": ["XCH-USDT"],
"bats": [
{"bar": "5m", "threshold": 0.025},
{"bar": "15m", "threshold": 0.5},
{"bar": "1H", "threshold": 0.1},
],
},
}
BINANCE_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [

View File

@ -8,12 +8,19 @@ from datetime import datetime, timedelta, timezone
from core.utils import get_current_date_time
import re
import json
import math
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from config import OKX_MONITOR_CONFIG, US_STOCK_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE, BINANCE_MONITOR_CONFIG
from config import (
OKX_MONITOR_CONFIG,
US_STOCK_MONITOR_CONFIG,
MYSQL_CONFIG,
WINDOW_SIZE,
BINANCE_MONITOR_CONFIG,
)
from core.db.db_market_data import DBMarketData
from core.db.db_binance_data import DBBinanceData
from core.db.db_huge_volume_data import DBHugeVolumeData
@ -34,7 +41,12 @@ class MaBreakStatistics:
之间的涨跌幅
"""
def __init__(self, is_us_stock: bool = False, is_binance: bool = False):
def __init__(
self,
is_us_stock: bool = False,
is_binance: bool = False,
commission_per_share: float = 0.0008,
):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
if not mysql_password:
@ -65,16 +77,16 @@ class MaBreakStatistics:
"symbols", ["BTC-USDT"]
)
self.bars = ["30m", "1H"]
self.initial_date = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2017-08-16 00:00:00"
)
self.initial_date = BINANCE_MONITOR_CONFIG.get(
"volume_monitor", {}
).get("initial_date", "2017-08-16 00:00:00")
self.db_market_data = DBBinanceData(self.db_url)
else:
self.symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["XCH-USDT"]
"symbols", ["XCH-USDT"]
)
self.bars = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"bars", ["5m", "15m", "30m", "1H"]
"bars", ["5m", "15m", "30m", "1H"]
)
self.initial_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-15 00:00:00"
@ -82,7 +94,8 @@ class MaBreakStatistics:
self.db_market_data = DBMarketData(self.db_url)
if len(self.initial_date) > 10:
self.initial_date = self.initial_date[:10]
self.end_date = get_current_date_time()
self.end_date = get_current_date_time(format="%Y-%m-%d")
self.commission_per_share = commission_per_share
self.trade_strategy_config = self.get_trade_strategy_config()
self.main_strategy = self.trade_strategy_config.get("均线系统策略", None)
@ -93,14 +106,26 @@ class MaBreakStatistics:
def batch_statistics(self, strategy_name: str = "全均线策略"):
if self.is_us_stock:
self.stats_output_dir = f"./output/trade_sandbox/ma_strategy/us_stock/excel/{strategy_name}/"
self.stats_chart_dir = f"./output/trade_sandbox/ma_strategy/us_stock/chart/{strategy_name}/"
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/us_stock/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/us_stock/chart/{strategy_name}/"
)
elif self.is_binance:
self.stats_output_dir = f"./output/trade_sandbox/ma_strategy/binance/excel/{strategy_name}/"
self.stats_chart_dir = f"./output/trade_sandbox/ma_strategy/binance/chart/{strategy_name}/"
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/binance/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/binance/chart/{strategy_name}/"
)
else:
self.stats_output_dir = f"./output/trade_sandbox/ma_strategy/okx/excel/{strategy_name}/"
self.stats_chart_dir = f"./output/trade_sandbox/ma_strategy/okx/chart/{strategy_name}/"
self.stats_output_dir = (
f"./output/trade_sandbox/ma_strategy/okx/excel/{strategy_name}/"
)
self.stats_chart_dir = (
f"./output/trade_sandbox/ma_strategy/okx/chart/{strategy_name}/"
)
os.makedirs(self.stats_output_dir, exist_ok=True)
os.makedirs(self.stats_chart_dir, exist_ok=True)
@ -111,7 +136,7 @@ class MaBreakStatistics:
for symbol in self.symbols:
for bar in self.bars:
logger.info(
f"开始计算{symbol} {bar}的MA突破区间涨跌幅统计, 策略: {strategy_name}"
f"开始计算{symbol} {bar}的MA突破区间涨跌幅统计, 策略: {strategy_name},交易费率:{self.commission_per_share}"
)
ma_break_market_data, market_data_pct_chg = self.trade_simulate(
symbol, bar, strategy_name
@ -131,35 +156,14 @@ class MaBreakStatistics:
ma_break_market_data = pd.concat(ma_break_market_data_list)
market_data_pct_chg_df = pd.DataFrame(market_data_pct_chg_list)
# 依据symbol和bar分组统计每个symbol和bar的pct_chg的max, min, mean, std, median, count
ma_break_market_data.sort_values(by="begin_timestamp", ascending=True, inplace=True)
ma_break_market_data.sort_values(
by="begin_timestamp", ascending=True, inplace=True
)
ma_break_market_data.reset_index(drop=True, inplace=True)
pct_chg_df = (
ma_break_market_data.groupby(["symbol", "bar"])["pct_chg"]
.agg(
pct_chg_sum="sum",
pct_chg_max="max",
pct_chg_min="min",
pct_chg_mean="mean",
pct_chg_std="std",
pct_chg_median="median",
pct_chg_count="count",
)
.reset_index()
)
pct_chg_df["strategy_name"] = strategy_name
pct_chg_df["pct_chg_total"] = 0
pct_chg_df["market_pct_chg"] = 0
# 将pct_chg_total与market_pct_chg的值类型转换为float
pct_chg_df["pct_chg_total"] = pct_chg_df["pct_chg_total"].astype(float)
pct_chg_df["market_pct_chg"] = pct_chg_df["market_pct_chg"].astype(float)
# 统计pct_chg_total
# 算法要求ma_break_market_data然后pct_chg/100 + 1
ma_break_market_data["pct_chg_total"] = (
ma_break_market_data["pct_chg"] / 100 + 1
)
# 遍历symbol和bar按照end_timestamp排序计算pct_chg_total的值然后相乘
for symbol in pct_chg_df["symbol"].unique():
for bar in pct_chg_df["bar"].unique():
account_value_chg_list = []
for symbol in market_data_pct_chg_df["symbol"].unique():
for bar in market_data_pct_chg_df["bar"].unique():
symbol_bar_data = ma_break_market_data[
(ma_break_market_data["symbol"] == symbol)
& (ma_break_market_data["bar"] == bar)
@ -169,51 +173,80 @@ class MaBreakStatistics:
by="end_timestamp", ascending=True, inplace=True
)
symbol_bar_data.reset_index(drop=True, inplace=True)
symbol_bar_data["pct_chg_total"] = symbol_bar_data[
"pct_chg_total"
].cumprod()
# 将更新后的pct_chg_total数据同步更新到ma_break_market_data的对应数据行中
for idx, row in symbol_bar_data.iterrows():
mask = (ma_break_market_data["symbol"] == symbol) & \
(ma_break_market_data["bar"] == bar) & \
(ma_break_market_data["end_timestamp"] == row["end_timestamp"])
ma_break_market_data.loc[mask, "pct_chg_total"] = row["pct_chg_total"]
last_pct_chg_total = symbol_bar_data["pct_chg_total"].iloc[-1]
last_pct_chg_total = (last_pct_chg_total - 1) * 100
pct_chg_df.loc[
(pct_chg_df["symbol"] == symbol)
& (pct_chg_df["bar"] == bar),
"pct_chg_total",
] = last_pct_chg_total
initial_capital = int(market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"initial_capital",
].values[0])
final_account_value = float(symbol_bar_data["end_account_value"].iloc[-1])
account_value_chg = (final_account_value - initial_capital) / initial_capital * 100
account_value_chg = round(account_value_chg, 4)
market_pct_chg = market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"pct_chg",
].values[0]
pct_chg_df.loc[
(pct_chg_df["symbol"] == symbol)
& (pct_chg_df["bar"] == bar),
"market_pct_chg",
] = market_pct_chg
pct_chg_df = pct_chg_df[
total_buy_commission = float(symbol_bar_data["buy_commission"].sum())
total_sell_commission = float(symbol_bar_data["sell_commission"].sum())
total_commission = total_buy_commission + total_sell_commission
total_commission = round(total_commission, 4)
total_buy_commission = round(total_buy_commission, 4)
total_sell_commission = round(total_sell_commission, 4)
account_value_chg_list.append({
"strategy_name": strategy_name,
"symbol": symbol,
"bar": bar,
"total_buy_commission": total_buy_commission,
"total_sell_commission": total_sell_commission,
"total_commission": total_commission,
"initial_account_value": initial_capital,
"final_account_value": final_account_value,
"account_value_chg": account_value_chg,
"market_pct_chg": market_pct_chg,
})
account_value_chg_df = pd.DataFrame(account_value_chg_list)
account_value_chg_df = account_value_chg_df[
[
"strategy_name",
"symbol",
"bar",
"total_buy_commission",
"total_sell_commission",
"total_commission",
"initial_account_value",
"final_account_value",
"account_value_chg",
"market_pct_chg",
"pct_chg_total",
"pct_chg_sum",
"pct_chg_max",
"pct_chg_min",
"pct_chg_mean",
"pct_chg_std",
"pct_chg_median",
"pct_chg_count",
]
]
account_value_statistics_df = (
ma_break_market_data.groupby(["symbol", "bar"])["end_account_value"]
.agg(
account_value_max="max",
account_value_min="min",
account_value_mean="mean",
account_value_std="std",
account_value_median="median",
account_value_count="count",
)
.reset_index()
)
account_value_statistics_df["strategy_name"] = strategy_name
account_value_statistics_df = account_value_statistics_df[
[
"strategy_name",
"symbol",
"bar",
"account_value_max",
"account_value_min",
"account_value_mean",
"account_value_std",
"account_value_median",
"account_value_count",
]
]
# 依据symbol和bar分组统计每个symbol和bar的interval_minutes的max, min, mean, std, median, count
interval_minutes_df = (
ma_break_market_data.groupby(["symbol", "bar"])["interval_minutes"]
@ -227,6 +260,20 @@ class MaBreakStatistics:
)
.reset_index()
)
interval_minutes_df["strategy_name"] = strategy_name
interval_minutes_df = interval_minutes_df[
[
"strategy_name",
"symbol",
"bar",
"interval_minutes_max",
"interval_minutes_min",
"interval_minutes_mean",
"interval_minutes_std",
"interval_minutes_median",
"interval_minutes_count",
]
]
earliest_market_date_time = ma_break_market_data["begin_date_time"].min()
earliest_market_date_time = re.sub(
@ -238,7 +285,10 @@ class MaBreakStatistics:
latest_market_date_time = re.sub(
r"[\:\-\s]", "", str(latest_market_date_time)
)
output_file_name = f"ma_break_stats_from_{earliest_market_date_time}_to_{latest_market_date_time}_{strategy_name}.xlsx"
if self.commission_per_share > 0:
output_file_name = f"ma_break_stats_from_{earliest_market_date_time}_to_{latest_market_date_time}_{strategy_name}_with_commission.xlsx"
else:
output_file_name = f"ma_break_stats_from_{earliest_market_date_time}_to_{latest_market_date_time}_{strategy_name}_without_commission.xlsx"
output_file_path = os.path.join(self.stats_output_dir, output_file_name)
logger.info(f"导出{output_file_path}")
strategy_info_df = self.get_strategy_info(strategy_name)
@ -247,16 +297,21 @@ class MaBreakStatistics:
ma_break_market_data.to_excel(
writer, sheet_name="买卖记录明细", index=False
)
pct_chg_df.to_excel(writer, sheet_name="买卖涨跌幅统计", index=False)
account_value_chg_df.to_excel(writer, sheet_name="资产价值变化", index=False)
account_value_statistics_df.to_excel(
writer, sheet_name="买卖账户价值统计", index=False
)
interval_minutes_df.to_excel(
writer, sheet_name="买卖时间间隔统计", index=False
)
chart_dict = self.draw_quant_pct_chg_bar_chart(pct_chg_df, strategy_name)
chart_dict = self.draw_quant_pct_chg_bar_chart(account_value_chg_df, strategy_name)
self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_quant_line_chart(ma_break_market_data, strategy_name)
chart_dict = self.draw_quant_line_chart(
ma_break_market_data, market_data_pct_chg_df, strategy_name
)
self.output_chart_to_excel(output_file_path, chart_dict)
return pct_chg_df
return account_value_chg_df
else:
return None
@ -333,6 +388,14 @@ class MaBreakStatistics:
date_time_field = "date_time_us"
else:
date_time_field = "date_time"
close_mean = market_data["close"].mean()
self.update_initial_capital(close_mean)
logger.info(
f"成功获取{symbol}数据:{len(market_data)}{bar}K线,开始日期={market_data[date_time_field].min()},结束日期={market_data[date_time_field].max()}"
)
account_value = self.initial_capital
for index, row in market_data.iterrows():
ma_cross = row["ma_cross"]
timestamp = row["timestamp"]
@ -345,7 +408,6 @@ class MaBreakStatistics:
macd_diff = float(row["dif"])
macd_dea = float(row["dea"])
macd = float(row["macd"])
if ma_break_market_data_pair.get("begin_timestamp", None) is None:
buy_condition = self.fit_strategy(
strategy_name=strategy_name,
@ -355,6 +417,18 @@ class MaBreakStatistics:
)
if buy_condition:
entry_price = close
# 计算交易股数
shares, account_value = self.calculate_shares(
account_value, entry_price
)
if shares == 0:
# 股数为0→不交易
continue
# 计算佣金
buy_commission = shares * close * self.commission_per_share
ma_break_market_data_pair = {}
ma_break_market_data_pair["symbol"] = symbol
ma_break_market_data_pair["bar"] = bar
@ -368,6 +442,9 @@ class MaBreakStatistics:
ma_break_market_data_pair["begin_macd_diff"] = macd_diff
ma_break_market_data_pair["begin_macd_dea"] = macd_dea
ma_break_market_data_pair["begin_macd"] = macd
ma_break_market_data_pair["shares"] = shares
ma_break_market_data_pair["buy_commission"] = buy_commission
ma_break_market_data_pair["begin_account_value"] = account_value
continue
else:
sell_condition = self.fit_strategy(
@ -378,6 +455,20 @@ class MaBreakStatistics:
)
if sell_condition:
shares = ma_break_market_data_pair["shares"]
entry_price = ma_break_market_data_pair["begin_close"]
exit_price = close
sell_commission = (
shares * exit_price * self.commission_per_share
)
profit_loss = (exit_price - entry_price) * shares
begin_account_value = ma_break_market_data_pair[
"begin_account_value"
]
account_value = (
begin_account_value + profit_loss - sell_commission
)
ma_break_market_data_pair["end_timestamp"] = timestamp
ma_break_market_data_pair["end_date_time"] = date_time
ma_break_market_data_pair["end_close"] = close
@ -389,11 +480,14 @@ class MaBreakStatistics:
ma_break_market_data_pair["end_macd_dea"] = macd_dea
ma_break_market_data_pair["end_macd"] = macd
ma_break_market_data_pair["pct_chg"] = (
close - ma_break_market_data_pair["begin_close"]
) / ma_break_market_data_pair["begin_close"]
exit_price - entry_price
) / entry_price
ma_break_market_data_pair["pct_chg"] = round(
ma_break_market_data_pair["pct_chg"] * 100, 4
)
ma_break_market_data_pair["profit_loss"] = profit_loss
ma_break_market_data_pair["sell_commission"] = sell_commission
ma_break_market_data_pair["end_account_value"] = account_value
ma_break_market_data_pair["interval_seconds"] = (
timestamp - ma_break_market_data_pair["begin_timestamp"]
) / 1000
@ -413,7 +507,9 @@ class MaBreakStatistics:
if len(ma_break_market_data_pair_list) > 0:
ma_break_market_data = pd.DataFrame(ma_break_market_data_pair_list)
# sort by end_timestamp
ma_break_market_data.sort_values(by="begin_timestamp", ascending=True, inplace=True)
ma_break_market_data.sort_values(
by="begin_timestamp", ascending=True, inplace=True
)
ma_break_market_data.reset_index(drop=True, inplace=True)
logger.info(
f"获取{symbol} {bar} 的买卖记录明细成功, 买卖次数: {len(ma_break_market_data)}"
@ -421,16 +517,85 @@ class MaBreakStatistics:
# 量化期间,市场的波动率:
# ma_break_market_data(最后一条数据的end_close - 第一条数据的begin_close) / 第一条数据的begin_close * 100
pct_chg = (
(ma_break_market_data["end_close"].iloc[-1] - ma_break_market_data["begin_close"].iloc[0])
(
ma_break_market_data["end_close"].iloc[-1]
- ma_break_market_data["begin_close"].iloc[0]
)
/ ma_break_market_data["begin_close"].iloc[0]
* 100
)
pct_chg = round(pct_chg, 4)
market_data_pct_chg = {"symbol": symbol, "bar": bar, "pct_chg": pct_chg}
market_data_pct_chg = {
"symbol": symbol,
"bar": bar,
"pct_chg": pct_chg,
"initial_capital": self.initial_capital,
}
return ma_break_market_data, market_data_pct_chg
else:
return None, None
def update_initial_capital(self, close_mean: float):
self.initial_capital = 25000
if close_mean > 10000:
self.initial_capital = self.initial_capital * 10000
elif close_mean > 5000:
self.initial_capital = self.initial_capital * 5000
elif close_mean > 1000:
self.initial_capital = self.initial_capital * 1000
elif close_mean > 500:
self.initial_capital = self.initial_capital * 500
elif close_mean > 100:
self.initial_capital = self.initial_capital * 100
else:
pass
logger.info(f"收盘价均值:{close_mean}")
logger.info(f"初始资金调整为:{self.initial_capital}")
def calculate_shares(self, account_value, entry_price):
"""
根据ORB公式计算交易股数
:param account_value: 当前账户价值美元
:param entry_price: 交易买入价格
:param commission_per_share: 交易佣金, 默认为0.0008
:return: 整数股数Shares
"""
logger.info(
f"开始计算交易股数:账户价值={account_value},买入价格={entry_price}"
)
try:
# 验证输入参数
if account_value <= 0 or entry_price <= 0 or self.commission_per_share < 0:
logger.error("账户价值、买入价格或佣金不能为负或零")
return 0, account_value # 返回0股账户价值不变
# 计算考虑手续费后的每单位BTC总成本
total_cost_per_share = entry_price * (1 + self.commission_per_share)
# 计算可购买的BTC数量向下取整
shares = math.floor(account_value / total_cost_per_share)
# 计算总成本(含手续费)
total_cost = shares * total_cost_per_share
# 计算剩余现金
remaining_cash = account_value - total_cost
# 计算总资产价值 = (购买的BTC数量 × 买入价格) + 剩余现金
remaining_value = (shares * entry_price) + remaining_cash
# 记录计算结果
logger.info(
f"计算结果:可购买股数={shares},总成本={total_cost:.2f}美元,"
f"剩余现金={remaining_cash:.2f}美元,总资产价值={remaining_value:.2f}美元"
)
return shares, remaining_value
except Exception as e:
logger.error(f"计算股数或账户价值时出错:{str(e)}")
return 0, account_value # 出错时返回0股账户价值不变
def get_full_data(self, symbol: str, bar: str = "5m"):
"""
分段获取数据并将数据合并为完整数据
@ -464,9 +629,7 @@ class MaBreakStatistics:
current_end_date = min(start_date + timedelta(days=180), end_date)
start_date_str = start_date.strftime("%Y-%m-%d")
current_end_date_str = current_end_date.strftime("%Y-%m-%d")
logger.info(
f"获取{symbol}数据:{start_date_str}{current_end_date_str}"
)
logger.info(f"获取{symbol}数据:{start_date_str}{current_end_date_str}")
current_data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, bar, fields, start=start_date_str, end=current_end_date_str
)
@ -483,7 +646,6 @@ class MaBreakStatistics:
data.reset_index(drop=True, inplace=True)
return data
def fit_strategy(
self,
strategy_name: str = "全均线策略",
@ -626,92 +788,105 @@ class MaBreakStatistics:
plt.rcParams["font.size"] = 11 # 设置字体大小
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
chart_dict = {}
column_name_dict = {"pct_chg_total": "量化策略涨跌", "pct_chg_mean": "量化策略涨跌均值"}
column_name_dict = {
"account_value_chg": "量化策略涨跌",
}
for column_name, column_name_text in column_name_dict.items():
for bar in data["bar"].unique():
bar_data = data[data["bar"] == bar].copy() # 一次筛选即可
if bar_data.empty:
continue
bar_data.rename(columns={column_name: column_name_text}, inplace=True)
if column_name == "pct_chg_total":
bar_data.rename(columns={"market_pct_chg": "市场自然涨跌"}, inplace=True)
bar_data.rename(
columns={"market_pct_chg": "市场自然涨跌"}, inplace=True
)
# 可选:按均值排序
bar_data.sort_values(by=column_name_text, ascending=False, inplace=True)
bar_data.reset_index(drop=True, inplace=True)
# 如果column_name_text是"量化策略涨跌",则柱状图,同时绘制量化策略涨跌与市场自然涨跌的柱状图,并绘制在同一个图表中
if column_name == "pct_chg_total":
plt.figure(figsize=(12, 7))
plt.figure(figsize=(12, 7))
# 设置x轴位置为并列柱状图做准备
x = np.arange(len(bar_data))
width = 0.35 # 柱状图宽度
# 设置x轴位置为并列柱状图做准备
x = np.arange(len(bar_data))
width = 0.35 # 柱状图宽度
# 确保symbol列是字符串类型避免matplotlib警告
bar_data["symbol"] = bar_data["symbol"].astype(str)
# 确保symbol列是字符串类型避免matplotlib警告
bar_data["symbol"] = bar_data["symbol"].astype(str)
# 绘制量化策略涨跌柱状图(蓝色渐变色)
bars1 = plt.bar(x - width/2, bar_data[column_name_text], width,
label=column_name_text,
color=plt.cm.Blues(np.linspace(0.6, 0.9, len(bar_data))))
# 绘制量化策略涨跌柱状图(蓝色渐变色)
bars1 = plt.bar(
x - width / 2,
bar_data[column_name_text],
width,
label=column_name_text,
color=plt.cm.Blues(np.linspace(0.6, 0.9, len(bar_data))),
)
# 绘制市场自然涨跌柱状图(绿色渐变色)
bars2 = plt.bar(x + width/2, bar_data["市场自然涨跌"], width,
label="市场自然涨跌",
color=plt.cm.Greens(np.linspace(0.6, 0.9, len(bar_data))))
# 绘制市场自然涨跌柱状图(绿色渐变色)
bars2 = plt.bar(
x + width / 2,
bar_data["市场自然涨跌"],
width,
label="市场自然涨跌",
color=plt.cm.Greens(np.linspace(0.6, 0.9, len(bar_data))),
)
# 设置图表标题和标签
plt.title(f"{bar}趋势{column_name_text}与市场自然涨跌对比(%)", fontsize=14, fontweight='bold')
plt.xlabel("Symbol", fontsize=12)
plt.ylabel("涨跌幅(%)", fontsize=12)
plt.xticks(x, bar_data['symbol'], rotation=45, ha='right')
plt.legend()
plt.grid(True, alpha=0.3)
# 设置图表标题和标签
plt.title(
f"{bar}趋势{column_name_text}与市场自然涨跌对比(%)",
fontsize=14,
fontweight="bold",
)
plt.xlabel("Symbol", fontsize=12)
plt.ylabel("涨跌幅(%)", fontsize=12)
plt.xticks(x, bar_data["symbol"], rotation=45, ha="right")
plt.legend()
plt.grid(True, alpha=0.3)
# 在量化策略涨跌柱状图上方添加数值标签
for i, (bar1, value1) in enumerate(zip(bars1, bar_data[column_name_text])):
plt.text(bar1.get_x() + bar1.get_width()/2, value1 + (0.01 if value1 >= 0 else -0.01),
f'{value1:.3f}%', ha='center', va='bottom' if value1 >= 0 else 'top',
fontsize=9, fontweight='bold', color='darkblue')
# 在市场自然涨跌柱状图上方添加数值标签
for i, (bar2, value2) in enumerate(zip(bars2, bar_data["市场自然涨跌"])):
plt.text(bar2.get_x() + bar2.get_width()/2, value2 + (0.01 if value2 >= 0 else -0.01),
f'{value2:.3f}%', ha='center', va='bottom' if value2 >= 0 else 'top',
fontsize=9, fontweight='bold', color='darkgreen')
else:
plt.figure(figsize=(10, 6))
# 确保symbol列是字符串类型避免matplotlib警告
bar_data["symbol"] = bar_data["symbol"].astype(str)
ax = sns.barplot(
x="symbol", y=column_name_text, data=bar_data, palette="Blues_d"
# 在量化策略涨跌柱状图上方添加数值标签
for i, (bar1, value1) in enumerate(
zip(bars1, bar_data[column_name_text])
):
plt.text(
bar1.get_x() + bar1.get_width() / 2,
value1 + (0.01 if value1 >= 0 else -0.01),
f"{value1:.3f}%",
ha="center",
va="bottom" if value1 >= 0 else "top",
fontsize=9,
fontweight="bold",
color="darkblue",
)
plt.title(f"{bar}趋势{column_name_text}(%)")
plt.xlabel("symbol")
plt.ylabel(column_name_text)
plt.xticks(rotation=45, ha="right")
# 在柱状图上添加数值标签
for i, v in enumerate(bar_data[column_name_text]):
ax.text(
i,
v,
f"{v:.3f}",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
)
# 在市场自然涨跌柱状图上方添加数值标签
for i, (bar2, value2) in enumerate(
zip(bars2, bar_data["市场自然涨跌"])
):
plt.text(
bar2.get_x() + bar2.get_width() / 2,
value2 + (0.01 if value2 >= 0 else -0.01),
f"{value2:.3f}%",
ha="center",
va="bottom" if value2 >= 0 else "top",
fontsize=9,
fontweight="bold",
color="darkgreen",
)
plt.tight_layout()
save_path = os.path.join(
self.stats_chart_dir,
f"{bar}_bar_chart_{column_name}_{strategy_name}.png",
)
if self.commission_per_share > 0:
save_path = os.path.join(
self.stats_chart_dir,
f"{bar}_bar_chart_{column_name}_{strategy_name}_with_commission.png",
)
else:
save_path = os.path.join(
self.stats_chart_dir,
f"{bar}_bar_chart_{column_name}_{strategy_name}_without_commission.png",
)
plt.savefig(save_path, dpi=150)
plt.close()
@ -719,10 +894,16 @@ class MaBreakStatistics:
chart_dict[sheet_name] = save_path
return chart_dict
def draw_quant_line_chart(self, data: pd.DataFrame, strategy_name: str = "全均线策略"):
def draw_quant_line_chart(
self,
data: pd.DataFrame,
market_data_pct_chg_df: pd.DataFrame,
strategy_name: str = "全均线策略",
):
"""
根据量化策略买卖明细记录绘制量化策略涨跌与市场自然涨跌的折线图
:param data: 量化策略买卖明细记录
:param market_data_pct_chg_df: 市场自然涨跌记录
:param strategy_name: 策略名称
:return: None
"""
@ -731,16 +912,27 @@ class MaBreakStatistics:
chart_dict = {}
for symbol in symbols:
for bar in bars:
symbol_bar_data = data[(data["symbol"] == symbol) & (data["bar"] == bar)]
symbol_bar_data = data[
(data["symbol"] == symbol) & (data["bar"] == bar)
]
if symbol_bar_data.empty:
continue
# 获取第一行数据作为基准
first_row = symbol_bar_data.iloc[0].copy()
initial_capital = int(
market_data_pct_chg_df.loc[
(market_data_pct_chg_df["symbol"] == symbol)
& (market_data_pct_chg_df["bar"] == bar),
"initial_capital",
].values[0]
)
# 创建初始化行,设置基准值
init_row = first_row.copy()
init_row.loc["pct_chg_total"] = 1.0 # 量化策略初始值为1
init_row.loc["profit_loss"] = 0
init_row.loc["end_account_value"] = (
initial_capital # 量化策略初始值为初始资金
)
init_row.loc["end_timestamp"] = first_row["begin_timestamp"]
init_row.loc["end_date_time"] = first_row["begin_date_time"]
init_row.loc["end_close"] = first_row["begin_close"]
@ -759,29 +951,62 @@ class MaBreakStatistics:
# 将初始化行添加到数据开头
symbol_bar_data = pd.concat([pd.DataFrame([init_row]), symbol_bar_data])
symbol_bar_data.sort_values(by="end_timestamp", ascending=True, inplace=True)
symbol_bar_data.sort_values(
by="end_timestamp", ascending=True, inplace=True
)
symbol_bar_data.reset_index(drop=True, inplace=True)
# 确保时间列是datetime类型避免matplotlib警告
symbol_bar_data["end_date_time"] = pd.to_datetime(symbol_bar_data["end_date_time"])
symbol_bar_data["end_date_time"] = pd.to_datetime(
symbol_bar_data["end_date_time"]
)
# 计算持仓价值归一化数据(相对于初始价格)
symbol_bar_data["end_account_value_to_1"] = (
symbol_bar_data["end_account_value"] / initial_capital
)
symbol_bar_data["end_account_value_to_1"] = symbol_bar_data[
"end_account_value_to_1"
].round(4)
# 计算市场价位归一化数据(相对于初始价格)
symbol_bar_data["end_close_to_1"] = symbol_bar_data["end_close"] / init_row["end_close"]
symbol_bar_data["end_close_to_1"] = symbol_bar_data["end_close_to_1"].round(4)
symbol_bar_data["end_close_to_1"] = (
symbol_bar_data["end_close"] / init_row["end_close"]
)
symbol_bar_data["end_close_to_1"] = symbol_bar_data[
"end_close_to_1"
].round(4)
# 绘制折线图
plt.figure(figsize=(12, 7))
# 绘制量化策略涨跌线(蓝色)
plt.plot(symbol_bar_data["end_date_time"], symbol_bar_data["pct_chg_total"],
label="量化策略涨跌", color='blue', linewidth=2, marker='o', markersize=4)
plt.plot(
symbol_bar_data["end_date_time"],
symbol_bar_data["end_account_value_to_1"],
label="量化策略涨跌",
color="blue",
linewidth=2,
marker="o",
markersize=4,
)
# 绘制市场自然涨跌线(绿色)
plt.plot(symbol_bar_data["end_date_time"], symbol_bar_data["end_close_to_1"],
label="市场自然涨跌", color='green', linewidth=2, marker='s', markersize=4)
plt.plot(
symbol_bar_data["end_date_time"],
symbol_bar_data["end_close_to_1"],
label="市场自然涨跌",
color="green",
linewidth=2,
marker="s",
markersize=4,
)
plt.title(f"{symbol} {bar} 量化与市场折线图_{strategy_name}",
fontsize=14, fontweight='bold')
plt.title(
f"{symbol} {bar} 量化与市场折线图_{strategy_name}",
fontsize=14,
fontweight="bold",
)
plt.xlabel("时间", fontsize=12)
plt.ylabel("涨跌变化", fontsize=12)
plt.legend(fontsize=11)
@ -805,22 +1030,37 @@ class MaBreakStatistics:
label_indices.append(len(symbol_bar_data) - 1)
# 设置x轴标签
plt.xticks(symbol_bar_data["end_date_time"].iloc[label_indices],
symbol_bar_data["end_date_time"].iloc[label_indices].dt.strftime('%Y%m%d %H:%M'),
rotation=45, ha='right')
plt.xticks(
symbol_bar_data["end_date_time"].iloc[label_indices],
symbol_bar_data["end_date_time"]
.iloc[label_indices]
.dt.strftime("%Y%m%d %H:%M"),
rotation=45,
ha="right",
)
else:
# 如果数据点较少,全部显示
plt.xticks(symbol_bar_data["end_date_time"],
symbol_bar_data["end_date_time"].dt.strftime('%Y%m%d %H:%M'),
rotation=45, ha='right')
plt.xticks(
symbol_bar_data["end_date_time"],
symbol_bar_data["end_date_time"].dt.strftime("%Y%m%d %H:%M"),
rotation=45,
ha="right",
)
plt.tight_layout()
save_path = os.path.join(
self.stats_chart_dir,
f"{symbol}_{bar}_line_chart_{strategy_name}.png",
)
plt.savefig(save_path, dpi=150, bbox_inches='tight')
if self.commission_per_share > 0:
save_path = os.path.join(
self.stats_chart_dir,
f"{symbol}_{bar}_line_chart_{strategy_name}_with_commission.png",
)
else:
save_path = os.path.join(
self.stats_chart_dir,
f"{symbol}_{bar}_line_chart_{strategy_name}_without_commission.png",
)
plt.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close()
sheet_name = f"{symbol}_{bar}_折线图_{strategy_name}"

View File

@ -4,7 +4,7 @@ from huge_volume_main import HugeVolumeMain
from core.biz.market_monitor import create_metrics_report
from core.db.db_market_monitor import DBMarketMonitor
from core.wechat import Wechat
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WECHAT_CONFIG
from config import OKX_MONITOR_CONFIG, OKX_REALTIME_MONITOR_CONFIG, MYSQL_CONFIG, WECHAT_CONFIG
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
import core.logger as logging
@ -52,6 +52,14 @@ class MarketMonitorMain:
else:
self.is_binance = True
self.symbols = OKX_REALTIME_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["XCH-USDT"]
)
self.bars = OKX_REALTIME_MONITOR_CONFIG.get("volume_monitor", {}).get(
"bars", ["1m", "5m", "15m", "30m", "1H"]
)
def get_latest_record(self):
"""
获取最新记录
@ -252,13 +260,13 @@ class MarketMonitorMain:
获取下一个长周期实时数据
"""
if next:
# 获得bar在self.market_data_main.bars中的索引
bar_index = self.market_data_main.bars.index(bar)
if bar_index == len(self.market_data_main.bars) - 1:
# 获得bar在self.bars中的索引
bar_index = self.bars.index(bar)
if bar_index == len(self.bars) - 1:
logger.error(f"已经是最后一个bar: {bar}")
return None
# 获得下一个bar
bar = self.market_data_main.bars[bar_index + 1]
bar = self.bars[bar_index + 1]
# 获得下一个bar的实时数据
data = self.market_data_main.market_data.get_realtime_kline_data(
symbol=symbol, bar=bar, end_time=end_time, limit=100
@ -278,11 +286,11 @@ class MarketMonitorMain:
only_output_over_mean_volume: bool = True,
only_output_rise: bool = True,
):
for symbol in self.market_data_main.symbols:
for symbol in self.symbols:
if self.is_binance and symbol == "XCH-USDT":
logger.info(f"币安交易所无: {symbol}")
continue
for bar in self.market_data_main.bars:
for bar in self.bars:
logger.info(
f"开始监控: {symbol} {bar} 窗口大小: {self.window_size} 行情数据"
)
@ -305,7 +313,7 @@ if __name__ == "__main__":
market_monitor_main = MarketMonitorMain()
market_monitor_main.monitor_realtime_market(
symbol="BTC-USDT",
bar="5m",
bar="1m",
only_output_huge_volume=False,
only_output_rise=False,
)

View File

@ -24,9 +24,19 @@ from config import (
logger = logging.logger
class TradeMaStrategyMain:
def __init__(self, is_us_stock: bool = False, is_binance: bool = False):
self.ma_break_statistics = MaBreakStatistics(is_us_stock=is_us_stock, is_binance=is_binance)
def __init__(
self,
is_us_stock: bool = False,
is_binance: bool = False,
commission_per_share: float = 0,
):
self.ma_break_statistics = MaBreakStatistics(
is_us_stock=is_us_stock,
is_binance=is_binance,
commission_per_share=commission_per_share,
)
def batch_ma_break_statistics(self):
"""
@ -34,32 +44,28 @@ class TradeMaStrategyMain:
"""
logger.info("开始批量计算MA突破统计")
strategy_dict = self.ma_break_statistics.main_strategy
pct_chg_df_list = []
account_value_chg_df_list = []
for strategy_name, strategy_info in strategy_dict.items():
if "macd" in strategy_name:
# 只计算macd策略
pct_chg_df = self.ma_break_statistics.batch_statistics(strategy_name=strategy_name)
pct_chg_df_list.append(pct_chg_df)
pct_chg_df = pd.concat(pct_chg_df_list)
def statistics_pct_chg(self, pct_chg_df: pd.DataFrame):
"""
1. 将各个symbol, 各个bar, 各个策略的pct_chg_total构建为新的数据结构,
symbol, bar, stratege_name_1, stratege_name_2, stratege_name_3, ...
stratege_name_1的值, 为该策略的pct_chg_total的值
2. 构建新的数据结构: symbol, bar, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: BCT-USDT, 15m, 均线macd结合策略2, 全均线策略
3. 构建新的数据结构, bar, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: 15m, 均线macd结合策略2, 全均线策略
4. 构建新的数据结构, symbol, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: BCT-USDT, 均线macd结合策略2, 全均线策略
"""
logger.info("开始统计pct_chg")
account_value_chg_df = self.ma_break_statistics.batch_statistics(
strategy_name=strategy_name
)
account_value_chg_df_list.append(account_value_chg_df)
total_account_value_chg_df = pd.concat(account_value_chg_df_list)
return total_account_value_chg_df
def statistics_account_value_chg(self, account_value_chg_df: pd.DataFrame):
logger.info("开始统计account_value_chg")
if __name__ == "__main__":
trade_ma_strategy_main = TradeMaStrategyMain(is_us_stock=False, is_binance=True)
trade_ma_strategy_main.batch_ma_break_statistics()
commission_per_share_list = [0, 0.0008]
for commission_per_share in commission_per_share_list:
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_binance=True,
commission_per_share=commission_per_share,
)
trade_ma_strategy_main.batch_ma_break_statistics()