crypto_quant/core/biz/metrics_calculation.py

854 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import logging
import numpy as np
import talib as tb
from talib import MA_Type
# Configure the root logger once, at import time: INFO level, each record
# rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class MetricsCalculation:
def __init__(self):
    """Create the calculator; no instance state is initialised here."""
    pass
def pre_close(self, df: pd.DataFrame):
    """Add previous-close, price-change and candle-colour columns in place.

    Columns written:

    * ``pre_close``    - ``close`` shifted down by one row
    * ``close_change`` - ``close - pre_close``
    * ``pct_chg``      - ``close_change`` as a percentage of ``pre_close``
    * ``k_up_down``    - "阳线" when ``close >= open``, "阴线" when
      ``close < open``, "" when neither comparison holds (e.g. NaN)

    :param df: frame with ``close`` and ``open`` columns
    :return: the same frame, mutated
    """
    closes = df["close"]
    previous = closes.shift(1)
    delta = closes - previous

    df["pre_close"] = previous
    df["close_change"] = delta
    df["pct_chg"] = delta / previous * 100

    # Build both masks up front; rows matching neither keep the "" default.
    bullish = df["close"] >= df["open"]
    bearish = df["close"] < df["open"]
    df["k_up_down"] = ""
    for mask, label in ((bullish, "阳线"), (bearish, "阴线")):
        df.loc[mask, "k_up_down"] = label
    return df
def macd(self, df: pd.DataFrame):
    """Compute MACD (12, 26, 9) columns and cross signals on ``df`` in place.

    Each EMA is seeded with the first observation and then updated as
    ``ema[i] = ema[i - 1] * f + x[i] * (1 - f)`` with ``f = (p - 1) / (p + 1)``.

    Columns written:

    * ``ma1`` / ``ma2`` - fast (12) and slow (26) EMA of ``close``
    * ``dif``           - ``ma1 - ma2``
    * ``dea``           - 9-period EMA of ``dif``
    * ``macd``          - ``2 * (dif - dea)``
    * ``macd_signal``   - "金叉" on the bar where ``dif`` moves above ``dea``,
      "死叉" on the bar where it stops being above, "" otherwise

    :param df: frame with a ``close`` column
    :return: the same frame, mutated
    """
    logging.info("计算MACD指标")
    fast_period, slow_period, signal_period = 12, 26, 9

    # Force float64: with an integer ``close`` column the in-place EMA
    # recursion below would otherwise truncate every value to an integer.
    prices = np.asarray(df["close"], dtype=float)

    def _ema(values, period):
        """Return the recursive EMA of ``values`` seeded with ``values[0]``."""
        decay = (period - 1) / (period + 1)
        smoothed = values.copy()  # never write through to the caller's data
        for i in range(1, len(smoothed)):
            smoothed[i] = smoothed[i - 1] * decay + smoothed[i] * (1 - decay)
        return smoothed

    ema_fast = _ema(prices, fast_period)
    ema_slow = _ema(prices, slow_period)
    dif = ema_fast - ema_slow
    dea = _ema(dif, signal_period)

    df["ma1"] = ema_fast
    df["ma2"] = ema_slow
    df["dif"] = dif
    df["dea"] = dea
    df["macd"] = 2 * (dif - dea)

    # A cross needs a known previous state, so the first bar (whose shifted
    # value is NaN) can never carry a signal.
    above = df["dif"] > df["dea"]
    was_above = above.shift(1)
    df["macd_signal"] = ""
    df.loc[above & was_above.eq(False), "macd_signal"] = "金叉"
    df.loc[~above & was_above.eq(True), "macd_signal"] = "死叉"
    return df
