# crypto_quant/core/statistics/price_volume_stats.py
import core.logger as logging
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import re
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from config import OKX_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
# seaborn支持中文
plt.rcParams["font.family"] = ["SimHei"]
logger = logging.logger
class PriceVolumeStats:
def __init__(self):
2025-09-25 04:28:43 +00:00
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
2025-08-07 10:09:51 +00:00
if not mysql_password:
raise ValueError("MySQL password is not set")
2025-09-25 04:28:43 +00:00
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
2025-08-07 10:09:51 +00:00
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_data = DBMarketData(self.db_url)
self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
2025-08-31 03:20:59 +00:00
self.symbol_list = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("symbols", [])
self.bars_list = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("bars", [])
2025-08-07 10:09:51 +00:00
self.windows_list = WINDOW_SIZE.get("window_sizes", [])
2025-08-31 03:20:59 +00:00
self.initial_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
2025-08-07 10:09:51 +00:00
"initial_date", "2025-05-15 00:00:00"
)
2025-09-16 06:31:15 +00:00
self.end_date = get_current_date_time()
2025-08-07 10:09:51 +00:00
self.window_size = 100
self.stats_output_dir = "./output/statistics/excel/"
os.makedirs(self.stats_output_dir, exist_ok=True)
self.stats_chart_dir = "./output/statistics/chart/"
os.makedirs(self.stats_chart_dir, exist_ok=True)
def batch_price_volume_statistics(self):
high_volume_hours_list = []
huge_high_volume_hours_list = []
2025-08-07 10:09:51 +00:00
price_stats_list = []
pct_change_stats_list = []
peak_valley_data_list = []
peak_valley_stats_list = []
volume_stats_list = []
price_volume_stats_list = []
earliest_market_timestamp = None
latest_market_timestamp = None
logger.info(
2025-08-07 10:09:51 +00:00
f"开始统计{self.initial_date}{self.end_date}的价格、成交量、成交量与高价低价关系数据"
)
for symbol in self.symbol_list:
for bar in self.bars_list:
data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
symbol, bar, self.window_size, self.initial_date, self.end_date
)
if data is not None:
data = pd.DataFrame(data)
data.sort_values(by="timestamp", inplace=True)
# pct_change计算涨跌百分比
data["pct_change"] = data["close"].pct_change() * 100
if earliest_market_timestamp is None:
earliest_market_timestamp = data["timestamp"].iloc[0]
else:
if data["timestamp"].iloc[0] < earliest_market_timestamp:
earliest_market_timestamp = data["timestamp"].iloc[0]
if latest_market_timestamp is None:
latest_market_timestamp = data["timestamp"].iloc[-1]
else:
if data["timestamp"].iloc[-1] > latest_market_timestamp:
latest_market_timestamp = data["timestamp"].iloc[-1]
# 统计高成交量小时分布
logger.info(f"统计{symbol} {bar} 巨量小时分布数据")
high_volume_hours_data = self.stats_high_volume_hours(data)
high_volume_hours_list.append(high_volume_hours_data)
huge_high_volume_hours_data = self.stats_high_volume_hours(data, 4)
huge_high_volume_hours_list.append(huge_high_volume_hours_data)
logger.info(f"统计{symbol} {bar} 价格数据")
2025-08-07 10:09:51 +00:00
price_stats_data = self.calculate_price_statistics(data)
logger.info(f"统计{symbol} {bar} 涨跌百分比数据")
2025-08-07 10:09:51 +00:00
pct_change_stats_data = self.calculate_pct_change_statistics(data)
logger.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据")
2025-08-07 10:09:51 +00:00
peak_valley_data, peak_valley_stats_data = (
self.calculate_price_change_peak_valley_statistics(data)
)
logger.info(f"统计{symbol} {bar} 成交量数据")
2025-08-07 10:09:51 +00:00
volume_stats_data = self.calculate_volume_statistics(data)
logger.info(f"统计{symbol} {bar} 成交量与高价低价关系数据")
2025-08-07 10:09:51 +00:00
price_volume_stats_data = self.calculate_price_volume_statistics(
data
)
price_stats_list.append(price_stats_data)
pct_change_stats_list.append(pct_change_stats_data)
peak_valley_data_list.append(peak_valley_data)
peak_valley_stats_list.append(peak_valley_stats_data)
volume_stats_list.append(volume_stats_data)
price_volume_stats_list.append(price_volume_stats_data)
high_volume_hours_df = pd.concat(high_volume_hours_list)
high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True)
huge_high_volume_hours_df = pd.concat(huge_high_volume_hours_list)
huge_high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True)
2025-08-07 10:09:51 +00:00
price_stats_df = pd.DataFrame(price_stats_list)
price_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
pct_change_stats_df = pd.DataFrame(pct_change_stats_list)
pct_change_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
peak_valley_data_df = pd.concat(peak_valley_data_list)
peak_valley_data_df.sort_values(by=["symbol", "bar", "timestamp"], inplace=True)
peak_valley_stats_df = pd.DataFrame(peak_valley_stats_list)
peak_valley_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
volume_stats_df = pd.DataFrame(volume_stats_list)
volume_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
price_volume_stats_df = pd.DataFrame(price_volume_stats_list)
price_volume_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
earliest_market_date_time = timestamp_to_datetime(earliest_market_timestamp)
earliest_market_date_time = re.sub(
r"[\:\-\s]", "", str(earliest_market_date_time)
)
latest_market_date_time = timestamp_to_datetime(latest_market_timestamp)
latest_market_date_time = re.sub(r"[\:\-\s]", "", str(latest_market_date_time))
output_file_name = f"price_volume_stats_window_size_{self.window_size}_from_{earliest_market_date_time}_to_{latest_market_date_time}.xlsx"
output_file_path = os.path.join(self.stats_output_dir, output_file_name)
logger.info(f"导出{output_file_path}")
2025-08-07 10:09:51 +00:00
with pd.ExcelWriter(output_file_path) as writer:
price_stats_df.to_excel(writer, sheet_name="价格统计", index=False)
2025-08-07 10:09:51 +00:00
pct_change_stats_df.to_excel(
writer, sheet_name="涨跌百分比统计", index=False
2025-08-07 10:09:51 +00:00
)
peak_valley_data_df.to_excel(
writer, sheet_name="波峰波谷明细", index=False
2025-08-07 10:09:51 +00:00
)
peak_valley_stats_df.to_excel(
writer, sheet_name="波峰波谷统计", index=False
2025-08-07 10:09:51 +00:00
)
volume_stats_df.to_excel(writer, sheet_name="量能统计", index=False)
2025-08-07 10:09:51 +00:00
price_volume_stats_df.to_excel(
writer, sheet_name="量价统计", index=False
)
high_volume_hours_df.to_excel(
writer, sheet_name="放量小时分布", index=False
)
huge_high_volume_hours_df.to_excel(
writer, sheet_name="4倍放量小时分布", index=False
2025-08-07 10:09:51 +00:00
)
chart_dict = self.draw_price_change_peak_valley_chart(peak_valley_stats_df)
self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_high_volume_hours_chart(high_volume_hours_df, normal=True)
self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_high_volume_hours_chart(huge_high_volume_hours_df, normal=False)
self.output_chart_to_excel(output_file_path, chart_dict)
2025-08-07 10:09:51 +00:00
return price_stats_df, volume_stats_df, price_volume_stats_df
def calculate_price_statistics(self, data: pd.DataFrame):
"""
计算价格统计数据
:param data: 市场数据
:return: 价格统计数据
"""
if data is None:
return None
stats_data = self.base_statistics(data, "close")
return stats_data
def calculate_pct_change_statistics(self, data: pd.DataFrame):
"""
计算涨跌百分比统计数据
:param data: 市场数据
:return: 涨跌百分比统计数据
"""
if data is None:
return None
stats_data = self.base_statistics(data, "pct_change")
return stats_data
def calculate_volume_statistics(self, data):
"""
计算成交量统计数据
针对volume计算, 包括:max, min, mean, std, median,percentile,
mode, range, kurtosis, skewness, z-score
:param data: 市场数据
:return: 成交量统计数据
"""
if data is None:
return None
stats_data = self.base_statistics(data, "volume")
return stats_data
def base_statistics(self, data: pd.DataFrame, column: str):
"""
计算基础统计数据
:param data: 市场数据
:param column: 列名
针对column计算, 包括:max, min, mean, std, median,percentile,
mode, range, kurtosis, skewness, z-score
Max(最大值) 含义:数据集中的最大值
应用:在K线数据中, max可表示high价格的最高点, 标识波峰
示例:某股票一周最高价为120
Min(最小值) 含义:数据集中的最小值
应用:在K线数据中, min可表示low价格的最低点, 标识波谷
示例:某股票一周最低价为100
Mean(均值/平均值) 含义:所有值的总和除以数据点个数, mean = Σx / n
应用:分析K线close价格平均水平, 或SEC财务数据的平均收入
示例:一周收盘价均值为105
Std(标准差) 含义:衡量数据点偏离均值的分散程度, std = sqrt(Σ(x - mean)² / n)
应用:在K线中, 高std表示价格波动大; 在SEC数据中, 反映收入稳定性
示例:股票价格std为3.2, 波动较大
Median(中位数) 含义:数据集排序后的中间值(偶数个数据取中间两值平均)
应用:对K线close或SEC收入计算中位数, 减少极端值影响
示例:收盘价[100, 102, 105, 110, 120]的中位数为105
Percentile(百分位数) 含义:数据集中低于某值的百分比位置, 如第25百分位数(Q1)表示25%数据低于该值
应用:在K线数据中, 分析价格分布(如75%价格低于某值);
在SEC数据中, 比较公司收入在行业中的排名
示例:股票价格的75th percentile为110, 表示75%价格低于110
Range(极差) 含义:最大值与最小值之差, range = max - min
应用:衡量K线价格波动幅度, 或SEC数据(如利润)的变化范围
示例:股票一周价格极差为20(120 - 100)
Kurtosis(峰度) 含义:描述数据分布的尖锐程度和尾部厚度
高峰度(>3):分布尖锐, 尾部厚(多极端值)
低峰度(<3):分布平坦, 尾部薄
应用:在K线回报率中, 高峰度提示极端波动风险;
在SEC数据中, 分析收入异常值
示例:股票回报率峰度为4, 可能有较大波动
Skewness(偏度) 含义:描述数据分布的偏斜方向
正偏(>0):右尾长(如少数高收入)
负偏(<0):左尾长(如多亏损)
零偏(0):分布对称
应用:在K线中, 正偏表示上涨概率高;
在SEC数据中, 偏度反映财务指标非对称性
示例:股票收益正偏为0.5, 上涨可能性较大
:return: 基础统计数据
"""
if data is None:
return None
if column not in data.columns:
return None
stats_data = {"symbol": data["symbol"].iloc[0], "bar": data["bar"].iloc[0]}
target_data = data[column]
stats_data["count"] = len(target_data)
stats_data["max"] = target_data.max()
stats_data["min"] = target_data.min()
stats_data["mean"] = target_data.mean()
stats_data["std"] = target_data.std()
stats_data["median"] = target_data.median()
if column in ["pct_change", "price_change_ratio"]:
# 计算上涨次数,下跌次数,上涨次数/总次数
stats_data["up_count"] = len(target_data[target_data > 0])
stats_data["down_count"] = len(target_data[target_data < 0])
stats_data["up_ratio"] = (
stats_data["up_count"] / stats_data["count"]
) * 100
stats_data["down_ratio"] = (
stats_data["down_count"] / stats_data["count"]
) * 100
# 计算上涨过程中,平均涨幅,涨幅中位数,涨幅标准差
up_data = target_data[target_data > 0]
stats_data["up_mean"] = up_data.mean()
stats_data["up_median"] = up_data.median()
stats_data["up_std"] = up_data.std()
# 计算下跌过程中,平均跌幅,跌幅中位数,跌幅标准差
down_data = target_data[target_data < 0]
stats_data["down_mean"] = down_data.mean()
stats_data["down_median"] = down_data.median()
stats_data["down_std"] = down_data.std()
stats_data["range"] = target_data.max() - target_data.min()
stats_data["kurtosis"] = target_data.kurt()
stats_data["skewness"] = target_data.skew()
# 计算百分位数要求102030405060708090100
percentile_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
percentile_data = target_data.quantile(percentile_list)
for i in range(len(percentile_list)):
percentile_position = percentile_list[i]
percentile_name = f"percentile_{int(percentile_position * 100)}"
stats_data[percentile_name] = percentile_data.loc[percentile_position]
return stats_data
def calculate_price_volume_statistics(self, data):
"""
计算高价低价出现时与巨量成交量之间的关系
:param data: 市场数据
:return: 价格成交量统计数据
"""
if data is None:
return None
stats_data = {"symbol": data["symbol"].iloc[0], "bar": data["bar"].iloc[0]}
stats_data["count"] = len(data)
stats_data["huge_volume_count"] = len(data[data["huge_volume"] == 1])
stats_data["huge_volume_ratio"] = (
stats_data["huge_volume_count"] / stats_data["count"]
) * 100
stats_data["close_80_high_count"] = len(data[data["close_80_high"] == 1])
stats_data["hv_close_80_high_count"] = len(
data[(data["huge_volume"] == 1) & (data["close_80_high"] == 1)]
)
stats_data["hv_close_80_high_ratio"] = (
stats_data["hv_close_80_high_count"] / stats_data["close_80_high_count"]
) * 100
stats_data["close_90_high_count"] = len(data[data["close_90_high"] == 1])
stats_data["hv_close_90_high_count"] = len(
data[(data["huge_volume"] == 1) & (data["close_90_high"] == 1)]
)
stats_data["hv_close_90_high_ratio"] = (
stats_data["hv_close_90_high_count"] / stats_data["close_90_high_count"]
) * 100
stats_data["high_80_high_count"] = len(data[data["high_80_high"] == 1])
stats_data["hv_high_80_high_count"] = len(
data[(data["huge_volume"] == 1) & (data["high_80_high"] == 1)]
)
stats_data["hv_high_80_high_ratio"] = (
stats_data["hv_high_80_high_count"] / stats_data["high_80_high_count"]
) * 100
stats_data["high_90_high_count"] = len(data[data["high_90_high"] == 1])
stats_data["hv_high_90_high_count"] = len(
data[(data["huge_volume"] == 1) & (data["high_90_high"] == 1)]
)
stats_data["hv_high_90_high_ratio"] = (
stats_data["hv_high_90_high_count"] / stats_data["high_90_high_count"]
) * 100
stats_data["close_20_low_count"] = len(data[data["close_20_low"] == 1])
stats_data["hv_close_20_low_count"] = len(
data[(data["huge_volume"] == 1) & (data["close_20_low"] == 1)]
)
stats_data["hv_close_20_low_ratio"] = (
stats_data["hv_close_20_low_count"] / stats_data["close_20_low_count"]
) * 100
stats_data["close_10_low_count"] = len(data[data["close_10_low"] == 1])
stats_data["hv_close_10_low_count"] = len(
data[(data["huge_volume"] == 1) & (data["close_10_low"] == 1)]
)
stats_data["low_20_low_count"] = len(data[data["low_20_low"] == 1])
stats_data["hv_low_20_low_count"] = len(
data[(data["huge_volume"] == 1) & (data["low_20_low"] == 1)]
)
stats_data["hv_low_20_low_ratio"] = (
stats_data["hv_low_20_low_count"] / stats_data["low_20_low_count"]
) * 100
stats_data["low_10_low_count"] = len(data[data["low_10_low"] == 1])
stats_data["hv_low_10_low_count"] = len(
data[(data["huge_volume"] == 1) & (data["low_10_low"] == 1)]
)
stats_data["hv_low_10_low_ratio"] = (
stats_data["hv_low_10_low_count"] / stats_data["low_10_low_count"]
) * 100
# huge_volume == 1时价格处于高位的次数
stats_data["hv_price_high_count"] = len(
data[
(data["huge_volume"] == 1)
& (
(data["close_80_high"] == 1)
| (data["close_90_high"] == 1)
| (data["high_80_high"] == 1)
| (data["high_90_high"] == 1)
)
]
)
stats_data["hv_price_high_ratio"] = (
stats_data["hv_price_high_count"] / stats_data["huge_volume_count"]
) * 100
# huge_volume == 1时价格处于低位的次数
stats_data["hv_price_low_count"] = len(
data[
(data["huge_volume"] == 1)
& (
(data["close_20_low"] == 1)
| (data["close_10_low"] == 1)
| (data["low_20_low"] == 1)
| (data["low_10_low"] == 1)
)
]
)
stats_data["hv_price_low_ratio"] = (
stats_data["hv_price_low_count"] / stats_data["huge_volume_count"]
) * 100
return stats_data
def calculate_price_change_peak_valley_statistics(self, data):
"""
计算价格变化峰值和谷值统计数据
:param data: 市场数据
:return: 价格变化峰值和谷值统计数据
"""
if data is None:
return None
peak_valley_data = self.find_peaks_valleys(data)
stats_data = self.base_statistics(peak_valley_data, "price_change_ratio")
return peak_valley_data, stats_data
def draw_high_volume_hours_chart(self, data: pd.DataFrame, normal: bool = True):
"""
绘制高成交量小时分布图表美观保存到self.stats_chart_dir
:param data: 高成交量小时分布数据如high_volume_hours_df
:return: None
"""
if data is None or data.empty:
return None
# seaborn风格设置
sns.set_theme(style="whitegrid")
# plt.rcParams['font.family'] = "SimHei"
plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名
plt.rcParams["font.size"] = 11 # 设置字体大小
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
chart_dict = {}
for symbol in data["symbol"].unique():
symbol_data = data[data["symbol"] == symbol]
if normal:
sheet_name = f"{symbol}_量时分布图表"
else:
sheet_name = f"{symbol}_4倍量时分布图表"
chart_dict[sheet_name] = {}
for bar in symbol_data["bar"].unique():
bar_data = symbol_data[symbol_data["bar"] == bar].copy()
# 将hour改名为小时
bar_data.rename(columns={"hour": "小时"}, inplace=True)
# huge_volume_count改名为巨量次数
bar_data.rename(columns={"huge_volume_count": "巨量次数"}, inplace=True)
# huge_volume_ratio改名为巨量次数占比
bar_data.rename(columns={"huge_volume_ratio": "巨量次数占比"}, inplace=True)
# huge_volume_rise_count改名为巨量上涨次数
bar_data.rename(columns={"huge_volume_rise_count": "巨量上涨次数"}, inplace=True)
# huge_volume_fall_count改名为巨量下跌次数
bar_data.rename(columns={"huge_volume_fall_count": "巨量下跌次数"}, inplace=True)
bar_data.reset_index(drop=True, inplace=True)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle(f"巨量小时分布 - {symbol} {bar}", fontsize=18)
# huge_volume_count
# 柱状图使用不同颜色,巨量次数使用渐变蓝色
palette = sns.color_palette("Blues_d", 2)
palette[0] = sns.color_palette("Blues_d", 2)[1]
palette[1] = sns.color_palette("Reds_d", 2)[1]
sns.barplot(
ax=axes[0],
x="小时",
y="巨量次数",
data=bar_data,
hue="symbol",
palette=palette,
legend=False,
)
axes[0].set_title("巨量小时分布")
axes[0].set_ylabel("巨量次数")
# huge_volume_rise_count与huge_volume_fall_count
# 创建一个图表都位于axes[1, 0],包含两个柱状图:
# huge_volume_rise_count与huge_volume_fall_count并列放置
# 并使用不同的颜色
df_long = pd.melt(bar_data, id_vars=['小时'], value_vars=['巨量上涨次数', '巨量下跌次数'],
var_name='类别', value_name='次数')
# 柱状图使用不同颜色,巨量上涨次数使用渐变红色,巨量下跌次数使用渐变绿色
palette = sns.color_palette("Blues_d", 2)
palette[0] = sns.color_palette("Reds_d", 2)[1]
palette[1] = sns.color_palette("Greens_d", 2)[1]
sns.barplot(
ax=axes[1],
x="小时",
y="次数",
data=df_long,
hue="类别",
palette=palette,
legend=False,
)
axes[1].set_title("巨量小时上涨下跌分布")
axes[1].set_ylabel("次数")
# 旋转x轴标签
for ax in axes.flat:
for label in ax.get_xticklabels():
label.set_rotation(45)
plt.tight_layout(rect=[0, 0, 1, 0.96])
if normal:
save_path = os.path.join(self.stats_chart_dir, f"{symbol}_{bar}_high_volume_hours.png")
else:
save_path = os.path.join(self.stats_chart_dir, f"{symbol}_{bar}_4_high_volume_hours.png")
plt.savefig(save_path, dpi=150)
plt.close(fig)
chart_dict[sheet_name][
f"巨量小时分布 - {bar}"
] = save_path
return chart_dict
2025-08-07 10:09:51 +00:00
def draw_price_change_peak_valley_chart(self, data: pd.DataFrame):
"""
绘制价格变化峰值和谷值图表美观保存到self.stats_chart_dir
:param data: 价格变化峰值和谷值统计数据如peak_valley_stats_df
:return: None
"""
if data is None or data.empty:
return None
# seaborn风格设置
sns.set_theme(style="whitegrid")
# plt.rcParams['font.family'] = "SimHei"
plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名
plt.rcParams["font.size"] = 11 # 设置字体大小
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
chart_dict = {"波峰波谷图表": {}}
2025-08-07 10:09:51 +00:00
for bar in data["bar"].unique():
bar_data = data[data["bar"] == bar]
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle(f"波段变化峰值和谷值统计 - {bar}", fontsize=18)
# up_mean
sns.barplot(
ax=axes[0, 0],
x="symbol",
y="up_mean",
data=bar_data,
hue="symbol",
palette="Blues_d",
legend=False,
)
axes[0, 0].set_title("波段上涨均值(%)")
axes[0, 0].set_ylabel("up_mean")
# down_mean
sns.barplot(
ax=axes[0, 1],
x="symbol",
y=bar_data["down_mean"].abs(),
data=bar_data,
hue="symbol",
palette="Reds_d",
legend=False,
)
axes[0, 1].set_title("波段下跌均值(绝对值,%)")
axes[0, 1].set_ylabel("down_mean(abs)")
# max
sns.barplot(
ax=axes[1, 0],
x="symbol",
y="max",
data=bar_data,
hue="symbol",
palette="Greens_d",
legend=False,
)
axes[1, 0].set_title("波段最大涨幅(%)")
axes[1, 0].set_ylabel("max")
# min
sns.barplot(
ax=axes[1, 1],
x="symbol",
y=bar_data["min"].abs(),
data=bar_data,
hue="symbol",
palette="Purples_d",
legend=False,
)
axes[1, 1].set_title("波段最大跌幅(绝对值,%)")
axes[1, 1].set_ylabel("min(abs)")
# 旋转x轴标签
for ax in axes.flat:
for label in ax.get_xticklabels():
label.set_rotation(45)
plt.tight_layout(rect=[0, 0, 1, 0.96])
save_path = os.path.join(self.stats_chart_dir, f"peak_valley_{bar}.png")
plt.savefig(save_path, dpi=150)
plt.close(fig)
chart_dict["波峰波谷图表"][
2025-08-07 10:09:51 +00:00
f"波段变化峰值和谷值统计 - {bar}"
] = save_path
return chart_dict
def output_chart_to_excel(self, excel_file_path: str, charts_dict: dict):
"""
输出Excel文件包含所有图表
charts_dict: 图表数据字典格式为
{
"sheet_name": {
"chart_name": "chart_path"
}
}
"""
logger.info(f"将图表输出到{excel_file_path}")
2025-08-07 10:09:51 +00:00
# 打开已经存在的Excel文件
wb = openpyxl.load_workbook(excel_file_path)
for sheet_name, chart_data_dict in charts_dict.items():
try:
ws = wb.create_sheet(title=sheet_name)
row_offset = 1
for chart_name, chart_path in chart_data_dict.items():
# Load image to get dimensions
with PILImage.open(chart_path) as img:
width_px, height_px = img.size
# Convert pixel height to Excel row height (approximate: 1 point = 1.333 pixels, 1 row ≈ 15 points for 20 pixels)
pixels_per_point = 1.333
points_per_row = 15 # Default row height in points
pixels_per_row = (
points_per_row * pixels_per_point
) # ≈ 20 pixels per row
chart_rows = max(
10, int(height_px / pixels_per_row)
) # Minimum 10 rows for small charts
# Add chart title
# 支持中文标题
ws[f"A{row_offset}"] = chart_name.encode("utf-8").decode("utf-8")
ws[f"A{row_offset}"].font = openpyxl.styles.Font(bold=True, size=12)
row_offset += 2 # Add 2 rows for title and spacing
# Insert chart image
img = Image(chart_path)
ws.add_image(img, f"A{row_offset}")
# Update row offset (chart height + padding)
row_offset += (
chart_rows + 5
) # Add 5 rows for padding between charts
except Exception as e:
logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
2025-08-07 10:09:51 +00:00
continue
# Save Excel file
wb.save(excel_file_path)
print(f"Chart saved as {excel_file_path}")
def stats_high_volume_hours(self, data: pd.DataFrame, volume_ratio_threshold: int = None):
"""
统计巨量小时分布
小时包括0-23每小时一个数据
首先不区分价格涨跌统计每个小时满足huge_volume == 1的次数
然后区分价格涨跌统计每个小时满足huge_volume == 1的次数
最后统计每个小时满足huge_volume == 1的次数与满足huge_volume == 0的次数的比率
:param data: 市场数据
:return: 巨量小时分布
"""
if data is None:
return None
if volume_ratio_threshold is not None and volume_ratio_threshold > 0:
data = data[data["volume_ratio"] >= volume_ratio_threshold]
# 将date_time转换为datetime类型
data["date_time"] = pd.to_datetime(data["date_time"])
# 通过pandas自带的功能计算pct_chg
data["pct_chg"] = data["close"].pct_change()
# 统计每个小时满足huge_volume == 1的次数
huge_volume_hours = data.groupby(data["date_time"].dt.hour)["huge_volume"].sum()
# 统计每个小时满足huge_volume == 0的次数
# no_huge_volume_hours = (
# data.groupby(data["date_time"].dt.hour)["huge_volume"].count()
# - huge_volume_hours
# )
# 统计每个小时满足huge_volume == 1的次数与满足huge_volume == 0的次数的比率
# huge_volume_ratio_hours = huge_volume_hours / no_huge_volume_hours
# 将huge_volume_ratio_hours转换为百分比
# huge_volume_ratio_hours = huge_volume_ratio_hours * 100
# 统计每个小时满足huge_volume == 1且上涨的次数
huge_volume_rise_hours_df = (
data[(data["huge_volume"] == 1) & (data["pct_chg"] > 0)]
.groupby(data["date_time"].dt.hour)["huge_volume"]
.sum()
)
# 统计每个小时满足huge_volume == 1且下跌的次数
huge_volume_fall_hours_df = (
data[(data["huge_volume"] == 1) & (data["pct_chg"] < 0)]
.groupby(data["date_time"].dt.hour)["huge_volume"]
.sum()
)
# 将huge_volume_hours, no_huge_volume_hours, huge_volume_ratio_hours转换为DataFrame
huge_volume_hours_df = pd.DataFrame(huge_volume_hours)
# no_huge_volume_hours_df = pd.DataFrame(no_huge_volume_hours)
# huge_volume_ratio_hours_df = pd.DataFrame(huge_volume_ratio_hours)
huge_volume_rise_hours_df = pd.DataFrame(huge_volume_rise_hours_df)
huge_volume_fall_hours_df = pd.DataFrame(huge_volume_fall_hours_df)
# 将hour index作为列名: hour,将sum与count后的列名改为huge_volume_count, no_huge_volume_count
huge_volume_hours_df.index.name = "hour"
# no_huge_volume_hours_df.index.name = "hour"
# huge_volume_ratio_hours_df.index.name = "hour"
huge_volume_rise_hours_df.index.name = "hour"
huge_volume_fall_hours_df.index.name = "hour"
huge_volume_hours_df.columns = ["huge_volume_count"]
# no_huge_volume_hours_df.columns = ["no_huge_volume_count"]
# huge_volume_ratio_hours_df.columns = ["huge_volume_ratio"]
huge_volume_rise_hours_df.columns = ["huge_volume_rise_count"]
huge_volume_fall_hours_df.columns = ["huge_volume_fall_count"]
# 将huge_volume_hours_df, no_huge_volume_hours_df, huge_volume_ratio_hours_df, huge_volume_rise_hours_df, huge_volume_fall_hours_df合并为DataFrame
result_df = pd.concat(
[
huge_volume_hours_df,
# no_huge_volume_hours_df,
# huge_volume_ratio_hours_df,
huge_volume_rise_hours_df,
huge_volume_fall_hours_df,
],
axis=1,
)
# 将hour index作为列名: hour
result_df.index.name = "hour"
result_df = result_df.reset_index()
# 将hour index转换为列名: hour, huge_volume_count, no_huge_volume_count, huge_volume_ratio
result_df["symbol"] = data.iloc[0]["symbol"]
result_df["bar"] = data.iloc[0]["bar"]
result_df = result_df[
[
"symbol",
"bar",
"hour",
"huge_volume_count",
# "no_huge_volume_count",
# "huge_volume_ratio",
"huge_volume_rise_count",
"huge_volume_fall_count",
]
]
result_df.reset_index(drop=True, inplace=True)
return result_df
2025-08-07 10:09:51 +00:00
def find_peaks_valleys(self, data: pd.DataFrame, window=10):
"""
识别K线数据的波峰和波谷
参数
df: DataFrame包含'open', 'high', 'low', 'close'索引为时间
window: 窗口大小用于比较前后K线的价格默认10表示左右各10根K线
返回
result_df: DataFrame包含波峰和波谷的时间价格和类型
"""
# 确保输入数据包含必要的列
if not all(col in data.columns for col in ["open", "high", "low", "close"]):
raise ValueError(
"DataFrame must contain 'open', 'high', 'low', 'close' columns"
)
# 初始化结果列表
peaks_valleys = []
# 检测波峰基于high价格
highs = data["high"]
for i in range(window, len(data) - window):
# 当前K线的high价格
current_high = highs.iloc[i]
# 窗口内的前后K线的high价格
window_highs = highs.iloc[i - window : i + window + 1]
# 如果当前high是窗口内的最大值标记为波峰
if (
current_high == window_highs.max()
and current_high > highs.iloc[i - 1]
and current_high > highs.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_high,
"type": "peak",
}
)
# 检测波谷基于low价格
lows = data["low"]
for i in range(window, len(data) - window):
# 当前K线的low价格
current_low = lows.iloc[i]
# 窗口内的前后K线的low价格
window_lows = lows.iloc[i - window : i + window + 1]
# 如果当前low是窗口内的最小值标记为波谷
if (
current_low == window_lows.min()
and current_low < lows.iloc[i - 1]
and current_low < lows.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_low,
"type": "valley",
}
)
# 转换为DataFrame并按时间排序
result_df = pd.DataFrame(peaks_valleys)
if not result_df.empty:
result_df = result_df.sort_values(by="timestamp").reset_index(drop=True)
else:
result_df = pd.DataFrame(
columns=["symbol", "timestamp", "date_time", "bar", "price", "type"]
)
# 检查result_df如果type为peak时下一条数据type依然为peak则删除当前数据
if not result_df.empty:
# 使用布尔索引来标记要删除的行
to_drop_peaks = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "peak":
current_peak_value = result_df.iloc[i]["price"]
current_peak_index = i
# 如type连续为peak只应该保留price最大的行删除其他行
# 如type连续为peak且存在price为8 7 10 9 8 11 10的情况只应该保留price为11的行
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "peak":
next_peak_value = result_df.iloc[j]["price"]
if current_peak_value > next_peak_value:
to_drop_peaks.append(j)
else:
to_drop_peaks.append(current_peak_index)
current_peak_value = next_peak_value
current_peak_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_peaks).reset_index(drop=True)
# 如type连续为valley只应该保留price最小的行删除其他行
# 如type连续为valley且存在price为8 7 10 9 8的情况只应该保留price为7的行
to_drop_valleys = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "valley":
current_valley_value = result_df.iloc[i]["price"]
current_valley_index = i
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "valley":
next_valley_value = result_df.iloc[j]["price"]
if current_valley_value < next_valley_value:
to_drop_valleys.append(j)
else:
to_drop_valleys.append(current_valley_index)
current_valley_value = next_valley_value
current_valley_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_valleys).reset_index(drop=True)
# 初始化价格变化列
result_df["price_change"] = 0.0
result_df["price_change_ratio"] = 0.0
# 计算下一条数据与当前数据之间的价格差,并计算价格差与当前数据价格的比率
if len(result_df) > 1:
for i in range(len(result_df) - 1):
result_df.iloc[i + 1, result_df.columns.get_loc("price_change")] = (
result_df.iloc[i + 1]["price"] - result_df.iloc[i]["price"]
)
result_df.iloc[
i + 1, result_df.columns.get_loc("price_change_ratio")
] = (
result_df.iloc[i + 1]["price_change"] / result_df.iloc[i]["price"]
) * 100
return result_df