support statistic volume-hour distribution

This commit is contained in:
blade 2025-08-08 18:02:37 +08:00
parent 5969f427a5
commit 38a4465e29
1 changed file with 214 additions and 8 deletions

View File

@ -51,6 +51,8 @@ class PriceVolumeStats:
os.makedirs(self.stats_chart_dir, exist_ok=True) os.makedirs(self.stats_chart_dir, exist_ok=True)
def batch_price_volume_statistics(self): def batch_price_volume_statistics(self):
high_volume_hours_list = []
huge_high_volume_hours_list = []
price_stats_list = [] price_stats_list = []
pct_change_stats_list = [] pct_change_stats_list = []
peak_valley_data_list = [] peak_valley_data_list = []
@ -83,6 +85,12 @@ class PriceVolumeStats:
else: else:
if data["timestamp"].iloc[-1] > latest_market_timestamp: if data["timestamp"].iloc[-1] > latest_market_timestamp:
latest_market_timestamp = data["timestamp"].iloc[-1] latest_market_timestamp = data["timestamp"].iloc[-1]
# 统计高成交量小时分布
logging.info(f"统计{symbol} {bar} 巨量小时分布数据")
high_volume_hours_data = self.stats_high_volume_hours(data)
high_volume_hours_list.append(high_volume_hours_data)
huge_high_volume_hours_data = self.stats_high_volume_hours(data, 4)
huge_high_volume_hours_list.append(huge_high_volume_hours_data)
logging.info(f"统计{symbol} {bar} 价格数据") logging.info(f"统计{symbol} {bar} 价格数据")
price_stats_data = self.calculate_price_statistics(data) price_stats_data = self.calculate_price_statistics(data)
logging.info(f"统计{symbol} {bar} 涨跌百分比数据") logging.info(f"统计{symbol} {bar} 涨跌百分比数据")
@ -103,6 +111,10 @@ class PriceVolumeStats:
peak_valley_stats_list.append(peak_valley_stats_data) peak_valley_stats_list.append(peak_valley_stats_data)
volume_stats_list.append(volume_stats_data) volume_stats_list.append(volume_stats_data)
price_volume_stats_list.append(price_volume_stats_data) price_volume_stats_list.append(price_volume_stats_data)
high_volume_hours_df = pd.concat(high_volume_hours_list)
high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True)
huge_high_volume_hours_df = pd.concat(huge_high_volume_hours_list)
huge_high_volume_hours_df.sort_values(by=["symbol", "bar", "hour"], inplace=True)
price_stats_df = pd.DataFrame(price_stats_list) price_stats_df = pd.DataFrame(price_stats_list)
price_stats_df.sort_values(by=["symbol", "bar"], inplace=True) price_stats_df.sort_values(by=["symbol", "bar"], inplace=True)
pct_change_stats_df = pd.DataFrame(pct_change_stats_list) pct_change_stats_df = pd.DataFrame(pct_change_stats_list)
@ -126,22 +138,32 @@ class PriceVolumeStats:
output_file_path = os.path.join(self.stats_output_dir, output_file_name) output_file_path = os.path.join(self.stats_output_dir, output_file_name)
logging.info(f"导出{output_file_path}") logging.info(f"导出{output_file_path}")
with pd.ExcelWriter(output_file_path) as writer: with pd.ExcelWriter(output_file_path) as writer:
price_stats_df.to_excel(writer, sheet_name="price_stats", index=False) price_stats_df.to_excel(writer, sheet_name="价格统计", index=False)
pct_change_stats_df.to_excel( pct_change_stats_df.to_excel(
writer, sheet_name="pct_change_stats", index=False writer, sheet_name="涨跌百分比统计", index=False
) )
peak_valley_data_df.to_excel( peak_valley_data_df.to_excel(
writer, sheet_name="peak_valley_data", index=False writer, sheet_name="波峰波谷明细", index=False
) )
peak_valley_stats_df.to_excel( peak_valley_stats_df.to_excel(
writer, sheet_name="peak_valley_stats", index=False writer, sheet_name="波峰波谷统计", index=False
) )
volume_stats_df.to_excel(writer, sheet_name="volume_stats", index=False) volume_stats_df.to_excel(writer, sheet_name="量能统计", index=False)
price_volume_stats_df.to_excel( price_volume_stats_df.to_excel(
writer, sheet_name="price_volume_stats", index=False writer, sheet_name="量价统计", index=False
)
high_volume_hours_df.to_excel(
writer, sheet_name="放量小时分布", index=False
)
huge_high_volume_hours_df.to_excel(
writer, sheet_name="4倍放量小时分布", index=False
) )
chart_dict = self.draw_price_change_peak_valley_chart(peak_valley_stats_df) chart_dict = self.draw_price_change_peak_valley_chart(peak_valley_stats_df)
self.output_chart_to_excel(output_file_path, chart_dict) self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_high_volume_hours_chart(high_volume_hours_df, normal=True)
self.output_chart_to_excel(output_file_path, chart_dict)
chart_dict = self.draw_high_volume_hours_chart(huge_high_volume_hours_df, normal=False)
self.output_chart_to_excel(output_file_path, chart_dict)
return price_stats_df, volume_stats_df, price_volume_stats_df return price_stats_df, volume_stats_df, price_volume_stats_df
def calculate_price_statistics(self, data: pd.DataFrame): def calculate_price_statistics(self, data: pd.DataFrame):
@ -395,6 +417,99 @@ class PriceVolumeStats:
stats_data = self.base_statistics(peak_valley_data, "price_change_ratio") stats_data = self.base_statistics(peak_valley_data, "price_change_ratio")
return peak_valley_data, stats_data return peak_valley_data, stats_data
def draw_high_volume_hours_chart(self, data: pd.DataFrame, normal: bool = True):
    """
    Draw the hourly huge-volume distribution charts and save them as PNG files.

    One two-panel figure is produced for every (symbol, bar) pair found in
    ``data`` and written to ``self.stats_chart_dir``. The left panel plots
    the huge-volume count per hour; the right panel plots the rising and
    falling huge-volume counts per hour side by side.

    :param data: hourly distribution table (e.g. ``high_volume_hours_df``).
        The columns read here are ``symbol``, ``bar``, ``hour``,
        ``huge_volume_count``, ``huge_volume_rise_count`` and
        ``huge_volume_fall_count``.
    :param normal: truthy selects the plain sheet/file names; falsy selects
        the variants carrying the "4" marker.
    :return: ``None`` when ``data`` is ``None`` or empty; otherwise a dict
        mapping each sheet name to a ``{chart title: saved image path}`` dict.
    """
    if data is None or data.empty:
        return None

    # Global seaborn / matplotlib styling; SimHei is configured so the
    # Chinese titles and axis labels can be rendered.
    sns.set_theme(style="whitegrid")
    plt.rcParams["font.sans-serif"] = ["SimHei"]
    plt.rcParams["font.size"] = 11
    plt.rcParams["axes.unicode_minus"] = False  # keep the minus sign displayable

    # Display names for the plotted columns. "huge_volume_ratio" is kept in
    # the mapping even though rename() simply ignores absent columns.
    column_labels = {
        "hour": "小时",
        "huge_volume_count": "巨量次数",
        "huge_volume_ratio": "巨量次数占比",
        "huge_volume_rise_count": "巨量上涨次数",
        "huge_volume_fall_count": "巨量下跌次数",
    }

    # Each colour is the second entry of a two-step sequential palette.
    blue = sns.color_palette("Blues_d", 2)[1]
    red = sns.color_palette("Reds_d", 2)[1]
    green = sns.color_palette("Greens_d", 2)[1]

    charts = {}
    for symbol in data["symbol"].unique():
        rows_for_symbol = data[data["symbol"] == symbol]
        sheet_name = (
            f"{symbol}_量时分布图表" if normal else f"{symbol}_4倍量时分布图表"
        )
        charts[sheet_name] = {}

        for bar in rows_for_symbol["bar"].unique():
            frame = rows_for_symbol[rows_for_symbol["bar"] == bar].copy()
            frame.rename(columns=column_labels, inplace=True)
            frame.reset_index(drop=True, inplace=True)

            fig, axes = plt.subplots(1, 2, figsize=(14, 5))
            fig.suptitle(f"巨量小时分布 - {symbol} {bar}", fontsize=18)

            # Left panel: total huge-volume count per hour.
            left, right = axes[0], axes[1]
            sns.barplot(
                ax=left,
                x="小时",
                y="巨量次数",
                data=frame,
                hue="symbol",
                palette=[blue, red],
                legend=False,
            )
            left.set_title("巨量小时分布")
            left.set_ylabel("巨量次数")

            # Right panel: rise and fall counts as grouped bars, which
            # needs the two columns reshaped into long form first.
            long_form = frame.melt(
                id_vars=["小时"],
                value_vars=["巨量上涨次数", "巨量下跌次数"],
                var_name="类别",
                value_name="次数",
            )
            sns.barplot(
                ax=right,
                x="小时",
                y="次数",
                data=long_form,
                hue="类别",
                palette=[red, green],  # rise -> red, fall -> green
                legend=False,
            )
            right.set_title("巨量小时上涨下跌分布")
            right.set_ylabel("次数")

            # Tilt the hour tick labels on both panels.
            for panel in axes.flat:
                for tick_label in panel.get_xticklabels():
                    tick_label.set_rotation(45)

            # Leave headroom at the top for the figure-level title.
            plt.tight_layout(rect=[0, 0, 1, 0.96])
            file_name = (
                f"{symbol}_{bar}_high_volume_hours.png"
                if normal
                else f"{symbol}_{bar}_4_high_volume_hours.png"
            )
            save_path = os.path.join(self.stats_chart_dir, file_name)
            plt.savefig(save_path, dpi=150)
            plt.close(fig)

            charts[sheet_name][f"巨量小时分布 - {bar}"] = save_path
    return charts