def kdj(self, df: pd.DataFrame):
    """Compute the KDJ oscillator and its cross signals on ``df`` in place.

    The raw stochastic value (RSV) uses a 9-bar low/high range; while fewer
    than 9 bars are available the expanding low/high is used instead.
    K is the exponentially weighted mean (``com=2``) of RSV, D the same
    smoothing applied to K, and ``J = 3K - 2D``.

    ``kdj_signal`` is "金叉" on the bar where K moves above D, "死叉" on the
    bar where K stops being above D, and "" otherwise.

    :param df: frame with ``high``, ``low`` and ``close`` columns
    :return: the same frame, mutated
    """
    logging.info("计算KDJ指标")
    lookback = 9
    lows, highs = df["low"], df["high"]

    # Warm-up rows (rolling result NaN) fall back to the expanding extreme.
    floor = lows.rolling(window=lookback).min().fillna(lows.expanding().min())
    ceiling = highs.rolling(window=lookback).max().fillna(highs.expanding().max())

    rsv = (df["close"] - floor) / (ceiling - floor) * 100
    k_line = rsv.ewm(com=2).mean()
    d_line = k_line.ewm(com=2).mean()
    df["kdj_k"] = k_line
    df["kdj_d"] = d_line
    df["kdj_j"] = 3 * k_line - 2 * d_line

    df["kdj_signal"] = ""
    k_above_d = df["kdj_k"] > df["kdj_d"]
    previously = k_above_d.shift()
    # Explicit comparisons keep the NaN on the first shifted row from
    # counting as either state.
    turned_up = (k_above_d == True) & (previously == False)  # noqa: E712
    turned_down = (k_above_d == False) & (previously == True)  # noqa: E712
    df.loc[k_above_d[turned_up].index, "kdj_signal"] = "金叉"
    df.loc[k_above_d[turned_down].index, "kdj_signal"] = "死叉"
    return df
def set_kdj_pattern(self, df: pd.DataFrame):
    """Label every bar with its KDJ overbought/oversold tier in ``kdj_pattern``.

    Tiers (all three conditions must hold):

    * "超超买": K > 80, D > 80, J > 90
    * "超买"  : K > 70, D > 70, J > 80
    * "超超卖": K < 20, D < 20, J < 10
    * "超卖"  : K < 30, D < 30, J < 20
    * "徘徊"  : none of the above

    :param df: frame with ``kdj_k``, ``kdj_d`` and ``kdj_j`` columns
    :return: the same frame, mutated
    """
    logging.info("设置KDJ形态")
    k, d, j = df["kdj_k"], df["kdj_d"], df["kdj_j"]

    # Order matters: every "extreme" row also satisfies the broader tier, so
    # the broad label is written first and the strict one overrides it.
    # (Writing them the other way round makes the extreme tiers unreachable.)
    tiers = (
        ((k > 70) & (d > 70) & (j > 80), "超买"),
        ((k > 80) & (d > 80) & (j > 90), "超超买"),
        ((k < 30) & (d < 30) & (j < 20), "超卖"),
        ((k < 20) & (d < 20) & (j < 10), "超超卖"),
    )
    df["kdj_pattern"] = "徘徊"
    for mask, label in tiers:
        df.loc[mask, "kdj_pattern"] = label
    return df
def calculate_ma_price_percent(self, data: pd.DataFrame):
    """Add the percentage gap between ``close`` and each moving average.

    For every period ``p`` in 5, 10, 20, 30 the column ``ma{p}_close_diff``
    is ``(close - ma{p}) / close * 100``. ``ma_close_avg`` is the plain
    average of those four columns (NaN if any of them is NaN).

    :param data: frame with ``close``, ``ma5``, ``ma10``, ``ma20``, ``ma30``
    :return: the same frame, mutated
    """
    gap_columns = []
    for period in (5, 10, 20, 30):
        column = f"ma{period}_close_diff"
        data[column] = (data["close"] - data[f"ma{period}"]) / (data["close"]) * 100
        gap_columns.append(column)

    # Explicit left-to-right addition so a NaN in any gap propagates.
    running_total = data[gap_columns[0]]
    for column in gap_columns[1:]:
        running_total = running_total + data[column]
    data["ma_close_avg"] = running_total / 4
    return data
