crypto_quant/core/biz/metrics_calculation.py

1119 lines
45 KiB
Python
Raw Normal View History

"""
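Moving-average long/short (bullish/bearish) classification module.

This module provides several moving-average based methods for deciding whether
the market is bullish or bearish, addressing the over-strictness of the
traditional approach.

Problems with the traditional approach:
1. It requires every moving-average deviation (MA5, MA10, MA20, MA30) to be
   strictly > 0 (or strictly < 0).
2. It has no weighting: short- and long-period averages count equally.
3. It ignores trend strength and only looks at the sign.
4. It has no historical comparison and uses fixed thresholds.

Improvements:
1. Weighted voting: shorter averages weigh more
   (MA5: 40%, MA10: 30%, MA20: 20%, MA30: 10%).
2. Trend-strength scoring: uses the size of the deviation, not just its sign.
3. Historical quantile comparison: thresholds adapt dynamically.
4. Trend consistency: takes the ordering of the averages into account.
5. Multiple strategies: pick the method that best suits the market regime.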
Usage example:
```python
# Basic usage (the improved default method)
metrics = MetricsCalculation()
data = metrics.set_ma_long_short_divergence(data)
# Advanced usage (choose a strategy)
# 1. Weighted voting (recommended)
data = metrics.set_ma_long_short_advanced(data, method="weighted_voting")
# 2. Trend-strength scoring
data = metrics.set_ma_long_short_advanced(data, method="trend_strength")
# 3. Moving-average alignment analysis
data = metrics.set_ma_long_short_advanced(data, method="ma_alignment")
# 4. Statistical distribution method
data = metrics.set_ma_long_short_advanced(data, method="statistical")
# 5. Hybrid method
data = metrics.set_ma_long_short_advanced(data, method="hybrid")
```
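Result labels (values of the ``ma_long_short`` column):
- "多": bullish trend; consider going long
- "空": bearish trend; consider going short
- "震荡": ranging market; consider staying out or trading the range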
"""
import logging
import pandas as pd
import numpy as np
import talib as tb
from talib import MA_Type
# Configure the root logger when this module is imported: INFO level with a
# "timestamp - level - message" line format.
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class MetricsCalculation:
def __init__(self):
    """Create a calculator; no per-instance state is set up here."""
    return None
def pre_close(self, df: pd.DataFrame):
    """Add previous-close, change and candle-colour columns to ``df`` in place.

    Columns written, in this order:
        pre_close: ``close`` shifted down by one row (NaN on the first row).
        close_change: ``close - pre_close``.
        pct_chg: ``close_change`` as a percentage of ``pre_close``.
        k_up_down: "阳线" where ``close >= open``, "阴线" where
            ``close < open``, "" where neither comparison holds.

    Returns:
        The same DataFrame object that was passed in.
    """
    df["pre_close"] = df["close"].shift(1)
    df["close_change"] = df["close"] - df["pre_close"]
    df["pct_chg"] = df["close_change"] / df["pre_close"] * 100

    # Both masks are computed explicitly so rows where neither comparison is
    # true (for example NaN prices) keep the empty default.
    closed_at_or_above_open = df["close"] >= df["open"]
    closed_below_open = df["close"] < df["open"]
    df["k_up_down"] = ""
    df.loc[closed_at_or_above_open, "k_up_down"] = "阳线"
    df.loc[closed_below_open, "k_up_down"] = "阴线"
    return df
def macd(self, df: pd.DataFrame):
    """Compute MACD columns and golden/dead cross signals on ``df`` in place.

    Each smoothing step is ``y[i] = y[i-1] * f + x[i] * (1 - f)`` with
    ``f = (period - 1) / (period + 1)`` and ``y[0] = x[0]``; the periods are
    12 (fast), 26 (slow) and 9 (signal).

    Columns written:
        ma1, ma2: fast and slow smoothed close.
        dif: ``ma1 - ma2``.
        dea: smoothed ``dif``.
        macd: ``2 * (dif - dea)``.
        macd_signal: "金叉" on the row where ``dif > dea`` becomes true,
            "死叉" on the row where it becomes false, "" otherwise. The first
            row never carries a signal.

    The close column is converted to float before smoothing. Without that, an
    integer-typed close column produces integer work arrays and every
    smoothed value is truncated on assignment.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("计算MACD指标")
    fast_period, slow_period, signal_period = 12, 26, 9

    def smooth(values, period):
        """Recursive exponential smoothing seeded with the first value."""
        decay = (period - 1) / (period + 1)
        smoothed = np.array(values, dtype=float)
        for i in range(1, len(smoothed)):
            smoothed[i] = smoothed[i - 1] * decay + smoothed[i] * (1 - decay)
        return smoothed

    closes = np.asarray(df["close"], dtype=float)
    fast_line = smooth(closes, fast_period)
    slow_line = smooth(closes, slow_period)
    dif = fast_line - slow_line
    dea = smooth(dif, signal_period)

    df["ma1"] = fast_line
    df["ma2"] = slow_line
    df["dif"] = dif
    df["dea"] = dea
    df["macd"] = 2 * (dif - dea)
    # DIFF, macdsignal, macdhist = tb.MACD(data, fastperiod=12, slowperiod=26, signalperiod=9)

    df["macd_signal"] = ""
    dif_above_dea = df["dif"] > df["dea"]
    # The fill values make the first row compare as "unchanged", so it can
    # never be reported as a cross.
    golden_cross = dif_above_dea & ~dif_above_dea.shift(1, fill_value=True)
    dead_cross = ~dif_above_dea & dif_above_dea.shift(1, fill_value=False)
    # Boolean masks are used (not label look-ups) so that a non-unique index
    # cannot mark additional rows.
    df.loc[golden_cross, "macd_signal"] = "金叉"
    df.loc[dead_cross, "macd_signal"] = "死叉"
    return df
def kdj(self, df: pd.DataFrame):
    """Compute KDJ columns and K/D cross signals on ``df`` in place.

    The raw stochastic value is ``(close - lowest) / (highest - lowest) * 100``
    where ``lowest``/``highest`` are the 9-row rolling low/high; rows for which
    the rolling value is NaN fall back to the expanding low/high.

    Columns written:
        kdj_k: ``ewm(com=2).mean()`` of the raw stochastic value.
        kdj_d: ``ewm(com=2).mean()`` of ``kdj_k``.
        kdj_j: ``3 * kdj_k - 2 * kdj_d``.
        kdj_signal: "金叉" where ``kdj_k > kdj_d`` turns true, "死叉" where it
            turns false, "" otherwise.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("计算KDJ指标")
    lookback = 9

    lowest = df["low"].rolling(window=lookback).min()
    lowest = lowest.fillna(df["low"].expanding().min())
    highest = df["high"].rolling(window=lookback).max()
    highest = highest.fillna(df["high"].expanding().max())

    raw_stochastic = (df["close"] - lowest) / (highest - lowest) * 100
    df["kdj_k"] = raw_stochastic.ewm(com=2).mean()
    df["kdj_d"] = df["kdj_k"].ewm(com=2).mean()
    df["kdj_j"] = 3 * df["kdj_k"] - 2 * df["kdj_d"]

    df["kdj_signal"] = ""
    k_over_d = df["kdj_k"] > df["kdj_d"]
    earlier = k_over_d.shift()
    # `earlier` holds NaN on the first row; comparing with == keeps that row
    # out of both masks.
    turned_up = k_over_d & (earlier == False)  # noqa: E712
    turned_down = ~k_over_d & (earlier == True)  # noqa: E712
    df.loc[df.index[turned_up.to_numpy()], "kdj_signal"] = "金叉"
    df.loc[df.index[turned_down.to_numpy()], "kdj_signal"] = "死叉"
    return df
def set_kdj_pattern(self, df: pd.DataFrame):
    """Label every row with its KDJ overbought/oversold state in ``kdj_pattern``.

    Rules (the most extreme matching rule wins):
        "超超买": kdj_k > 80, kdj_d > 80 and kdj_j > 90
        "超买":   kdj_k > 70, kdj_d > 70 and kdj_j > 80
        "超超卖": kdj_k < 20, kdj_d < 20 and kdj_j < 10
        "超卖":   kdj_k < 30, kdj_d < 30 and kdj_j < 20
        "徘徊":   everything else

    Every row that satisfies an extreme rule also satisfies the matching
    milder rule, so the milder label is written first and the extreme label
    afterwards. Writing them the other way round erases the extreme labels.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("设置KDJ形态")
    k_line, d_line, j_line = df["kdj_k"], df["kdj_d"], df["kdj_j"]

    df["kdj_pattern"] = "徘徊"

    overbought = (k_line > 70) & (d_line > 70) & (j_line > 80)
    extremely_overbought = (k_line > 80) & (d_line > 80) & (j_line > 90)
    df.loc[overbought, "kdj_pattern"] = "超买"
    df.loc[extremely_overbought, "kdj_pattern"] = "超超买"

    oversold = (k_line < 30) & (d_line < 30) & (j_line < 20)
    extremely_oversold = (k_line < 20) & (d_line < 20) & (j_line < 10)
    df.loc[oversold, "kdj_pattern"] = "超卖"
    df.loc[extremely_oversold, "kdj_pattern"] = "超超卖"
    return df
def calculate_ma_price_percent(self, data: pd.DataFrame):
    """Add the percentage deviation of ``close`` from each moving average.

    For each period p in (5, 10, 20, 30) writes
    ``ma{p}_close_diff = (close - ma{p}) / close * 100`` and finally
    ``ma_close_avg``, the sum of the four deviations divided by 4.

    Returns:
        The same DataFrame object that was passed in.
    """
    running_total = None
    for period in (5, 10, 20, 30):
        column = f"ma{period}_close_diff"
        data[column] = (data["close"] - data[f"ma{period}"]) / (data["close"]) * 100
        # Accumulate in period order so the additions happen in the same
        # sequence as a hand-written four-term sum.
        running_total = data[column] if running_total is None else running_total + data[column]
    data["ma_close_avg"] = running_total / 4
    return data
def set_ma_long_short_divergence(self, data: pd.DataFrame):
    """Fill the ``ma_long_short`` and ``ma_divergence`` columns.

    ``ma_long_short`` is reset to "震荡" and then handed to
    ``self._trend_strength_method`` for the long/short decision.

    ``ma_divergence`` is derived from the four ``ma*_close_diff`` columns. For
    each row the standard deviation and the absolute mean of the four values
    are compared with the 25%/50%/75% quantiles of those statistics over the
    whole frame. Rules are applied in order; a later rule only labels rows
    that are still "未知":
        "超发散": std > 75% quantile and |mean| > 75% quantile
        "发散":   std in (50%, 75%] or |mean| in (50%, 75%]
        "适中":   std in (25%, 50%] and |mean| <= 50% quantile
        "粘合":   std <= 25% quantile
        "未知":   none of the above

    Returns:
        The DataFrame returned by ``self._trend_strength_method`` with
        ``ma_divergence`` added.
    """
    logging.info("设置均线多空和发散")

    # Long/short: start from the neutral label and let the trend-strength
    # strategy overwrite the rows it recognises.
    data["ma_long_short"] = "震荡"
    data = self._trend_strength_method(data)

    data["ma_divergence"] = "未知"
    deviations = data[
        ["ma5_close_diff", "ma10_close_diff", "ma20_close_diff", "ma30_close_diff"]
    ]
    spread = deviations.std(axis=1)
    bias = abs(deviations.mean(axis=1))

    quantile_levels = [0.25, 0.50, 0.75]
    spread_q25, spread_q50, spread_q75 = spread.quantile(quantile_levels)
    _, bias_q50, bias_q75 = bias.quantile(quantile_levels)

    def label_if_unknown(mask, label):
        """Write ``label`` to rows matching ``mask`` that are still unlabelled."""
        data.loc[mask & (data["ma_divergence"] == "未知"), "ma_divergence"] = label

    data.loc[(spread > spread_q75) & (bias > bias_q75), "ma_divergence"] = "超发散"
    label_if_unknown(
        ((spread > spread_q50) & (spread <= spread_q75))
        | ((bias > bias_q50) & (bias <= bias_q75)),
        "发散",
    )
    label_if_unknown(
        (spread > spread_q25) & (spread <= spread_q50) & (bias <= bias_q50),
        "适中",
    )
    label_if_unknown(spread <= spread_q25, "粘合")
    return data
def update_macd_divergence_column(self, df: pd.DataFrame):
    """Label each row with its MACD divergence state in ``macd_divergence``.

    The frame is sorted by ``timestamp`` (ascending) and re-indexed from 0
    before labelling, so the returned frame is a new object.

    A row (other than the first) is labelled "顶背离" when ``kdj_j > 70`` and
    either
      * close rose versus the previous row while ``dif`` and ``macd`` did not, or
      * close is at least 95% of the running maximum close while ``dif`` and
        ``macd`` are at most 80% of their running maxima.
    It is labelled "底背离" when ``kdj_j < 20`` and either
      * close fell versus the previous row while ``dif`` and ``macd`` did not, or
      * close is at most 105% of the running minimum close while ``dif`` and
        ``macd`` are at least 120% of their running minima.
    All other rows, including the first, are "未知".

    The running extremes are taken with ``cummax``/``cummin`` instead of
    re-scanning the history for every row.

    :param df: DataFrame with timestamp, close, dif, macd and kdj_j columns
    :return: ``df`` unchanged if it is None, empty or lacks a required
        column; otherwise the sorted frame with ``macd_divergence`` filled in
    """
    if df is None or df.empty:
        return df

    required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"缺少必要的列: {missing_columns}")
        return df

    df = df.sort_values("timestamp").reset_index(drop=True)
    close, dif, histogram, kdj_j = df["close"], df["dif"], df["macd"], df["kdj_j"]

    overbought = kdj_j > 70
    oversold = kdj_j < 20

    # Comparison with the immediately preceding row.
    top_vs_previous = (
        (close > close.shift(1))
        & (dif <= dif.shift(1))
        & (histogram <= histogram.shift(1))
    )
    bottom_vs_previous = (
        (close < close.shift(1))
        & (dif >= dif.shift(1))
        & (histogram >= histogram.shift(1))
    )

    # Comparison with the extremes of everything up to and including the row.
    # NOTE(review): scaling by 0.8 / 1.2 only means "clearly below the high" /
    # "clearly above the low" when the extreme has the expected sign; for a
    # negative low, `dif >= low * 1.2` is always true. Kept as-is.
    top_vs_history = (
        (close >= close.cummax() * 0.95)
        & (dif <= dif.cummax() * 0.8)
        & (histogram <= histogram.cummax() * 0.8)
    )
    bottom_vs_history = (
        (close <= close.cummin() * 1.05)
        & (dif >= dif.cummin() * 1.2)
        & (histogram >= histogram.cummin() * 1.2)
    )

    # The first row has no predecessor and is never labelled.
    has_predecessor = np.arange(len(df)) >= 1

    df["macd_divergence"] = "未知"
    df.loc[
        overbought & (top_vs_previous | top_vs_history) & has_predecessor,
        "macd_divergence",
    ] = "顶背离"
    df.loc[
        oversold & (bottom_vs_previous | bottom_vs_history) & has_predecessor,
        "macd_divergence",
    ] = "底背离"
    return df
def update_macd_divergence_column_simple(
    self, df: pd.DataFrame, window_size: int = 20
):
    """Sliding-window MACD divergence detection.

    The frame is sorted by ``timestamp`` (ascending) and re-indexed from 0,
    so the returned frame is a new object. For every row at position
    ``i >= window_size`` the current values are compared with the extremes of
    rows ``i - window_size`` to ``i`` (inclusive):

      * "顶背离": ``kdj_j > 70``, close >= 98% of the window high, and both
        ``dif`` and ``macd`` <= 85% of their window highs.
      * "底背离": ``kdj_j < 20``, close <= 102% of the window low, and both
        ``dif`` and ``macd`` >= 115% of their window lows.

    All other rows are "未知".

    :param df: DataFrame with timestamp, close, dif, macd and kdj_j columns
    :param window_size: number of preceding rows in each window; must be >= 0
    :return: ``df`` unchanged if it is None, empty or lacks a required
        column; otherwise the sorted frame with ``macd_divergence`` filled in
    :raises ValueError: if ``window_size`` is negative
    """
    if df is None or df.empty:
        return df

    required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"缺少必要的列: {missing_columns}")
        return df

    if window_size < 0:
        raise ValueError("window_size must be non-negative")

    df = df.sort_values("timestamp").reset_index(drop=True)
    close, dif, histogram, kdj_j = df["close"], df["dif"], df["macd"], df["kdj_j"]

    # Each window covers the current row plus `window_size` predecessors.
    # min_periods=1 lets the extremes ignore NaN entries instead of turning
    # the whole window into NaN.
    span = window_size + 1

    def window_high(series):
        return series.rolling(window=span, min_periods=1).max()

    def window_low(series):
        return series.rolling(window=span, min_periods=1).min()

    # Rows without a full window behind them are never labelled.
    has_full_window = np.arange(len(df)) >= window_size

    top = (
        (kdj_j > 70)
        & (close >= window_high(close) * 0.98)  # price near the window high
        & (dif <= window_high(dif) * 0.85)  # DIF clearly below its window high
        & (histogram <= window_high(histogram) * 0.85)  # MACD likewise
    )
    bottom = (
        (kdj_j < 20)
        & (close <= window_low(close) * 1.02)  # price near the window low
        & (dif >= window_low(dif) * 1.15)  # DIF clearly above its window low
        & (histogram >= window_low(histogram) * 1.15)  # MACD likewise
    )

    df["macd_divergence"] = "未知"
    df.loc[top & has_full_window, "macd_divergence"] = "顶背离"
    df.loc[bottom & has_full_window, "macd_divergence"] = "底背离"
    return df
def ma5102030(self, df: pd.DataFrame):
    """Compute the 5/10/20/30-row simple moving averages and their crossovers.

    Columns written: ``ma5``, ``ma10``, ``ma20``, ``ma30`` (rolling means of
    ``close``) and ``ma_cross``.

    For every pair of averages, an up-cross ("{fast}上穿{slow}") is the row on
    which ``fast > slow`` turns true and a down-cross ("{slow}下穿{fast}") the
    row on which it turns false. ``ma_cross`` for a row is:
      * the sorted up-cross names concatenated, if more than one up-cross fired;
      * otherwise the sorted down-cross names concatenated, if more than one
        down-cross fired;
      * otherwise the first signal that fired (up-crosses are listed before
        down-crosses);
      * "" if nothing fired.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("计算均线指标")
    for period in (5, 10, 20, 30):
        df[f"ma{period}"] = df["close"].rolling(window=period).mean()
    df["ma_cross"] = ""

    def crossings(fast, slow):
        """Return (up-cross mask, down-cross mask) for one pair of averages."""
        fast_above = df[f"ma{fast}"] > df[f"ma{slow}"]
        earlier = fast_above.shift()
        # `earlier` is NaN on the first row, which matches neither comparison.
        crossed_up = fast_above & (earlier == False)  # noqa: E712
        crossed_down = ~fast_above & (earlier == True)  # noqa: E712
        return crossed_up, crossed_down

    # The listing order matters: when a row carries a single usable signal the
    # first one in this order is reported.
    up_pairs = ((5, 10), (5, 20), (5, 30), (10, 20), (10, 30), (20, 30))
    down_pairs = ((5, 10), (10, 20), (5, 20), (20, 30), (10, 30), (5, 30))
    masks = {pair: crossings(*pair) for pair in up_pairs}

    signal_names = [f"{fast}上穿{slow}" for fast, slow in up_pairs]
    signal_masks = [masks[pair][0].to_numpy() for pair in up_pairs]
    signal_names += [f"{slow}下穿{fast}" for fast, slow in down_pairs]
    signal_masks += [masks[pair][1].to_numpy() for pair in down_pairs]

    fired_matrix = np.column_stack(signal_masks)
    # Only rows with at least one signal need a label; the rest keep "".
    for position in np.flatnonzero(fired_matrix.any(axis=1)):
        fired = [
            name for name, hit in zip(signal_names, fired_matrix[position]) if hit
        ]
        rising = [name for name in fired if "上穿" in name]
        falling = [name for name in fired if "下穿" in name]
        if len(rising) > 1:
            label = "".join(sorted(rising))
        elif len(falling) > 1:
            label = "".join(sorted(falling))
        else:
            label = fired[0]
        df.loc[df.index[position], "ma_cross"] = label
    return df
def rsi(self, df: pd.DataFrame):
    """Compute the 14-period RSI and mark entries into the extreme zones.

    Columns written:
        rsi_14: ``tb.RSI`` of the close values with ``timeperiod=14``.
        rsi_signal: "超买" on the row where ``rsi_14 > 70`` turns true,
            "超卖" on the row where ``rsi_14 < 30`` turns true, "" otherwise.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("计算RSI指标")
    df["rsi_14"] = tb.RSI(df["close"].values, timeperiod=14)
    df["rsi_signal"] = ""

    zones = (
        ("超买", df["rsi_14"] > 70),
        ("超卖", df["rsi_14"] < 30),
    )
    for label, in_zone in zones:
        # Only the first row of each stay inside the zone is marked; the NaN
        # produced by shift() keeps the very first row unmarked.
        entered = in_zone & (in_zone.shift() == False)  # noqa: E712
        df.loc[df.index[entered.to_numpy()], "rsi_signal"] = label
    return df
def boll(self, df: pd.DataFrame):
    """Add Bollinger Band columns computed by ``tb.BBANDS``.

    Uses the close values with ``timeperiod=20`` and a simple moving average
    and stores the three returned arrays as ``boll_upper``, ``boll_middle``
    and ``boll_lower``.

    Returns:
        The same DataFrame object that was passed in.
    """
    logging.info("计算BOLL指标")
    upper_band, middle_band, lower_band = tb.BBANDS(
        df["close"].values, timeperiod=20, matype=MA_Type.SMA
    )
    df["boll_upper"] = upper_band
    df["boll_middle"] = middle_band
    df["boll_lower"] = lower_band
    return df
def set_boll_pattern(self, df: pd.DataFrame):
    """Fill ``boll_pattern`` and ``boll_signal`` from price, bands and KDJ J.

    ``boll_pattern`` (default "震荡"):
        "超超买": close >= upper band and kdj_j > 80
        "超买":   close within 2% of the upper band, kdj_j > 80, not yet labelled
        "超超卖": close <= lower band and kdj_j < 20
        "超卖":   close within 2% of the lower band, kdj_j < 20, not yet labelled

    ``boll_signal`` (default ""):
        "突破下轨": first row on which close is above the lower band while
            the previous close was below the previous lower band
        "击穿上轨": first row on which close is below the upper band while
            the previous close was above the previous upper band

    The previous close is read from ``pre_close`` when that column exists and
    otherwise derived as ``close.shift(1)``, so the method no longer raises
    ``KeyError`` when ``pre_close()`` has not been run first.

    Returns:
        The same DataFrame object that was passed in. If a required column is
        missing only ``boll_pattern`` (all "震荡") is written.
    """
    logging.info("设置BOLL形态")
    df["boll_pattern"] = "震荡"

    required_columns = ["close", "boll_upper", "boll_lower", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"缺少必要的列: {missing_columns}")
        return df

    close = df["close"]
    upper_band = df["boll_upper"]
    lower_band = df["boll_lower"]
    previous_close = df["pre_close"] if "pre_close" in df.columns else close.shift(1)

    # Distance between price and each band as a percentage of price.
    upper_gap_pct = (close - upper_band).abs() / close * 100
    lower_gap_pct = (close - lower_band).abs() / close * 100
    overbought = df["kdj_j"] > 80
    oversold = df["kdj_j"] < 20

    df.loc[(close >= upper_band) & overbought, "boll_pattern"] = "超超买"
    df.loc[
        (upper_gap_pct <= 2) & overbought & (df["boll_pattern"] == "震荡"),
        "boll_pattern",
    ] = "超买"
    df.loc[(close <= lower_band) & oversold, "boll_pattern"] = "超超卖"
    df.loc[
        (lower_gap_pct <= 2) & oversold & (df["boll_pattern"] == "震荡"),
        "boll_pattern",
    ] = "超卖"

    df["boll_signal"] = ""
    back_above_lower = (close > lower_band) & (previous_close < lower_band.shift())
    back_below_upper = (close < upper_band) & (previous_close > upper_band.shift())
    # Mark only the first row of each run; fill_value=True keeps the first row
    # of the frame from ever counting as a fresh event.
    df.loc[
        back_above_lower & ~back_above_lower.shift(1, fill_value=True),
        "boll_signal",
    ] = "突破下轨"
    df.loc[
        back_below_upper & ~back_below_upper.shift(1, fill_value=True),
        "boll_signal",
    ] = "击穿上轨"
    return df
def set_k_length(self, df: pd.DataFrame):
    """Classify each candle's length into the ``k_length`` column.

    Two quantities are scored against their own rolling statistics (window of
    ``min(20, len(df))`` rows, ``min_periods=1``): the total range
    ``high - low`` and the body ``|close - open|``. Each is turned into a
    Z-score; undefined or infinite scores count as 0.

    Labels:
        "超长": range Z-score > 1.5 and body Z-score > 1.0
        "长":   range Z-score > 0.8 or body Z-score > 0.8 (and not "超长")
        "短":   range Z-score < -0.8 and body Z-score < -0.5
        "中":   everything else

    The single-character labels "中", "长" and "短" are written explicitly.
    With empty strings in their place the medium, long and short classes are
    indistinguishable from one another.
    NOTE(review): these three label values are inferred from the four classes
    this method is meant to produce - confirm against downstream consumers.

    Returns:
        The same DataFrame object that was passed in. If a required column is
        missing the frame is returned untouched.
    """
    logging.info("设置K线长度")
    required_columns = ["close", "open", "high", "low"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"缺少必要的列: {missing_columns}")
        return df

    if df.empty:
        # rolling(window=0, min_periods=1) is rejected by pandas, so an empty
        # frame is handled up front.
        df["k_length"] = "中"
        return df

    body = (df["close"] - df["open"]).abs()
    total_range = df["high"] - df["low"]
    window_size = min(20, len(df))

    def rolling_zscore(series):
        """Z-score of each value against its trailing window (0 if undefined)."""
        trailing = series.rolling(window=window_size, min_periods=1)
        score = (series - trailing.mean()) / trailing.std()
        return score.replace([np.inf, -np.inf], 0).fillna(0)

    range_z = rolling_zscore(total_range)
    body_z = rolling_zscore(body)

    df["k_length"] = "中"
    df.loc[(range_z > 1.5) & (body_z > 1.0), "k_length"] = "超长"
    df.loc[
        ((range_z > 0.8) | (body_z > 0.8)) & (df["k_length"] == "中"),
        "k_length",
    ] = "长"
    df.loc[(range_z < -0.8) & (body_z < -0.5), "k_length"] = "短"
    return df
def set_k_shape(self, df: pd.DataFrame):
    """Classify each candle's shape into the ``k_shape`` column.

    Derived per-row quantities:
        body share   = |close - open| / (high - low)   (1.0 when the range is 0)
        upper share  = (high - max(open, close)) / (high - low)   (0 when range is 0)
        lower share  = (min(open, close) - low) / (high - low)    (0 when range is 0)
        range pct    = (high - low) / close * 100
    ``range pct`` is additionally Z-scored and compared with its 75% quantile
    over a trailing window of ``min(20, len(df))`` rows.

    Labels, in priority order (a row keeps the first label it receives unless
    noted otherwise):
        "一字": (range Z-score < -1.0 or range pct < 0.05), both shadow
            shares <= 0.01 and |close - open| / close < 0.0005
        body share <= 0.30:
            upper share < 0.25  -> "长吊锤线" (body share < 0.10) / "吊锤线"
            lower share < 0.25  -> "长倒T线"  (body share < 0.10) / "倒T线"
            both shares >= 0.25 -> "长十字星" (body share < 0.10) / "十字星"
        0.30 < body share <= 0.55:
            upper share >= 0.25 -> "长上影线纺锤体"
            lower share >= 0.25 -> "长下影线纺锤体"
            otherwise           -> "小实体"
        0.55 < body share <= 0.90 -> "大实体"
        0.75 < body share <= 1 and range pct >= trailing 75% quantile
            -> "超大实体" (overrides "大实体")
        body share > 0.90 -> "超大实体+光头光脚" if already "超大实体",
            otherwise "光头光脚"
        "未知": anything left over

    No helper columns are left on ``df``; only ``k_shape`` is added.

    Returns:
        The same DataFrame object that was passed in. If a required column is
        missing the frame is returned untouched.
    """
    logging.info("设置K线形状")
    required_columns = ["close", "open", "high", "low"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning(f"缺少必要的列: {missing_columns}")
        return df

    if df.empty:
        # rolling(window=0, min_periods=1) is rejected by pandas, so an empty
        # frame is handled up front.
        df["k_shape"] = "未知"
        return df

    # --- basic candle geometry -------------------------------------------
    candle_range = df["high"] - df["low"]
    body = (df["close"] - df["open"]).abs()
    upper_shadow = df["high"] - df[["open", "close"]].max(axis=1)
    lower_shadow = df[["open", "close"]].min(axis=1) - df["low"]

    # A zero range would divide by zero; treat it as "all body, no shadows".
    safe_range = candle_range.replace(0, np.nan)
    body_share = (body / safe_range).fillna(1.0)
    upper_share = (upper_shadow / safe_range).fillna(0)
    lower_share = (lower_shadow / safe_range).fillna(0)

    # --- volatility context ----------------------------------------------
    range_pct = candle_range / df["close"] * 100
    window_size = min(20, len(df))
    trailing = range_pct.rolling(window=window_size, min_periods=1)
    range_z = (
        ((range_pct - trailing.mean()) / trailing.std())
        .replace([np.inf, -np.inf], 0)
        .fillna(0)
    )
    range_pct_p75 = trailing.quantile(0.75)

    # --- classification --------------------------------------------------
    shape = pd.Series("未知", index=df.index, dtype=object)

    def label_unclassified(mask, name):
        """Give ``name`` to rows matching ``mask`` that have no label yet."""
        shape[mask & (shape == "未知")] = name

    flat_line = (
        ((range_z < -1.0) | (range_pct < 0.05))
        & (upper_share <= 0.01)
        & (lower_share <= 0.01)
        & (body / df["close"] < 0.0005)
    )
    shape[flat_line] = "一字"

    thin_body = body_share <= 0.3
    tiny_body = body_share < 0.1
    not_tiny_body = body_share >= 0.1
    short_upper = upper_share < 0.25
    short_lower = lower_share < 0.25
    long_upper = upper_share >= 0.25
    long_lower = lower_share >= 0.25

    label_unclassified(thin_body & short_upper & tiny_body, "长吊锤线")
    label_unclassified(thin_body & short_upper & not_tiny_body, "吊锤线")
    label_unclassified(thin_body & short_lower & tiny_body, "长倒T线")
    label_unclassified(thin_body & short_lower & not_tiny_body, "倒T线")
    label_unclassified(thin_body & long_upper & long_lower & tiny_body, "长十字星")
    label_unclassified(thin_body & long_upper & long_lower & not_tiny_body, "十字星")

    spindle_body = (body_share > 0.3) & (body_share <= 0.55)
    label_unclassified(spindle_body & long_upper, "长上影线纺锤体")
    label_unclassified(spindle_body & long_lower, "长下影线纺锤体")
    label_unclassified(spindle_body, "小实体")

    label_unclassified((body_share > 0.55) & (body_share <= 0.9), "大实体")

    # An unusually wide candle with a dominant body replaces whatever label
    # the row has, except "一字".
    dominant_and_wide = (
        (body_share > 0.75)
        & (body_share <= 1)
        & (range_pct >= range_pct_p75)
        & (shape != "一字")
    )
    shape[dominant_and_wide] = "超大实体"

    almost_all_body = body_share > 0.9
    shape[almost_all_body & (shape == "超大实体")] = "超大实体+光头光脚"
    label_unclassified(almost_all_body, "光头光脚")

    df["k_shape"] = shape
    return df
def set_ma_long_short_advanced(self, data: pd.DataFrame, method="weighted_voting"):
    """Run one of the moving-average long/short strategies on ``data``.

    Args:
        data: DataFrame holding the moving-average deviation columns the
            chosen strategy reads.
        method: strategy name, one of
            - "weighted_voting": weighted voting (the default)
            - "trend_strength": trend-strength scoring
            - "ma_alignment": moving-average alignment analysis
            - "statistical": statistical distribution method
            - "hybrid": hybrid method
            Any other value logs a warning and falls back to weighted voting.

    Returns:
        Whatever the selected strategy method returns.
    """
    logging.info(f"使用{method}方法设置均线多空")

    # Strategy name -> name of the method implementing it. The method is
    # looked up only after the name has been resolved.
    implementations = {
        "weighted_voting": "_weighted_voting_method",
        "trend_strength": "_trend_strength_method",
        "ma_alignment": "_ma_alignment_method",
        "statistical": "_statistical_method",
        "hybrid": "_hybrid_method",
    }
    chosen = implementations.get(method)
    if chosen is None:
        logging.warning(f"未知的方法: {method},使用默认加权投票方法")
        chosen = "_weighted_voting_method"
    return getattr(self, chosen)(data)
def _weighted_voting_method(self, data: pd.DataFrame):
    """Weighted-voting strategy: shorter moving averages carry more weight.

    The score of a row is
    ``0.4*ma5_close_diff + 0.3*ma10_close_diff + 0.2*ma20_close_diff
    + 0.1*ma30_close_diff``.

    Thresholds:
        * With ``window = min(50, len(data) // 4)`` greater than 10, the long
          threshold is 0.3 x the rolling 75% quantile of the score and the
          short threshold is 0.3 x the rolling 25% quantile. Rows without a
          full window have no threshold and stay neutral.
        * Otherwise the fixed thresholds 0.3 and -0.3 are used.

    ``ma_long_short`` is reset to "震荡" and then set to "多" where the score
    is above the long threshold and "空" where it is below the short one.

    Changes from the earlier behaviour:
        * Long and short rows used the same empty-string label; they are now
          "多" and "空". NOTE(review): label values inferred from the
          long/short/"震荡" scheme - confirm against downstream consumers.
        * The column is reset first, so rows the strategy considers neutral do
          not keep NaN or labels left behind by another strategy.
        * The two quantiles were attached to the opposite thresholds, which
          contradicts the fixed fallback (long threshold above short
          threshold) and left no neutral band.

    Returns:
        The same DataFrame object that was passed in.
    """
    # Shorter averages get the larger vote.
    weights = {
        "ma5_close_diff": 0.4,  # 40%
        "ma10_close_diff": 0.3,  # 30%
        "ma20_close_diff": 0.2,  # 20%
        "ma30_close_diff": 0.1,  # 10%
    }
    weighted_score = sum(data[col] * weight for col, weight in weights.items())

    # Dynamic thresholds from the recent score distribution when there is
    # enough history, fixed thresholds otherwise.
    window_size = min(50, len(data) // 4)
    if window_size > 10:
        recent_scores = weighted_score.rolling(window=window_size)
        long_threshold = recent_scores.quantile(0.75) * 0.3
        short_threshold = recent_scores.quantile(0.25) * 0.3
    else:
        long_threshold = 0.3
        short_threshold = -0.3

    data["ma_long_short"] = "震荡"
    data.loc[weighted_score > long_threshold, "ma_long_short"] = "多"
    data.loc[weighted_score < short_threshold, "ma_long_short"] = "空"
    return data
def _trend_strength_method(self, data: pd.DataFrame):
    """Trend-strength strategy: deviation size plus persistence of its sign.

    A row is "多" when ``ma_close_avg > 0.5`` and "空" when
    ``ma_close_avg < -0.5``, in both cases only if the persistence count from
    ``self._calculate_trend_persistence`` is at least 3. All other rows are
    "震荡".

    Changes from the earlier behaviour:
        * Long and short rows used the same empty-string label; they are now
          "多" and "空". NOTE(review): label values inferred from the
          long/short/"震荡" scheme - confirm against downstream consumers.
        * ``ma_long_short`` is reset to "震荡" first, so the result does not
          depend on what an earlier call left in the column.

    Returns:
        The same DataFrame object that was passed in.
    """
    trend_strength = data["ma_close_avg"]
    trend_persistence = self._calculate_trend_persistence(data)

    strength_threshold = 0.5
    persistence_threshold = 3  # at least three consecutive same-sign steps
    persistent = trend_persistence >= persistence_threshold

    data["ma_long_short"] = "震荡"
    data.loc[(trend_strength > strength_threshold) & persistent, "ma_long_short"] = "多"
    data.loc[(trend_strength < -strength_threshold) & persistent, "ma_long_short"] = "空"
    return data
def _ma_alignment_method(self, data: pd.DataFrame):
    """Alignment strategy: ordering of the moving averages plus their spacing.

    The method only sees the ``ma*_close_diff`` columns. Assuming they hold
    ``(close - ma) / close * 100`` with a positive close (as
    ``calculate_ma_price_percent`` writes them), a *higher* moving average has
    a *smaller* deviation. Hence:
        bullish alignment  MA5 > MA10 > MA20 > MA30
            <=> ma5_close_diff < ma10_close_diff < ma20_close_diff < ma30_close_diff
        bearish alignment  MA5 < MA10 < MA20 < MA30
            <=> ma5_close_diff > ma10_close_diff > ma20_close_diff > ma30_close_diff

    A row is "多" (bullish) or "空" (bearish) when the alignment holds and the
    average spacing from ``self._calculate_ma_spacing`` exceeds 0.2; all other
    rows are "震荡".

    Changes from the earlier behaviour:
        * The deviation columns were compared in the same direction as the
          moving averages themselves, which swapped bullish and bearish.
        * Long and short rows used the same empty-string label; they are now
          "多" and "空". NOTE(review): label values inferred from the
          long/short/"震荡" scheme - confirm against downstream consumers.
        * ``ma_long_short`` is reset to "震荡" first.

    Returns:
        The same DataFrame object that was passed in.
    """
    diff5 = data["ma5_close_diff"]
    diff10 = data["ma10_close_diff"]
    diff20 = data["ma20_close_diff"]
    diff30 = data["ma30_close_diff"]

    # Deviations grow with the period when the shorter averages sit on top.
    bullish_alignment = (diff5 < diff10) & (diff10 < diff20) & (diff20 < diff30)
    bearish_alignment = (diff5 > diff10) & (diff10 > diff20) & (diff20 > diff30)

    # Require the averages to be visibly apart, not merely ordered.
    well_spaced = self._calculate_ma_spacing(data) > 0.2

    data["ma_long_short"] = "震荡"
    data.loc[bullish_alignment & well_spaced, "ma_long_short"] = "多"
    data.loc[bearish_alignment & well_spaced, "ma_long_short"] = "空"
    return data
def _statistical_method(self, data: pd.DataFrame):
    """Statistical strategy: rolling Z-scores of the four deviations.

    With ``window = min(30, len(data) // 4)`` greater than 10, every
    ``ma*_close_diff`` column is Z-scored against its own rolling mean and
    standard deviation, and the four scores are averaged per row. A row is
    "多" when the average exceeds 0.5 and "空" when it is below -0.5. Rows
    without a usable score, and every row when the window is too small, are
    "震荡".

    Changes from the earlier behaviour:
        * Long and short rows used the same empty-string label; they are now
          "多" and "空". NOTE(review): label values inferred from the
          long/short/"震荡" scheme - confirm against downstream consumers.
        * ``ma_long_short`` is always (re)set to "震荡" first; previously the
          column was not even created when the window was too small.

    Returns:
        The same DataFrame object that was passed in.
    """
    ma_cols = ["ma5_close_diff", "ma10_close_diff", "ma20_close_diff", "ma30_close_diff"]

    data["ma_long_short"] = "震荡"

    window_size = min(30, len(data) // 4)
    if window_size <= 10:
        # Not enough history for a meaningful rolling distribution.
        return data

    z_scores = pd.DataFrame(index=data.index)
    for col in ma_cols:
        recent = data[col].rolling(window=window_size)
        z_scores[col] = (data[col] - recent.mean()) / recent.std()

    avg_z_score = z_scores.mean(axis=1)
    data.loc[avg_z_score > 0.5, "ma_long_short"] = "多"
    data.loc[avg_z_score < -0.5, "ma_long_short"] = "空"
    return data
def _hybrid_method(self, data: pd.DataFrame):
    """Hybrid strategy: blend of weighted vote, alignment and strength.

    Components per row:
        weighted score  = 0.4*ma5_close_diff + 0.3*ma10_close_diff
                          + 0.2*ma20_close_diff + 0.1*ma30_close_diff
        alignment score = 0.25 for each of ma5_close_diff <= ma10_close_diff,
                          ma10_close_diff <= ma20_close_diff,
                          ma20_close_diff <= ma30_close_diff, ma_close_avg > 0
        strength score  = |ma_close_avg|
        composite       = 0.4*weighted + 0.3*alignment + 0.3*strength

    Thresholds:
        * With ``window = min(50, len(data) // 4)`` greater than 10, the long
          threshold is 0.4 x the rolling 75% quantile of the composite and the
          short threshold is 0.4 x the rolling 25% quantile.
        * Otherwise the fixed thresholds 0.4 and -0.4 are used.

    ``ma_long_short`` is reset to "震荡", then set to "多" above the long
    threshold and "空" below the short threshold.

    Changes from the earlier behaviour:
        * Long and short rows used the same empty-string label; they are now
          "多" and "空". NOTE(review): label values inferred from the
          long/short/"震荡" scheme - confirm against downstream consumers.
        * The alignment terms compared the deviation columns with ``>=``.
          Assuming the columns hold ``(close - ma) / close * 100`` with a
          positive close, a higher average has a smaller deviation, so the
          bullish ordering MA5 >= MA10 >= MA20 >= MA30 is ``<=`` on them.
        * The two quantiles were attached to the opposite thresholds,
          contradicting the fixed fallback and leaving no neutral band.
        * The column is reset first so stale labels cannot survive.

    NOTE(review): the strength score is unsigned, so it pushes the composite
    upwards in down-trends as well. Left unchanged; confirm whether a signed
    value was intended.

    Returns:
        The same DataFrame object that was passed in.
    """
    # 1. Weighted vote.
    weights = {
        "ma5_close_diff": 0.4,
        "ma10_close_diff": 0.3,
        "ma20_close_diff": 0.2,
        "ma30_close_diff": 0.1,
    }
    weighted_score = sum(data[col] * weight for col, weight in weights.items())

    # 2. Alignment: one quarter point per bullish ordering condition.
    alignment_score = (
        (data["ma5_close_diff"] <= data["ma10_close_diff"]) * 0.25
        + (data["ma10_close_diff"] <= data["ma20_close_diff"]) * 0.25
        + (data["ma20_close_diff"] <= data["ma30_close_diff"]) * 0.25
        + (data["ma_close_avg"] > 0) * 0.25
    )

    # 3. Trend strength.
    strength_score = data["ma_close_avg"].abs()

    # 4. Composite.
    composite_score = (
        weighted_score * 0.4 + alignment_score * 0.3 + strength_score * 0.3
    )

    window_size = min(50, len(data) // 4)
    if window_size > 10:
        recent_scores = composite_score.rolling(window=window_size)
        long_threshold = recent_scores.quantile(0.75) * 0.4
        short_threshold = recent_scores.quantile(0.25) * 0.4
    else:
        long_threshold = 0.4
        short_threshold = -0.4

    data["ma_long_short"] = "震荡"
    data.loc[composite_score > long_threshold, "ma_long_short"] = "多"
    data.loc[composite_score < short_threshold, "ma_long_short"] = "空"
    return data
def _calculate_trend_persistence(self, data: pd.DataFrame):
    """Count, per row, how many consecutive steps kept the sign of ``ma_close_avg``.

    The count for a row is the previous row's count plus one when both the
    row and its predecessor are strictly positive, or both strictly negative;
    otherwise it is 0. The first row is always 0.

    Returns:
        An integer Series indexed like ``data``.
    """
    average = data["ma_close_avg"]
    previous = average.shift(1)
    same_sign_as_previous = ((average > 0) & (previous > 0)) | (
        (average < 0) & (previous < 0)
    )

    # Every row that breaks the streak opens a new run; a cumulative sum
    # inside each run then reproduces the "previous + 1, else 0" recurrence.
    run_id = (~same_sign_as_previous).cumsum().to_numpy()
    streak = same_sign_as_previous.astype("int64").groupby(run_id).cumsum()
    streak.name = None
    return streak
def _calculate_ma_spacing(self, data: pd.DataFrame):
    """Return the mean absolute gap between neighbouring deviation columns.

    The neighbours are (ma5, ma10), (ma10, ma20) and (ma20, ma30), each taken
    from the corresponding ``*_close_diff`` column.

    Returns:
        A Series indexed like ``data``.
    """
    gap_5_10 = (data["ma5_close_diff"] - data["ma10_close_diff"]).abs()
    gap_10_20 = (data["ma10_close_diff"] - data["ma20_close_diff"]).abs()
    gap_20_30 = (data["ma20_close_diff"] - data["ma30_close_diff"]).abs()
    return (gap_5_10 + gap_10_20 + gap_20_30) / 3