def draw_price_change_peak_valley_chart(self, data: pd.DataFrame): def draw_price_change_peak_valley_chart(self, data: pd.DataFrame):
""" """
绘制价格变化峰值和谷值图表美观保存到self.stats_chart_dir 绘制价格变化峰值和谷值图表美观保存到self.stats_chart_dir
@ -409,7 +524,7 @@ class PriceVolumeStats:
plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名 plt.rcParams["font.sans-serif"] = ["SimHei"] # 也可直接用字体名
plt.rcParams["font.size"] = 11 # 设置字体大小 plt.rcParams["font.size"] = 11 # 设置字体大小
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题 plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
chart_dict = {"bar_peak_valley_chart": {}} chart_dict = {"波峰波谷图表": {}}
for bar in data["bar"].unique(): for bar in data["bar"].unique():
bar_data = data[data["bar"] == bar] bar_data = data[data["bar"] == bar]
fig, axes = plt.subplots(2, 2, figsize=(14, 10)) fig, axes = plt.subplots(2, 2, figsize=(14, 10))
@ -470,7 +585,7 @@ class PriceVolumeStats:
save_path = os.path.join(self.stats_chart_dir, f"peak_valley_{bar}.png") save_path = os.path.join(self.stats_chart_dir, f"peak_valley_{bar}.png")
plt.savefig(save_path, dpi=150) plt.savefig(save_path, dpi=150)
plt.close(fig) plt.close(fig)
chart_dict["bar_peak_valley_chart"][ chart_dict["波峰波谷图表"][
f"波段变化峰值和谷值统计 - {bar}" f"波段变化峰值和谷值统计 - {bar}"
] = save_path ] = save_path
return chart_dict return chart_dict
@ -530,6 +645,97 @@ class PriceVolumeStats:
wb.save(excel_file_path) wb.save(excel_file_path)
print(f"Chart saved as {excel_file_path}") print(f"Chart saved as {excel_file_path}")
def stats_high_volume_hours(self, data: pd.DataFrame, volume_ratio_threshold: int = None):
    """
    Count huge-volume bars per hour of day, in total and split by direction.

    Rows are grouped by the hour component of ``date_time``. For each hour
    that occurs in the (optionally filtered) data the result holds:

    * ``huge_volume_count``      - sum of the ``huge_volume`` column
    * ``huge_volume_rise_count`` - rows with ``huge_volume == 1`` whose close
      is above the previous bar's close
    * ``huge_volume_fall_count`` - rows with ``huge_volume == 1`` whose close
      is below the previous bar's close

    The input frame is never modified.

    :param data: market data with the columns ``symbol``, ``bar``,
        ``date_time``, ``close`` and ``huge_volume`` (plus ``volume_ratio``
        when a threshold is given). Rows are assumed to be in chronological
        order for a single symbol/bar - TODO confirm against the caller.
    :param volume_ratio_threshold: when not ``None`` and greater than 0,
        only rows with ``volume_ratio >= volume_ratio_threshold`` are counted.
    :return: ``None`` when ``data`` is ``None``; otherwise a DataFrame with
        the columns symbol, bar, hour, huge_volume_count,
        huge_volume_rise_count, huge_volume_fall_count. The frame has no
        rows when the input is empty or no row passes the threshold.
    """
    if data is None:
        return None

    columns = [
        "symbol",
        "bar",
        "hour",
        "huge_volume_count",
        "huge_volume_rise_count",
        "huge_volume_fall_count",
    ]
    if data.empty:
        return pd.DataFrame(columns=columns)

    # Derive standalone Series instead of writing columns back into `data`,
    # so the caller's frame (reused for the other statistics) stays intact.
    hours = pd.to_datetime(data["date_time"]).dt.hour.rename("hour")
    # The change must be measured against the immediately preceding bar, so
    # it is computed before any rows are filtered out.
    pct_chg = data["close"].pct_change()

    if volume_ratio_threshold is not None and volume_ratio_threshold > 0:
        keep = data["volume_ratio"] >= volume_ratio_threshold
        data = data[keep]
        hours = hours[keep]
        pct_chg = pct_chg[keep]
        if data.empty:
            # Nothing reached the threshold; there is no first row to read
            # symbol/bar from, so hand back an empty, correctly shaped frame.
            return pd.DataFrame(columns=columns)

    huge_volume = data["huge_volume"]
    is_huge = huge_volume == 1
    # Grouping 0/1 flags (rather than pre-filtered rows) keeps every hour in
    # the result with an explicit 0 instead of NaN. Comparisons against a
    # NaN pct_chg are False, so the first bar is neither a rise nor a fall.
    rise_flags = (is_huge & (pct_chg > 0)).astype(int)
    fall_flags = (is_huge & (pct_chg < 0)).astype(int)

    result_df = pd.DataFrame(
        {
            "huge_volume_count": huge_volume.groupby(hours).sum(),
            "huge_volume_rise_count": rise_flags.groupby(hours).sum(),
            "huge_volume_fall_count": fall_flags.groupby(hours).sum(),
        }
    )
    result_df.index.name = "hour"
    result_df = result_df.reset_index()

    # symbol / bar are taken from the first remaining row, as before.
    result_df["symbol"] = data["symbol"].iloc[0]
    result_df["bar"] = data["bar"].iloc[0]
    result_df = result_df[columns]
    result_df.reset_index(drop=True, inplace=True)
    return result_df
def find_peaks_valleys(self, data: pd.DataFrame, window=10): def find_peaks_valleys(self, data: pd.DataFrame, window=10):
""" """
识别K线数据的波峰和波谷 识别K线数据的波峰和波谷