def set_ma_long_short_divergence(self, data: pd.DataFrame):
    """Classify moving-average direction and spread for every bar, in place.

    ``ma_long_short``:

    * "多"   - all four ``ma*_close_diff`` columns and ``ma_close_avg`` are > 0
      (price above every moving average)
    * "空"   - all five are < 0 (price below every moving average)
    * "震荡" - anything else, including rows with missing values

    ``ma_divergence`` is based on the per-row standard deviation and the
    absolute mean of the four ``ma*_close_diff`` values, compared against the
    25%/50%/75% quantiles of those statistics over the whole frame. The first
    matching rule wins:

    * "超发散" - std > q75 and |mean| > q75
    * "发散"   - std in (q50, q75] or |mean| in (q50, q75]
    * "适中"   - std in (q25, q50] and |mean| <= q50
    * "粘合"   - std <= q25
    * "未知"   - no rule matched (e.g. NaN statistics)

    NOTE(review): the quantiles are taken over the full frame, so a row's
    label depends on later rows as well. Some combinations (for example
    std > q75 with |mean| <= q50) match no rule and stay "未知".

    :param data: frame with ``ma5_close_diff``, ``ma10_close_diff``,
        ``ma20_close_diff``, ``ma30_close_diff`` and ``ma_close_avg``
    :return: the same frame, mutated
    """
    logging.info("设置均线多空和发散")
    gap_columns = [
        "ma5_close_diff",
        "ma10_close_diff",
        "ma20_close_diff",
        "ma30_close_diff",
    ]

    # --- direction -------------------------------------------------------
    signed = data[gap_columns + ["ma_close_avg"]]
    data["ma_long_short"] = "震荡"
    # The two trend labels must differ from each other and from the default;
    # writing "" for both made long and short indistinguishable.
    data.loc[(signed > 0).all(axis=1), "ma_long_short"] = "多"
    data.loc[(signed < 0).all(axis=1), "ma_long_short"] = "空"

    # --- spread ----------------------------------------------------------
    gaps = data[gap_columns]
    gap_std = gaps.std(axis=1)
    gap_abs_mean = gaps.mean(axis=1).abs()
    std_25, std_50, std_75 = gap_std.quantile([0.25, 0.50, 0.75])
    _, mean_50, mean_75 = gap_abs_mean.quantile([0.25, 0.50, 0.75])

    ordered_rules = (
        ("超发散", (gap_std > std_75) & (gap_abs_mean > mean_75)),
        (
            "发散",
            ((gap_std > std_50) & (gap_std <= std_75))
            | ((gap_abs_mean > mean_50) & (gap_abs_mean <= mean_75)),
        ),
        (
            "适中",
            (gap_std > std_25) & (gap_std <= std_50) & (gap_abs_mean <= mean_50),
        ),
        ("粘合", gap_std <= std_25),
    )
    data["ma_divergence"] = "未知"
    for label, rule in ordered_rules:
        # Only rows that no earlier (stronger) rule has claimed are eligible.
        unclaimed = data["ma_divergence"] == "未知"
        data.loc[rule & unclaimed, "ma_divergence"] = label
    return data
