import core.logger as logging import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from datetime import datetime import re from openpyxl import Workbook from openpyxl.drawing.image import Image import openpyxl from openpyxl.styles import Font from PIL import Image as PILImage from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE from core.db.db_market_data import DBMarketData from core.db.db_huge_volume_data import DBHugeVolumeData from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp # seaborn支持中文 plt.rcParams["font.family"] = ["SimHei"] logger = logging.logger class PriceVolumeStats: def __init__(self): mysql_user = MYSQL_CONFIG.get("user", "xch") mysql_password = MYSQL_CONFIG.get("password", "") if not mysql_password: raise ValueError("MySQL password is not set") mysql_host = MYSQL_CONFIG.get("host", "localhost") mysql_port = MYSQL_CONFIG.get("port", 3306) mysql_database = MYSQL_CONFIG.get("database", "okx") self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" self.db_market_data = DBMarketData(self.db_url) self.db_huge_volume_data = DBHugeVolumeData(self.db_url) self.symbol_list = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("symbols", []) self.bars_list = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("bars", []) self.windows_list = WINDOW_SIZE.get("window_sizes", []) self.initial_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-15 00:00:00" ) self.end_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.window_size = 100 self.stats_output_dir = "./output/statistics/excel/" os.makedirs(self.stats_output_dir, exist_ok=True) self.stats_chart_dir = "./output/statistics/chart/" os.makedirs(self.stats_chart_dir, exist_ok=True) def batch_price_volume_statistics(self): high_volume_hours_list = [] huge_high_volume_hours_list = [] price_stats_list = [] pct_change_stats_list = [] peak_valley_data_list = [] peak_valley_stats_list = [] volume_stats_list = [] price_volume_stats_list = [] earliest_market_timestamp = None latest_market_timestamp = None logger.info( f"开始统计{self.initial_date}到{self.end_date}的价格、成交量、成交量与高价低价关系数据" ) for symbol in self.symbol_list: for bar in self.bars_list: data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size( symbol, bar, self.window_size, self.initial_date, self.end_date ) if data is not None: data = pd.DataFrame(data) data.sort_values(by="timestamp", inplace=True) # pct_change计算涨跌百分比 data["pct_change"] = data["close"].pct_change() * 100 if earliest_market_timestamp is None: earliest_market_timestamp = data["timestamp"].iloc[0] else: if data["timestamp"].iloc[0] < earliest_market_timestamp: earliest_market_timestamp = data["timestamp"].iloc[0] if latest_market_timestamp is None: latest_market_timestamp = data["timestamp"].iloc[-1] else: if data["timestamp"].iloc[-1] > latest_market_timestamp: latest_market_timestamp = data["timestamp"].iloc[-1] # 统计高成交量小时分布 logger.info(f"统计{symbol} {bar} 巨量小时分布数据") high_volume_hours_data = self.stats_high_volume_hours(data) high_volume_hours_list.append(high_volume_hours_data) huge_high_volume_hours_data = self.stats_high_volume_hours(data, 4) huge_high_volume_hours_list.append(huge_high_volume_hours_data) logger.info(f"统计{symbol} {bar} 价格数据") price_stats_data = self.calculate_price_statistics(data) logger.info(f"统计{symbol} {bar} 涨跌百分比数据") pct_change_stats_data = self.calculate_pct_change_statistics(data) logger.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据") peak_valley_data, peak_valley_stats_data = ( self.calculate_price_change_peak_valley_statistics(data) ) logger.info(f"统计{symbol} {bar} 成交量数据") volume_stats_data = self.calculate_volume_statistics(data) logger.info(f"统计{symbol} {bar} 成交量与高价低价关系数据") price_volume_stats_data = self.calculate_price_volume_statistics( data ) price_stats_list.append(price_stats_data) pct_change_stats_list.append(pct_change_stats_data) peak_valley_data_list.append(peak_valley_data) peak_valley_stats_list.append(peak_valley_stats_data) volume_stats_list.append(volume_stats_data) price_volume_stats_list.append(price_volume_stats_data) high_volume_hours_df = pd.concat(high_volume_hours_list) high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True) huge_high_volume_hours_df = pd.concat(huge_high_volume_hours_list) huge_high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True) price_stats_df = pd.DataFrame(price_stats_list) price_stats_df.sort_values(by=["symbol", "bar"], inplace=True) pct_change_stats_df = pd.DataFrame(pct_change_stats_list) pct_change_stats_df.sort_values(by=["symbol", "bar"], inplace=True) peak_valley_data_df = pd.concat(peak_valley_data_list) peak_valley_data_df.sort_values(by=["symbol", "bar", "timestamp"], inplace=True) peak_valley_stats_df = pd.DataFrame(peak_valley_stats_list) peak_valley_stats_df.sort_values(by=["symbol", "bar"], inplace=True) volume_stats_df = pd.DataFrame(volume_stats_list) volume_stats_df.sort_values(by=["symbol", "bar"], inplace=True) price_volume_stats_df = pd.DataFrame(price_volume_stats_list) price_volume_stats_df.sort_values(by=["symbol", "bar"], inplace=True) earliest_market_date_time = timestamp_to_datetime(earliest_market_timestamp) earliest_market_date_time = re.sub( r"[\:\-\s]", "", str(earliest_market_date_time) ) latest_market_date_time = timestamp_to_datetime(latest_market_timestamp) latest_market_date_time = re.sub(r"[\:\-\s]", "", str(latest_market_date_time)) output_file_name = f"price_volume_stats_window_size_{self.window_size}_from_{earliest_market_date_time}_to_{latest_market_date_time}.xlsx" output_file_path = os.path.join(self.stats_output_dir, output_file_name) logger.info(f"导出{output_file_path}") with pd.ExcelWriter(output_file_path) as writer: price_stats_df.to_excel(writer, sheet_name="价格统计", index=False) pct_change_stats_df.to_excel( writer, sheet_name="涨跌百分比统计", index=False ) peak_valley_data_df.to_excel( writer, sheet_name="波峰波谷明细", index=False ) peak_valley_stats_df.to_excel( writer, sheet_name="波峰波谷统计", index=False ) volume_stats_df.to_excel(writer, sheet_name="量能统计", index=False) price_volume_stats_df.to_excel( writer, sheet_name="量价统计", index=False ) high_volume_hours_df.to_excel( writer, sheet_name="放量小时分布", index=False ) huge_high_volume_hours_df.to_excel( writer, sheet_name="4倍放量小时分布", index=False ) chart_dict = self.draw_price_change_peak_valley_chart(peak_valley_stats_df) self.output_chart_to_excel(output_file_path, chart_dict) chart_dict = self.draw_high_volume_hours_chart(high_volume_hours_df, normal=True) self.output_chart_to_excel(output_file_path, chart_dict) chart_dict = self.draw_high_volume_hours_chart(huge_high_volume_hours_df, normal=False) self.output_chart_to_excel(output_file_path, chart_dict) return price_stats_df, volume_stats_df, price_volume_stats_df def calculate_price_statistics(self, data: pd.DataFrame): """ 计算价格统计数据 :param data: 市场数据 :return: 价格统计数据 """ if data is None: return None stats_data = self.base_statistics(data, "close") return stats_data def calculate_pct_change_statistics(self, data: pd.DataFrame): """ 计算涨跌百分比统计数据 :param data: 市场数据 :return: 涨跌百分比统计数据 """ if data is None: return None stats_data = self.base_statistics(data, "pct_change") return stats_data def calculate_volume_statistics(self, data): """ 计算成交量统计数据 针对volume计算, 包括:max, min, mean, std, median,percentile, mode, range, kurtosis, skewness, z-score :param data: 市场数据 :return: 成交量统计数据 """ if data is None: return None stats_data = self.base_statistics(data, "volume") return stats_data def base_statistics(self, data: pd.DataFrame, column: str): """ 计算基础统计数据 :param data: 市场数据 :param column: 列名 针对column计算, 包括:max, min, mean, std, median,percentile, mode, range, kurtosis, skewness, z-score Max(最大值) 含义:数据集中的最大值 应用:在K线数据中, max可表示high价格的最高点, 标识波峰 示例:某股票一周最高价为120 Min(最小值) 含义:数据集中的最小值 应用:在K线数据中, min可表示low价格的最低点, 标识波谷 示例:某股票一周最低价为100 Mean(均值/平均值) 含义:所有值的总和除以数据点个数, mean = Σx / n 应用:分析K线close价格平均水平, 或SEC财务数据的平均收入 示例:一周收盘价均值为105 Std(标准差) 含义:衡量数据点偏离均值的分散程度, std = sqrt(Σ(x - mean)² / n) 应用:在K线中, 高std表示价格波动大; 在SEC数据中, 反映收入稳定性 示例:股票价格std为3.2, 波动较大 Median(中位数) 含义:数据集排序后的中间值(偶数个数据取中间两值平均) 应用:对K线close或SEC收入计算中位数, 减少极端值影响 示例:收盘价[100, 102, 105, 110, 120]的中位数为105 Percentile(百分位数) 含义:数据集中低于某值的百分比位置, 如第25百分位数(Q1)表示25%数据低于该值 应用:在K线数据中, 分析价格分布(如75%价格低于某值); 在SEC数据中, 比较公司收入在行业中的排名 示例:股票价格的75th percentile为110, 表示75%价格低于110 Range(极差) 含义:最大值与最小值之差, range = max - min 应用:衡量K线价格波动幅度, 或SEC数据(如利润)的变化范围 示例:股票一周价格极差为20(120 - 100) Kurtosis(峰度) 含义:描述数据分布的尖锐程度和尾部厚度 高峰度(>3):分布尖锐, 尾部厚(多极端值) 低峰度(<3):分布平坦, 尾部薄 应用:在K线回报率中, 高峰度提示极端波动风险; 在SEC数据中, 分析收入异常值 示例:股票回报率峰度为4, 可能有较大波动 Skewness(偏度) 含义:描述数据分布的偏斜方向 正偏(>0):右尾长(如少数高收入) 负偏(<0):左尾长(如多亏损) 零偏(≈0):分布对称 应用:在K线中, 正偏表示上涨概率高; 在SEC数据中, 偏度反映财务指标非对称性 示例:股票收益正偏为0.5, 上涨可能性较大 :return: 基础统计数据 """ if data is None: return None if column not in data.columns: return None stats_data = {"symbol": data["symbol"].iloc[0], "bar": data["bar"].iloc[0]} target_data = data[column] stats_data["count"] = len(target_data) stats_data["max"] = target_data.max() stats_data["min"] = target_data.min() stats_data["mean"] = target_data.mean() stats_data["std"] = target_data.std() stats_data["median"] = target_data.median() if column in ["pct_change", "price_change_ratio"]: # 计算上涨次数,下跌次数,上涨次数/总次数 stats_data["up_count"] = len(target_data[target_data > 0]) stats_data["down_count"] = len(target_data[target_data < 0]) stats_data["up_ratio"] = ( stats_data["up_count"] / stats_data["count"] ) * 100 stats_data["down_ratio"] = ( stats_data["down_count"] / stats_data["count"] ) * 100 # 计算上涨过程中,平均涨幅,涨幅中位数,涨幅标准差 up_data = target_data[target_data > 0] stats_data["up_mean"] = up_data.mean() stats_data["up_median"] = up_data.median() stats_data["up_std"] = up_data.std() # 计算下跌过程中,平均跌幅,跌幅中位数,跌幅标准差 down_data = target_data[target_data < 0] stats_data["down_mean"] = down_data.mean() stats_data["down_median"] = down_data.median() stats_data["down_std"] = down_data.std() stats_data["range"] = target_data.max() - target_data.min() stats_data["kurtosis"] = target_data.kurt() stats_data["skewness"] = target_data.skew() # 计算百分位数,要求10,20,30,40,50,60,70,80,90,100 percentile_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] percentile_data = target_data.quantile(percentile_list) for i in range(len(percentile_list)): percentile_position = percentile_list[i] percentile_name = f"percentile_{int(percentile_position * 100)}" stats_data[percentile_name] = percentile_data.loc[percentile_position] return stats_data def calculate_price_volume_statistics(self, data): """ 计算高价,低价出现时,与巨量成交量之间的关系 :param data: 市场数据 :return: 价格成交量统计数据 """ if data is None: return None stats_data = {"symbol": data["symbol"].iloc[0], "bar": data["bar"].iloc[0]} stats_data["count"] = len(data) stats_data["huge_volume_count"] = len(data[data["huge_volume"] == 1]) stats_data["huge_volume_ratio"] = ( stats_data["huge_volume_count"] / stats_data["count"] ) * 100 stats_data["close_80_high_count"] = len(data[data["close_80_high"] == 1]) stats_data["hv_close_80_high_count"] = len( data[(data["huge_volume"] == 1) & (data["close_80_high"] == 1)] ) stats_data["hv_close_80_high_ratio"] = ( stats_data["hv_close_80_high_count"] / stats_data["close_80_high_count"] ) * 100 stats_data["close_90_high_count"] = len(data[data["close_90_high"] == 1]) stats_data["hv_close_90_high_count"] = len( data[(data["huge_volume"] == 1) & (data["close_90_high"] == 1)] ) stats_data["hv_close_90_high_ratio"] = ( stats_data["hv_close_90_high_count"] / stats_data["close_90_high_count"] ) * 100 stats_data["high_80_high_count"] = len(data[data["high_80_high"] == 1]) stats_data["hv_high_80_high_count"] = len( data[(data["huge_volume"] == 1) & (data["high_80_high"] == 1)] ) stats_data["hv_high_80_high_ratio"] = ( stats_data["hv_high_80_high_count"] / stats_data["high_80_high_count"] ) * 100 stats_data["high_90_high_count"] = len(data[data["high_90_high"] == 1]) stats_data["hv_high_90_high_count"] = len( data[(data["huge_volume"] == 1) & (data["high_90_high"] == 1)] ) stats_data["hv_high_90_high_ratio"] = ( stats_data["hv_high_90_high_count"] / stats_data["high_90_high_count"] ) * 100 stats_data["close_20_low_count"] = len(data[data["close_20_low"] == 1]) stats_data["hv_close_20_low_count"] = len( data[(data["huge_volume"] == 1) & (data["close_20_low"] == 1)] ) stats_data["hv_close_20_low_ratio"] = ( stats_data["hv_close_20_low_count"] / stats_data["close_20_low_count"] ) * 100 stats_data["close_10_low_count"] = len(data[data["close_10_low"] == 1]) stats_data["hv_close_10_low_count"] = len( data[(data["huge_volume"] == 1) & (data["close_10_low"] == 1)] ) stats_data["low_20_low_count"] = len(data[data["low_20_low"] == 1]) stats_data["hv_low_20_low_count"] = len( data[(data["huge_volume"] == 1) & (data["low_20_low"] == 1)] ) stats_data["hv_low_20_low_ratio"] = ( stats_data["hv_low_20_low_count"] / stats_data["low_20_low_count"] ) * 100 stats_data["low_10_low_count"] = len(data[data["low_10_low"] == 1]) stats_data["hv_low_10_low_count"] = len( data[(data["huge_volume"] == 1) & (data["low_10_low"] == 1)] ) stats_data["hv_low_10_low_ratio"] = ( stats_data["hv_low_10_low_count"] / stats_data["low_10_low_count"] ) * 100 # huge_volume == 1时,价格处于高位的次数 stats_data["hv_price_high_count"] = len( data[ (data["huge_volume"] == 1) & ( (data["close_80_high"] == 1) | (data["close_90_high"] == 1) | (data["high_80_high"] == 1) | (data["high_90_high"] == 1) ) ] ) stats_data["hv_price_high_ratio"] = ( stats_data["hv_price_high_count"] / stats_data["huge_volume_count"] ) * 100 # huge_volume == 1时,价格处于低位的次数 stats_data["hv_price_low_count"] = len( data[ (data["huge_volume"] == 1) & ( (data["close_20_low"] == 1) | (data["close_10_low"] == 1) | (data["low_20_low"] == 1) | (data["low_10_low"] == 1) ) ] ) stats_data["hv_price_low_ratio"] = ( stats_data["hv_price_low_count"] / stats_data["huge_volume_count"] ) * 100 return stats_data def calculate_price_change_peak_valley_statistics(self, data): """ 计算价格变化峰值和谷值统计数据 :param data: 市场数据 :return: 价格变化峰值和谷值统计数据 """ if data is None: return None peak_valley_data = self.find_peaks_valleys(data) stats_data = self.base_statistics(peak_valley_data, "price_change_ratio") return peak_valley_data, stats_data def draw_high_volume_hours_chart(self, data: pd.DataFrame, normal: bool = True): """ 绘制高成交量小时分布图表(美观,保存到self.stats_chart_dir) :param data: 高成交量小时分布数据(如high_volume_hours_df) :return: None """ if data is None or data.empty: return None # seaborn风格设置 sns.set_theme(style="whitegrid") # plt.rcParams['font.family'] = "SimHei" plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名 plt.rcParams["font.size"] = 11 # 设置字体大小 plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题 chart_dict = {} for symbol in data["symbol"].unique(): symbol_data = data[data["symbol"] == symbol] if normal: sheet_name = f"{symbol}_量时分布图表" else: sheet_name = f"{symbol}_4倍量时分布图表" chart_dict[sheet_name] = {} for bar in symbol_data["bar"].unique(): bar_data = symbol_data[symbol_data["bar"] == bar].copy() # 将hour改名为小时 bar_data.rename(columns={"hour": "小时"}, inplace=True) # huge_volume_count改名为巨量次数 bar_data.rename(columns={"huge_volume_count": "巨量次数"}, inplace=True) # huge_volume_ratio改名为巨量次数占比 bar_data.rename(columns={"huge_volume_ratio": "巨量次数占比"}, inplace=True) # huge_volume_rise_count改名为巨量上涨次数 bar_data.rename(columns={"huge_volume_rise_count": "巨量上涨次数"}, inplace=True) # huge_volume_fall_count改名为巨量下跌次数 bar_data.rename(columns={"huge_volume_fall_count": "巨量下跌次数"}, inplace=True) bar_data.reset_index(drop=True, inplace=True) fig, axes = plt.subplots(1, 2, figsize=(14, 5)) fig.suptitle(f"巨量小时分布 - {symbol} {bar}", fontsize=18) # huge_volume_count # 柱状图使用不同颜色,巨量次数使用渐变蓝色 palette = sns.color_palette("Blues_d", 2) palette[0] = sns.color_palette("Blues_d", 2)[1] palette[1] = sns.color_palette("Reds_d", 2)[1] sns.barplot( ax=axes[0], x="小时", y="巨量次数", data=bar_data, hue="symbol", palette=palette, legend=False, ) axes[0].set_title("巨量小时分布") axes[0].set_ylabel("巨量次数") # huge_volume_rise_count与huge_volume_fall_count # 创建一个图表,都位于axes[1, 0],包含两个柱状图: # huge_volume_rise_count与huge_volume_fall_count并列放置, # 并使用不同的颜色 df_long = pd.melt(bar_data, id_vars=['小时'], value_vars=['巨量上涨次数', '巨量下跌次数'], var_name='类别', value_name='次数') # 柱状图使用不同颜色,巨量上涨次数使用渐变红色,巨量下跌次数使用渐变绿色 palette = sns.color_palette("Blues_d", 2) palette[0] = sns.color_palette("Reds_d", 2)[1] palette[1] = sns.color_palette("Greens_d", 2)[1] sns.barplot( ax=axes[1], x="小时", y="次数", data=df_long, hue="类别", palette=palette, legend=False, ) axes[1].set_title("巨量小时上涨下跌分布") axes[1].set_ylabel("次数") # 旋转x轴标签 for ax in axes.flat: for label in ax.get_xticklabels(): label.set_rotation(45) plt.tight_layout(rect=[0, 0, 1, 0.96]) if normal: save_path = os.path.join(self.stats_chart_dir, f"{symbol}_{bar}_high_volume_hours.png") else: save_path = os.path.join(self.stats_chart_dir, f"{symbol}_{bar}_4_high_volume_hours.png") plt.savefig(save_path, dpi=150) plt.close(fig) chart_dict[sheet_name][ f"巨量小时分布 - {bar}" ] = save_path return chart_dict def draw_price_change_peak_valley_chart(self, data: pd.DataFrame): """ 绘制价格变化峰值和谷值图表(美观,保存到self.stats_chart_dir) :param data: 价格变化峰值和谷值统计数据(如peak_valley_stats_df) :return: None """ if data is None or data.empty: return None # seaborn风格设置 sns.set_theme(style="whitegrid") # plt.rcParams['font.family'] = "SimHei" plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名 plt.rcParams["font.size"] = 11 # 设置字体大小 plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题 chart_dict = {"波峰波谷图表": {}} for bar in data["bar"].unique(): bar_data = data[data["bar"] == bar] fig, axes = plt.subplots(2, 2, figsize=(14, 10)) fig.suptitle(f"波段变化峰值和谷值统计 - {bar}", fontsize=18) # up_mean sns.barplot( ax=axes[0, 0], x="symbol", y="up_mean", data=bar_data, hue="symbol", palette="Blues_d", legend=False, ) axes[0, 0].set_title("波段上涨均值(%)") axes[0, 0].set_ylabel("up_mean") # down_mean sns.barplot( ax=axes[0, 1], x="symbol", y=bar_data["down_mean"].abs(), data=bar_data, hue="symbol", palette="Reds_d", legend=False, ) axes[0, 1].set_title("波段下跌均值(绝对值,%)") axes[0, 1].set_ylabel("down_mean(abs)") # max sns.barplot( ax=axes[1, 0], x="symbol", y="max", data=bar_data, hue="symbol", palette="Greens_d", legend=False, ) axes[1, 0].set_title("波段最大涨幅(%)") axes[1, 0].set_ylabel("max") # min sns.barplot( ax=axes[1, 1], x="symbol", y=bar_data["min"].abs(), data=bar_data, hue="symbol", palette="Purples_d", legend=False, ) axes[1, 1].set_title("波段最大跌幅(绝对值,%)") axes[1, 1].set_ylabel("min(abs)") # 旋转x轴标签 for ax in axes.flat: for label in ax.get_xticklabels(): label.set_rotation(45) plt.tight_layout(rect=[0, 0, 1, 0.96]) save_path = os.path.join(self.stats_chart_dir, f"peak_valley_{bar}.png") plt.savefig(save_path, dpi=150) plt.close(fig) chart_dict["波峰波谷图表"][ f"波段变化峰值和谷值统计 - {bar}" ] = save_path return chart_dict def output_chart_to_excel(self, excel_file_path: str, charts_dict: dict): """ 输出Excel文件,包含所有图表 charts_dict: 图表数据字典,格式为: { "sheet_name": { "chart_name": "chart_path" } } """ logger.info(f"将图表输出到{excel_file_path}") # 打开已经存在的Excel文件 wb = openpyxl.load_workbook(excel_file_path) for sheet_name, chart_data_dict in charts_dict.items(): try: ws = wb.create_sheet(title=sheet_name) row_offset = 1 for chart_name, chart_path in chart_data_dict.items(): # Load image to get dimensions with PILImage.open(chart_path) as img: width_px, height_px = img.size # Convert pixel height to Excel row height (approximate: 1 point = 1.333 pixels, 1 row ≈ 15 points for 20 pixels) pixels_per_point = 1.333 points_per_row = 15 # Default row height in points pixels_per_row = ( points_per_row * pixels_per_point ) # ≈ 20 pixels per row chart_rows = max( 10, int(height_px / pixels_per_row) ) # Minimum 10 rows for small charts # Add chart title # 支持中文标题 ws[f"A{row_offset}"] = chart_name.encode("utf-8").decode("utf-8") ws[f"A{row_offset}"].font = openpyxl.styles.Font(bold=True, size=12) row_offset += 2 # Add 2 rows for title and spacing # Insert chart image img = Image(chart_path) ws.add_image(img, f"A{row_offset}") # Update row offset (chart height + padding) row_offset += ( chart_rows + 5 ) # Add 5 rows for padding between charts except Exception as e: logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}") continue # Save Excel file wb.save(excel_file_path) print(f"Chart saved as {excel_file_path}") def stats_high_volume_hours(self, data: pd.DataFrame, volume_ratio_threshold: int = None): """ 统计巨量小时分布 小时包括0-23点,每小时一个数据 首先不区分价格涨跌,统计每个小时,满足huge_volume == 1的次数 然后区分价格涨跌,统计每个小时,满足huge_volume == 1的次数 最后统计每个小时,满足huge_volume == 1的次数,与满足huge_volume == 0的次数的比率 :param data: 市场数据 :return: 巨量小时分布 """ if data is None: return None if volume_ratio_threshold is not None and volume_ratio_threshold > 0: data = data[data["volume_ratio"] >= volume_ratio_threshold] # 将date_time转换为datetime类型 data["date_time"] = pd.to_datetime(data["date_time"]) # 通过pandas自带的功能,计算pct_chg data["pct_chg"] = data["close"].pct_change() # 统计每个小时,满足huge_volume == 1的次数 huge_volume_hours = data.groupby(data["date_time"].dt.hour)["huge_volume"].sum() # 统计每个小时,满足huge_volume == 0的次数 # no_huge_volume_hours = ( # data.groupby(data["date_time"].dt.hour)["huge_volume"].count() # - huge_volume_hours # ) # 统计每个小时,满足huge_volume == 1的次数,与满足huge_volume == 0的次数的比率 # huge_volume_ratio_hours = huge_volume_hours / no_huge_volume_hours # 将huge_volume_ratio_hours转换为百分比 # huge_volume_ratio_hours = huge_volume_ratio_hours * 100 # 统计每个小时,满足huge_volume == 1且上涨的次数 huge_volume_rise_hours_df = ( data[(data["huge_volume"] == 1) & (data["pct_chg"] > 0)] .groupby(data["date_time"].dt.hour)["huge_volume"] .sum() ) # 统计每个小时,满足huge_volume == 1且下跌的次数 huge_volume_fall_hours_df = ( data[(data["huge_volume"] == 1) & (data["pct_chg"] < 0)] .groupby(data["date_time"].dt.hour)["huge_volume"] .sum() ) # 将huge_volume_hours, no_huge_volume_hours, huge_volume_ratio_hours转换为DataFrame huge_volume_hours_df = pd.DataFrame(huge_volume_hours) # no_huge_volume_hours_df = pd.DataFrame(no_huge_volume_hours) # huge_volume_ratio_hours_df = pd.DataFrame(huge_volume_ratio_hours) huge_volume_rise_hours_df = pd.DataFrame(huge_volume_rise_hours_df) huge_volume_fall_hours_df = pd.DataFrame(huge_volume_fall_hours_df) # 将hour index作为列名: hour,将sum与count后的列名改为huge_volume_count, no_huge_volume_count huge_volume_hours_df.index.name = "hour" # no_huge_volume_hours_df.index.name = "hour" # huge_volume_ratio_hours_df.index.name = "hour" huge_volume_rise_hours_df.index.name = "hour" huge_volume_fall_hours_df.index.name = "hour" huge_volume_hours_df.columns = ["huge_volume_count"] # no_huge_volume_hours_df.columns = ["no_huge_volume_count"] # huge_volume_ratio_hours_df.columns = ["huge_volume_ratio"] huge_volume_rise_hours_df.columns = ["huge_volume_rise_count"] huge_volume_fall_hours_df.columns = ["huge_volume_fall_count"] # 将huge_volume_hours_df, no_huge_volume_hours_df, huge_volume_ratio_hours_df, huge_volume_rise_hours_df, huge_volume_fall_hours_df合并为DataFrame result_df = pd.concat( [ huge_volume_hours_df, # no_huge_volume_hours_df, # huge_volume_ratio_hours_df, huge_volume_rise_hours_df, huge_volume_fall_hours_df, ], axis=1, ) # 将hour index作为列名: hour result_df.index.name = "hour" result_df = result_df.reset_index() # 将hour index转换为列名: hour, huge_volume_count, no_huge_volume_count, huge_volume_ratio result_df["symbol"] = data.iloc[0]["symbol"] result_df["bar"] = data.iloc[0]["bar"] result_df = result_df[ [ "symbol", "bar", "hour", "huge_volume_count", # "no_huge_volume_count", # "huge_volume_ratio", "huge_volume_rise_count", "huge_volume_fall_count", ] ] result_df.reset_index(drop=True, inplace=True) return result_df def find_peaks_valleys(self, data: pd.DataFrame, window=10): """ 识别K线数据的波峰和波谷 参数: df: DataFrame,包含'open', 'high', 'low', 'close'列,索引为时间 window: 窗口大小,用于比较前后K线的价格(默认10,表示左右各10根K线) 返回: result_df: DataFrame,包含波峰和波谷的时间、价格和类型 """ # 确保输入数据包含必要的列 if not all(col in data.columns for col in ["open", "high", "low", "close"]): raise ValueError( "DataFrame must contain 'open', 'high', 'low', 'close' columns" ) # 初始化结果列表 peaks_valleys = [] # 检测波峰(基于high价格) highs = data["high"] for i in range(window, len(data) - window): # 当前K线的high价格 current_high = highs.iloc[i] # 窗口内的前后K线的high价格 window_highs = highs.iloc[i - window : i + window + 1] # 如果当前high是窗口内的最大值,标记为波峰 if ( current_high == window_highs.max() and current_high > highs.iloc[i - 1] and current_high > highs.iloc[i + 1] ): peaks_valleys.append( { "symbol": data.iloc[i]["symbol"], "bar": data.iloc[i]["bar"], "timestamp": data.iloc[i]["timestamp"], "date_time": data.iloc[i]["date_time"], "price": current_high, "type": "peak", } ) # 检测波谷(基于low价格) lows = data["low"] for i in range(window, len(data) - window): # 当前K线的low价格 current_low = lows.iloc[i] # 窗口内的前后K线的low价格 window_lows = lows.iloc[i - window : i + window + 1] # 如果当前low是窗口内的最小值,标记为波谷 if ( current_low == window_lows.min() and current_low < lows.iloc[i - 1] and current_low < lows.iloc[i + 1] ): peaks_valleys.append( { "symbol": data.iloc[i]["symbol"], "bar": data.iloc[i]["bar"], "timestamp": data.iloc[i]["timestamp"], "date_time": data.iloc[i]["date_time"], "price": current_low, "type": "valley", } ) # 转换为DataFrame并按时间排序 result_df = pd.DataFrame(peaks_valleys) if not result_df.empty: result_df = result_df.sort_values(by="timestamp").reset_index(drop=True) else: result_df = pd.DataFrame( columns=["symbol", "timestamp", "date_time", "bar", "price", "type"] ) # 检查result_df,如果type为peak时,下一条数据type依然为peak,则删除当前数据 if not result_df.empty: # 使用布尔索引来标记要删除的行 to_drop_peaks = [] handled_indexes = [] for i in range(len(result_df) - 1): if i in handled_indexes: continue if result_df.iloc[i]["type"] == "peak": current_peak_value = result_df.iloc[i]["price"] current_peak_index = i # 如type连续为peak,只应该保留price最大的行,删除其他行 # 如type连续为peak且存在price为8 7 10 9 8 11 10的情况,只应该保留price为11的行 for j in range(i + 1, len(result_df)): if result_df.iloc[j]["type"] == "peak": next_peak_value = result_df.iloc[j]["price"] if current_peak_value > next_peak_value: to_drop_peaks.append(j) else: to_drop_peaks.append(current_peak_index) current_peak_value = next_peak_value current_peak_index = j handled_indexes.append(j) else: break # 删除标记的行 result_df = result_df.drop(to_drop_peaks).reset_index(drop=True) # 如type连续为valley,只应该保留price最小的行,删除其他行 # 如type连续为valley且存在price为8 7 10 9 8的情况,只应该保留price为7的行 to_drop_valleys = [] handled_indexes = [] for i in range(len(result_df) - 1): if i in handled_indexes: continue if result_df.iloc[i]["type"] == "valley": current_valley_value = result_df.iloc[i]["price"] current_valley_index = i for j in range(i + 1, len(result_df)): if result_df.iloc[j]["type"] == "valley": next_valley_value = result_df.iloc[j]["price"] if current_valley_value < next_valley_value: to_drop_valleys.append(j) else: to_drop_valleys.append(current_valley_index) current_valley_value = next_valley_value current_valley_index = j handled_indexes.append(j) else: break # 删除标记的行 result_df = result_df.drop(to_drop_valleys).reset_index(drop=True) # 初始化价格变化列 result_df["price_change"] = 0.0 result_df["price_change_ratio"] = 0.0 # 计算下一条数据与当前数据之间的价格差,并计算价格差与当前数据价格的比率 if len(result_df) > 1: for i in range(len(result_df) - 1): result_df.iloc[i + 1, result_df.columns.get_loc("price_change")] = ( result_df.iloc[i + 1]["price"] - result_df.iloc[i]["price"] ) result_df.iloc[ i + 1, result_df.columns.get_loc("price_change_ratio") ] = ( result_df.iloc[i + 1]["price_change"] / result_df.iloc[i]["price"] ) * 100 return result_df