def update_macd_divergence_column(self, df: pd.DataFrame):
    """Flag MACD top/bottom divergence for every bar in ``macd_divergence``.

    The frame is sorted by ``timestamp`` and re-indexed from 0, so a new
    frame is returned (the input is not modified). The first bar is always
    "未知" because it has no predecessor.

    A bar is "顶背离" when ``kdj_j > 70`` and either

    * bar-to-bar: ``close`` rose while ``dif`` and ``macd`` did not, or
    * historical: ``close`` is within 5% of its running high while ``dif``
      and ``macd`` are at least 20% below their running highs.

    A bar is "底背离" when ``kdj_j < 20`` and either

    * bar-to-bar: ``close`` fell while ``dif`` and ``macd`` did not, or
    * historical: ``close`` is within 5% of its running low while ``dif``
      and ``macd`` are at least 20% above their running lows.

    The 20% margins are applied relative to the magnitude of the extreme so
    they keep their meaning for negative DIF/MACD values.

    :param df: frame with ``timestamp``, ``close``, ``dif``, ``macd`` and
        ``kdj_j`` columns
    :return: the sorted frame with ``macd_divergence`` added; ``df`` itself
        when it is ``None``, empty, or missing a required column
    """
    if df is None or df.empty:
        return df

    required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning("缺少必要的列: %s", missing_columns)
        return df

    df = df.sort_values("timestamp").reset_index(drop=True)
    close, dif, hist, kdj_j = df["close"], df["dif"], df["macd"], df["kdj_j"]

    def _clearly_below(extreme):
        """Return the level 20% of ``|extreme|`` below ``extreme``."""
        # Plain ``extreme * 0.8`` moves a negative value *up*, which would
        # make the "clearly below the high" test trivially true.
        return extreme * np.where(extreme >= 0, 0.8, 1.2)

    def _clearly_above(extreme):
        """Return the level 20% of ``|extreme|`` above ``extreme``."""
        # Plain ``extreme * 1.2`` moves a negative low further *down*, so
        # every bar would pass the "clearly above the low" test.
        return extreme * np.where(extreme >= 0, 1.2, 0.8)

    # Bar-to-bar divergence against the immediately preceding bar.
    step_top = (
        (close > close.shift(1)) & (dif <= dif.shift(1)) & (hist <= hist.shift(1))
    )
    step_bottom = (
        (close < close.shift(1)) & (dif >= dif.shift(1)) & (hist >= hist.shift(1))
    )

    # Divergence against the running extremes (current bar included), done
    # with cumulative max/min instead of rescanning the history per bar.
    history_top = (
        (close >= close.cummax() * 0.95)
        & (dif <= _clearly_below(dif.cummax()))
        & (hist <= _clearly_below(hist.cummax()))
    )
    history_bottom = (
        (close <= close.cummin() * 1.05)
        & (dif >= _clearly_above(dif.cummin()))
        & (hist >= _clearly_above(hist.cummin()))
    )

    has_previous = np.arange(len(df)) > 0
    df["macd_divergence"] = "未知"
    # kdj_j > 70 and kdj_j < 20 are mutually exclusive, so the two labels
    # can never compete for the same bar.
    df.loc[
        (kdj_j > 70) & (step_top | history_top) & has_previous, "macd_divergence"
    ] = "顶背离"
    df.loc[
        (kdj_j < 20) & (step_bottom | history_bottom) & has_previous,
        "macd_divergence",
    ] = "底背离"
    return df
def update_macd_divergence_column_simple(
    self, df: pd.DataFrame, window_size: int = 20
):
    """Flag MACD divergence using a trailing window instead of full history.

    The frame is sorted by ``timestamp`` and re-indexed from 0, so a new
    frame is returned. For every bar ``i >= window_size`` the extremes of
    bars ``i - window_size`` .. ``i`` (inclusive) are compared with the bar:

    * "顶背离": ``kdj_j > 70``, ``close`` within 2% of the window high, and
      ``dif`` / ``macd`` at least 15% below their window highs
    * "底背离": ``kdj_j < 20``, ``close`` within 2% of the window low, and
      ``dif`` / ``macd`` at least 15% above their window lows
    * "未知"  : otherwise, and for the first ``window_size`` bars

    The 15% margins are applied relative to the magnitude of the extreme so
    they keep their meaning for negative DIF/MACD values.

    :param df: frame with ``timestamp``, ``close``, ``dif``, ``macd`` and
        ``kdj_j`` columns
    :param window_size: number of preceding bars included in the window
    :return: the sorted frame with ``macd_divergence`` added; ``df`` itself
        when it is ``None``, empty, or missing a required column
    :raises ValueError: if ``window_size`` is negative
    """
    if window_size < 0:
        raise ValueError(f"window_size must be >= 0, got {window_size}")
    if df is None or df.empty:
        return df

    required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning("缺少必要的列: %s", missing_columns)
        return df

    df = df.sort_values("timestamp").reset_index(drop=True)
    close, dif, hist, kdj_j = df["close"], df["dif"], df["macd"], df["kdj_j"]

    span = window_size + 1  # the window includes the current bar

    def _window_high(series):
        """Return the NaN-skipping trailing maximum over ``span`` bars."""
        return series.rolling(window=span, min_periods=1).max()

    def _window_low(series):
        """Return the NaN-skipping trailing minimum over ``span`` bars."""
        return series.rolling(window=span, min_periods=1).min()

    def _clearly_below(extreme):
        """Return the level 15% of ``|extreme|`` below ``extreme``."""
        # ``extreme * 0.85`` alone would move a negative value upwards.
        return extreme * np.where(extreme >= 0, 0.85, 1.15)

    def _clearly_above(extreme):
        """Return the level 15% of ``|extreme|`` above ``extreme``."""
        # ``extreme * 1.15`` alone would move a negative low further down,
        # letting every bar pass the test.
        return extreme * np.where(extreme >= 0, 1.15, 0.85)

    top = (
        (kdj_j > 70)
        & (close >= _window_high(close) * 0.98)
        & (dif <= _clearly_below(_window_high(dif)))
        & (hist <= _clearly_below(_window_high(hist)))
    )
    bottom = (
        (kdj_j < 20)
        & (close <= _window_low(close) * 1.02)
        & (dif >= _clearly_above(_window_low(dif)))
        & (hist >= _clearly_above(_window_low(hist)))
    )

    # Bars without a full window behind them are never evaluated.
    has_full_window = np.arange(len(df)) >= window_size
    df["macd_divergence"] = "未知"
    df.loc[top & has_full_window, "macd_divergence"] = "顶背离"
    df.loc[bottom & has_full_window, "macd_divergence"] = "底背离"
    return df
def ma5102030(self, df: pd.DataFrame):
    """Add 5/10/20/30-bar simple moving averages and their cross events.

    ``ma5``, ``ma10``, ``ma20`` and ``ma30`` are rolling means of ``close``
    (NaN until a full window is available).

    ``ma_cross`` is "" unless a cross happened on that bar. A label
    "A穿B" means the A-bar average moved above the B-bar average on this
    bar. When several crosses coincide, the one checked last wins; the
    check order is 5穿10, 5穿20, 5穿30, 10穿30, 10穿5, 20穿5, 30穿5, 30穿10.

    A cross is only reported when both averages were defined on the
    previous bar, so the end of the warm-up period does not produce a
    signal by itself.

    :param df: frame with a ``close`` column
    :return: the same frame, mutated
    """
    logging.info("计算均线指标")
    for period in (5, 10, 20, 30):
        df[f"ma{period}"] = df["close"].rolling(window=period).mean()

    pairs = ((5, 10), (5, 20), (5, 30), (10, 30))
    df["ma_cross"] = ""

    # Shorter average crossing above the longer one.
    for fast, slow in pairs:
        fast_ma, slow_ma = df[f"ma{fast}"], df[f"ma{slow}"]
        # Comparing the shifted values directly (rather than shifting the
        # boolean) keeps NaN warm-up rows from looking like "was below".
        crossed_up = (fast_ma > slow_ma) & (fast_ma.shift(1) <= slow_ma.shift(1))
        df.loc[crossed_up, "ma_cross"] = f"{fast}穿{slow}"

    # Shorter average crossing below the longer one.
    for fast, slow in pairs:
        fast_ma, slow_ma = df[f"ma{fast}"], df[f"ma{slow}"]
        crossed_down = (fast_ma < slow_ma) & (fast_ma.shift(1) >= slow_ma.shift(1))
        df.loc[crossed_down, "ma_cross"] = f"{slow}穿{fast}"
    return df
def rsi(self, df: pd.DataFrame):
    """Add the 14-period RSI and its threshold-entry signals in place.

    ``rsi_14`` is computed by TA-Lib's ``RSI`` from the ``close`` values.
    ``rsi_signal`` is "超买" on the bar where ``rsi_14`` moves above 70,
    "超卖" on the bar where it moves below 30, and "" otherwise.

    :param df: frame with a ``close`` column
    :return: the same frame, mutated
    """
    logging.info("计算RSI指标")
    df["rsi_14"] = tb.RSI(df["close"].values, timeperiod=14)
    df["rsi_signal"] = ""

    zones = (
        (df["rsi_14"] > 70, "超买"),
        (df["rsi_14"] < 30, "超卖"),
    )
    for in_zone, label in zones:
        # Flag only the bar that enters the zone, not every bar inside it.
        just_entered = (in_zone == True) & (in_zone.shift() == False)  # noqa: E712
        df.loc[in_zone[just_entered].index, "rsi_signal"] = label
    return df
def boll(self, df: pd.DataFrame):
    """Add 20-period Bollinger Bands (SMA based) to ``df`` in place.

    The three arrays returned by TA-Lib's ``BBANDS`` are stored, in order,
    as ``boll_upper``, ``boll_middle`` and ``boll_lower``.

    :param df: frame with a ``close`` column
    :return: the same frame, mutated
    """
    logging.info("计算BOLL指标")
    upper_band, middle_band, lower_band = tb.BBANDS(
        df["close"].values, timeperiod=20, matype=MA_Type.SMA
    )
    df["boll_upper"] = upper_band
    df["boll_middle"] = middle_band
    df["boll_lower"] = lower_band
    return df
def set_boll_pattern(self, df: pd.DataFrame):
    """Classify each bar against its Bollinger Bands, in place.

    ``boll_pattern`` (default "震荡"):

    * "超超买": ``close >= boll_upper`` and ``kdj_j > 80``
    * "超买"  : ``close`` within 2% of ``boll_upper`` and ``kdj_j > 80``
    * "超超卖": ``close <= boll_lower`` and ``kdj_j < 20``
    * "超卖"  : ``close`` within 2% of ``boll_lower`` and ``kdj_j < 20``

    ``boll_signal`` (default ""):

    * "突破下轨": the previous close was below the previous lower band and
      this close is above the lower band
    * "击穿上轨": the previous close was above the previous upper band and
      this close is below the upper band

    The previous close is read from ``pre_close`` when that column exists
    and derived from ``close`` otherwise. If a required column is missing,
    only ``boll_pattern`` (all "震荡") is written.

    :param df: frame with ``close``, ``boll_upper``, ``boll_lower`` and
        ``kdj_j`` columns; ``pre_close`` is optional
    :return: the same frame, mutated
    """
    logging.info("设置BOLL形态")
    df["boll_pattern"] = "震荡"

    required_columns = ["close", "boll_upper", "boll_lower", "kdj_j"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning("缺少必要的列: %s", missing_columns)
        return df

    close = df["close"]
    upper, lower = df["boll_upper"], df["boll_lower"]
    overbought = df["kdj_j"] > 80
    oversold = df["kdj_j"] < 20

    # Distances are kept as local Series so no scratch columns ever appear
    # on (or get deleted from) the caller's frame.
    upper_gap_pct = (close - upper).abs() / close * 100
    lower_gap_pct = (close - lower).abs() / close * 100

    df.loc[(close >= upper) & overbought, "boll_pattern"] = "超超买"
    still_neutral = df["boll_pattern"] == "震荡"
    df.loc[(upper_gap_pct <= 2) & overbought & still_neutral, "boll_pattern"] = "超买"

    df.loc[(close <= lower) & oversold, "boll_pattern"] = "超超卖"
    still_neutral = df["boll_pattern"] == "震荡"
    df.loc[(lower_gap_pct <= 2) & oversold & still_neutral, "boll_pattern"] = "超卖"

    # ``pre_close`` is not part of the required columns; fall back to the
    # shifted close instead of failing with a KeyError.
    if "pre_close" in df.columns:
        previous_close = df["pre_close"]
    else:
        previous_close = close.shift(1)

    df["boll_signal"] = ""
    band_events = (
        ((close > lower) & (previous_close < lower.shift()), "突破下轨"),
        ((close < upper) & (previous_close > upper.shift()), "击穿上轨"),
    )
    for event, label in band_events:
        # Keep only the first bar of a run; the first row has no known
        # predecessor and is treated as "already active".
        first_of_run = event & ~event.shift(1, fill_value=True)
        df.loc[first_of_run, "boll_signal"] = label
    return df
def set_k_length(self, df: pd.DataFrame):
    """Classify every candle's size relative to recent candles, in place.

    Two quantities are scored with a rolling Z-score (window of up to 20
    bars, ``min_periods=1``): the full range ``high - low`` and the body
    ``|close - open|``. Undefined scores (NaN or infinite, e.g. zero
    variance) count as 0.

    ``k_length`` values:

    * "超长": range Z > 1.5 and body Z > 1.0
    * "长"  : range Z > 0.8 or body Z > 0.8 (and not "超长")
    * "短"  : range Z < -0.8 and body Z < -0.5
    * "中"  : everything else

    :param df: frame with ``close``, ``open``, ``high`` and ``low`` columns
    :return: the same frame, mutated; returned untouched if a required
        column is missing
    """
    logging.info("设置K线长度")
    required_columns = ["close", "open", "high", "low"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning("缺少必要的列: %s", missing_columns)
        return df

    body = (df["close"] - df["open"]).abs()
    total_range = df["high"] - df["low"]

    # A zero-length window is rejected by pandas, so keep it at least 1 to
    # let an empty frame pass through cleanly.
    window_size = max(1, min(20, len(df)))

    def _rolling_zscore(series):
        """Return the rolling Z-score of ``series`` with undefined values as 0."""
        stats = series.rolling(window=window_size, min_periods=1)
        zscore = (series - stats.mean()) / stats.std()
        return zscore.replace([np.inf, -np.inf], 0).fillna(0)

    range_z = _rolling_zscore(total_range)
    body_z = _rolling_zscore(body)

    # Each size class needs its own label; with "" used for the default,
    # "long" and "short" alike, only "超长" could be told apart.
    df["k_length"] = "中"
    df.loc[(range_z > 1.5) & (body_z > 1.0), "k_length"] = "超长"
    not_yet_classified = df["k_length"] == "中"
    df.loc[
        ((range_z > 0.8) | (body_z > 0.8)) & not_yet_classified, "k_length"
    ] = "长"
    df.loc[(range_z < -0.8) & (body_z < -0.5), "k_length"] = "短"
    return df
def set_k_shape(self, df: pd.DataFrame):
    """Classify every candle's shape into ``k_shape``, in place.

    Definitions (all ratios are relative to ``high - low``; a zero range
    gives body ratio 1.0 and shadow ratios 0):

    * body ratio   = ``|close - open| / (high - low)``
    * upper shadow = ``high - max(open, close)``
    * lower shadow = ``min(open, close) - low``
    * range%       = ``(high - low) / close * 100``, also scored with a
      rolling Z-score and a rolling 75th percentile (window up to 20 bars)

    Rules, applied in this order (a later rule only touches bars that are
    still "未知" unless stated otherwise):

    * "一字": (range% Z < -1.0 or range% < 0.05), both shadow ratios
      <= 0.01 and ``|close - open| / close < 0.0005``
    * body ratio <= 0.3:
        - "长吊锤线": upper shadow < 0.25, body ratio < 0.1
        - "吊锤线"  : upper shadow < 0.25, body ratio >= 0.1
        - "长倒T线" : lower shadow < 0.25, body ratio < 0.1
        - "倒T线"   : lower shadow < 0.25, body ratio >= 0.1
        - "长十字星": both shadows >= 0.25, body ratio < 0.1
        - "十字星"  : both shadows >= 0.25, body ratio >= 0.1
    * 0.3 < body ratio <= 0.55:
        - "长上影线纺锤体": upper shadow >= 0.25
        - "长下影线纺锤体": lower shadow >= 0.25
        - "小实体": otherwise
    * "大实体": 0.55 < body ratio <= 0.9
    * "超大实体": body ratio > 0.75 and range% >= its rolling 75th
      percentile; overrides any earlier label except "一字"
    * body ratio > 0.9: "超大实体+光头光脚" if already "超大实体",
      "光头光脚" if still "未知"

    :param df: frame with ``close``, ``open``, ``high`` and ``low`` columns
    :return: the same frame, mutated; returned untouched if a required
        column is missing
    """
    logging.info("设置K线形状")
    required_columns = ["close", "open", "high", "low"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logging.warning("缺少必要的列: %s", missing_columns)
        return df

    # All intermediates are local Series: nothing is written to the caller's
    # frame except ``k_shape``, so same-named caller columns are never
    # overwritten or dropped.
    close = df["close"]
    candle_range = df["high"] - df["low"]
    body = (close - df["open"]).abs()
    upper_shadow = df["high"] - df[["open", "close"]].max(axis=1)
    lower_shadow = df[["open", "close"]].min(axis=1) - df["low"]

    # Zero-range candles would divide by zero; treat them as "all body".
    safe_range = candle_range.replace(0, np.nan)
    body_fill = (body / safe_range).fillna(1.0)
    upper_ratio = (upper_shadow / safe_range).fillna(0)
    lower_ratio = (lower_shadow / safe_range).fillna(0)

    # pandas rejects a zero-length window, so keep it at least 1.
    window_size = max(1, min(20, len(df)))
    range_pct = candle_range / close * 100
    range_pct_stats = range_pct.rolling(window=window_size, min_periods=1)
    range_pct_z = (
        ((range_pct - range_pct_stats.mean()) / range_pct_stats.std())
        .replace([np.inf, -np.inf], 0)
        .fillna(0)
    )
    range_pct_p75 = range_pct_stats.quantile(0.75)

    df["k_shape"] = "未知"

    flat = (
        ((range_pct_z < -1.0) | (range_pct < 0.05))
        & (upper_ratio <= 0.01)
        & (lower_ratio <= 0.01)
        & (body / close < 0.0005)
    )
    df.loc[flat, "k_shape"] = "一字"
    not_flat = ~flat

    tiny_body = (body_fill <= 0.3) & not_flat
    spindle_body = (body_fill > 0.3) & (body_fill <= 0.55) & not_flat
    big_body = (body_fill > 0.55) & (body_fill <= 0.9) & not_flat
    huge_body = (
        (body_fill > 0.75)
        & (body_fill <= 1)
        & (range_pct >= range_pct_p75)
        & not_flat
    )
    full_body = (body_fill > 0.9) & not_flat

    short_upper = upper_ratio < 0.25
    short_lower = lower_ratio < 0.25
    both_long = (upper_ratio >= 0.25) & (lower_ratio >= 0.25)
    sliver = body_fill < 0.1

    # (label, condition, label the bar must currently carry or None for any)
    ordered_rules = (
        ("长吊锤线", tiny_body & short_upper & sliver, "未知"),
        ("吊锤线", tiny_body & short_upper & ~sliver, "未知"),
        ("长倒T线", tiny_body & short_lower & sliver, "未知"),
        ("倒T线", tiny_body & short_lower & ~sliver, "未知"),
        ("长十字星", tiny_body & both_long & sliver, "未知"),
        ("十字星", tiny_body & both_long & ~sliver, "未知"),
        ("长上影线纺锤体", spindle_body & (upper_ratio >= 0.25), "未知"),
        ("长下影线纺锤体", spindle_body & (lower_ratio >= 0.25), "未知"),
        ("小实体", spindle_body, "未知"),
        ("大实体", big_body, "未知"),
        # Deliberately unguarded: promotes bars already marked "大实体".
        ("超大实体", huge_body, None),
        ("超大实体+光头光脚", full_body, "超大实体"),
        ("光头光脚", full_body, "未知"),
    )
    for label, condition, current in ordered_rules:
        if current is not None:
            condition = condition & (df["k_shape"] == current)
        df.loc[condition, "k_shape"] = label
    return df