diff --git a/.gitignore b/.gitignore index 15cbb26..e6f8a5f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ /core/__pycache__ /__pycache__/*.pyc /output -core/db/__pycache__/db_market_data.cpython-312.pyc -core/biz/__pycache__/metrics_calculation.cpython-312.pyc -core/biz/__pycache__/market_data_monitor.cpython-312.pyc -core/db/__pycache__/db_market_data.cpython-312.pyc +core/db/__pycache__/*.pyc +core/biz/__pycache__/*.pyc +core/statistics/__pycache__/*.pyc diff --git a/core/biz/__pycache__/market_monitor.cpython-312.pyc b/core/biz/__pycache__/market_monitor.cpython-312.pyc index a4508f3..ec5390a 100644 Binary files a/core/biz/__pycache__/market_monitor.cpython-312.pyc and b/core/biz/__pycache__/market_monitor.cpython-312.pyc differ diff --git a/core/biz/market_monitor.py b/core/biz/market_monitor.py index 4cd5288..84fdeb5 100644 --- a/core/biz/market_monitor.py +++ b/core/biz/market_monitor.py @@ -61,9 +61,11 @@ def create_metrics_report( contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 最新数据时间: {now_datetime_str}") else: contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 交易周期时间: {date_time}") + k_shape = str(row["k_shape"]) contents.append(f"### 价格信息") contents.append(f"当前价格: {close}, 开盘价: {open}, 最高价: {high}, 最低价: {low}") contents.append(f"涨跌幅: {pct_chg}") + contents.append(f"当前K线形态: {k_shape}") volume = round(float(row["volume"]), 4) volCcy = round(float(row["volCcy"]), 4) @@ -215,7 +217,7 @@ def create_metrics_report( long_short_info["空"].append(f"BOLL形态: {boll_pattern}") k_up_down = str(row["k_up_down"]) - k_shape = str(row["k_shape"]) + if is_over_buy: k_shape_value = ( METRICS_CONFIG.get("k_shape", {}) diff --git a/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc index 831f6ab..a76e2d6 100644 Binary files a/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc and b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc differ diff --git 
"""Price / volume statistics over huge-volume market data.

Computes descriptive statistics (price, pct-change, volume), swing
peak/valley statistics, and huge-volume vs price-extreme co-occurrence
ratios per (symbol, bar), then exports everything to one Excel workbook
with per-bar charts appended as image sheets.
"""
import logging
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import re
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp

# SimHei so the Chinese labels in the seaborn/matplotlib charts render
plt.rcParams["font.family"] = ["SimHei"]

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class PriceVolumeStats:
    """Batch statistics runner: queries huge-volume data from MySQL,
    aggregates per (symbol, bar), and exports Excel + chart images."""

    def __init__(self):
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")

        self.db_url = (
            f"mysql+pymysql://{mysql_user}:{mysql_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.db_market_data = DBMarketData(self.db_url)
        self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.symbol_list = MONITOR_CONFIG.get("volume_monitor", {}).get("symbols", [])
        self.bars_list = MONITOR_CONFIG.get("volume_monitor", {}).get("bars", [])
        self.windows_list = WINDOW_SIZE.get("window_sizes", [])
        self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get(
            "initial_date", "2025-05-15 00:00:00"
        )
        self.end_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # window_size is fixed at 100 here (self.windows_list is unused so far)
        self.window_size = 100
        self.stats_output_dir = "./output/statistics/excel/"
        os.makedirs(self.stats_output_dir, exist_ok=True)
        self.stats_chart_dir = "./output/statistics/chart/"
        os.makedirs(self.stats_chart_dir, exist_ok=True)

    def batch_price_volume_statistics(self):
        """Run every statistic for every (symbol, bar) pair and export to Excel.

        Returns (price_stats_df, volume_stats_df, price_volume_stats_df),
        or (None, None, None) when no data was found at all.
        """
        price_stats_list = []
        pct_change_stats_list = []
        peak_valley_data_list = []
        peak_valley_stats_list = []
        volume_stats_list = []
        price_volume_stats_list = []
        earliest_market_timestamp = None
        latest_market_timestamp = None
        logging.info(
            f"开始统计{self.initial_date}到{self.end_date}的价格、成交量、成交量与高价低价关系数据"
        )
        for symbol in self.symbol_list:
            for bar in self.bars_list:
                data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
                    symbol, bar, self.window_size, self.initial_date, self.end_date
                )
                # guard empty results too: sort_values on a column-less empty
                # frame would raise KeyError
                if data is None or len(data) == 0:
                    continue
                data = pd.DataFrame(data)
                data.sort_values(by="timestamp", inplace=True)
                # percentage change of close vs. the previous bar
                data["pct_change"] = data["close"].pct_change() * 100

                first_ts = data["timestamp"].iloc[0]
                last_ts = data["timestamp"].iloc[-1]
                if earliest_market_timestamp is None or first_ts < earliest_market_timestamp:
                    earliest_market_timestamp = first_ts
                if latest_market_timestamp is None or last_ts > latest_market_timestamp:
                    latest_market_timestamp = last_ts

                logging.info(f"统计{symbol} {bar} 价格数据")
                price_stats_list.append(self.calculate_price_statistics(data))
                logging.info(f"统计{symbol} {bar} 涨跌百分比数据")
                pct_change_stats_list.append(self.calculate_pct_change_statistics(data))
                logging.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据")
                peak_valley_data, peak_valley_stats_data = (
                    self.calculate_price_change_peak_valley_statistics(data)
                )
                peak_valley_data_list.append(peak_valley_data)
                peak_valley_stats_list.append(peak_valley_stats_data)
                logging.info(f"统计{symbol} {bar} 成交量数据")
                volume_stats_list.append(self.calculate_volume_statistics(data))
                logging.info(f"统计{symbol} {bar} 成交量与高价低价关系数据")
                price_volume_stats_list.append(
                    self.calculate_price_volume_statistics(data)
                )

        if not price_stats_list:
            logging.warning("No data found for any symbol/bar; nothing to export")
            return None, None, None

        def _stats_frame(rows):
            # drop None entries (e.g. stats computed on empty peak/valley data)
            frame = pd.DataFrame([row for row in rows if row is not None])
            if not frame.empty:
                frame.sort_values(by=["symbol", "bar"], inplace=True)
            return frame

        price_stats_df = _stats_frame(price_stats_list)
        pct_change_stats_df = _stats_frame(pct_change_stats_list)
        peak_valley_data_df = pd.concat(peak_valley_data_list)
        if not peak_valley_data_df.empty:
            peak_valley_data_df.sort_values(
                by=["symbol", "bar", "timestamp"], inplace=True
            )
        peak_valley_stats_df = _stats_frame(peak_valley_stats_list)
        volume_stats_df = _stats_frame(volume_stats_list)
        price_volume_stats_df = _stats_frame(price_volume_stats_list)

        # timestamps -> compact datetime strings for the output file name
        earliest_market_date_time = re.sub(
            r"[\:\-\s]", "", str(timestamp_to_datetime(earliest_market_timestamp))
        )
        latest_market_date_time = re.sub(
            r"[\:\-\s]", "", str(timestamp_to_datetime(latest_market_timestamp))
        )
        output_file_name = (
            f"price_volume_stats_window_size_{self.window_size}"
            f"_from_{earliest_market_date_time}_to_{latest_market_date_time}.xlsx"
        )
        output_file_path = os.path.join(self.stats_output_dir, output_file_name)
        logging.info(f"导出{output_file_path}")
        with pd.ExcelWriter(output_file_path) as writer:
            price_stats_df.to_excel(writer, sheet_name="price_stats", index=False)
            pct_change_stats_df.to_excel(
                writer, sheet_name="pct_change_stats", index=False
            )
            peak_valley_data_df.to_excel(
                writer, sheet_name="peak_valley_data", index=False
            )
            peak_valley_stats_df.to_excel(
                writer, sheet_name="peak_valley_stats", index=False
            )
            volume_stats_df.to_excel(writer, sheet_name="volume_stats", index=False)
            price_volume_stats_df.to_excel(
                writer, sheet_name="price_volume_stats", index=False
            )
        chart_dict = self.draw_price_change_peak_valley_chart(peak_valley_stats_df)
        # chart_dict is None when there was nothing to draw
        if chart_dict:
            self.output_chart_to_excel(output_file_path, chart_dict)
        return price_stats_df, volume_stats_df, price_volume_stats_df

    def calculate_price_statistics(self, data: pd.DataFrame):
        """Descriptive statistics of the close price; None for empty input."""
        if data is None:
            return None
        return self.base_statistics(data, "close")

    def calculate_pct_change_statistics(self, data: pd.DataFrame):
        """Descriptive statistics of the bar-over-bar percentage change."""
        if data is None:
            return None
        return self.base_statistics(data, "pct_change")

    def calculate_volume_statistics(self, data):
        """Descriptive statistics of the traded volume."""
        if data is None:
            return None
        return self.base_statistics(data, "volume")

    def base_statistics(self, data: pd.DataFrame, column: str):
        """Descriptive statistics for one column of *data*.

        Always: count, max, min, mean, std, median, range, kurtosis,
        skewness and the 10%..100% deciles. For change-ratio columns
        ("pct_change", "price_change_ratio") additionally: up/down counts
        and ratios, plus mean/median/std of the up and down moves.

        :param data: market data frame (must carry symbol/bar columns)
        :param column: name of the numeric column to describe
        :return: dict of statistics, or None when data is empty or the
                 column is missing
        """
        if data is None or len(data) == 0:
            return None
        if column not in data.columns:
            return None
        target_data = data[column]
        stats_data = {
            "symbol": data["symbol"].iloc[0],
            "bar": data["bar"].iloc[0],
            "count": len(target_data),
            "max": target_data.max(),
            "min": target_data.min(),
            "mean": target_data.mean(),
            "std": target_data.std(),
            "median": target_data.median(),
        }
        if column in ("pct_change", "price_change_ratio"):
            up_data = target_data[target_data > 0]
            down_data = target_data[target_data < 0]
            stats_data["up_count"] = len(up_data)
            stats_data["down_count"] = len(down_data)
            # count > 0 is guaranteed by the empty check above
            stats_data["up_ratio"] = (stats_data["up_count"] / stats_data["count"]) * 100
            stats_data["down_ratio"] = (
                stats_data["down_count"] / stats_data["count"]
            ) * 100
            stats_data["up_mean"] = up_data.mean()
            stats_data["up_median"] = up_data.median()
            stats_data["up_std"] = up_data.std()
            stats_data["down_mean"] = down_data.mean()
            stats_data["down_median"] = down_data.median()
            stats_data["down_std"] = down_data.std()

        stats_data["range"] = stats_data["max"] - stats_data["min"]
        stats_data["kurtosis"] = target_data.kurt()
        stats_data["skewness"] = target_data.skew()
        # deciles: percentile_10 .. percentile_100
        percentile_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
        percentile_data = target_data.quantile(percentile_list)
        for percentile_position in percentile_list:
            percentile_name = f"percentile_{int(percentile_position * 100)}"
            stats_data[percentile_name] = percentile_data.loc[percentile_position]
        return stats_data

    @staticmethod
    def _safe_pct(numerator: int, denominator: int) -> float:
        """numerator/denominator as a percentage, 0.0 when the denominator
        is 0 (the original raised ZeroDivisionError on rare flags)."""
        return (numerator / denominator) * 100 if denominator else 0.0

    def calculate_price_volume_statistics(self, data):
        """Relate huge-volume bars to price-extreme flags.

        For each price flag (close/high at the 80/90 window highs,
        close/low at the 20/10 window lows): occurrence count, co-occurrence
        count with huge_volume == 1, and the co-occurrence ratio. Also the
        overall "huge volume while price is high / low" counts and ratios.
        Includes hv_close_10_low_ratio, which the first version omitted.
        """
        if data is None or len(data) == 0:
            return None
        stats_data = {"symbol": data["symbol"].iloc[0], "bar": data["bar"].iloc[0]}
        hv_mask = data["huge_volume"] == 1
        stats_data["count"] = len(data)
        stats_data["huge_volume_count"] = int(hv_mask.sum())
        stats_data["huge_volume_ratio"] = self._safe_pct(
            stats_data["huge_volume_count"], stats_data["count"]
        )

        flag_columns = [
            "close_80_high", "close_90_high", "high_80_high", "high_90_high",
            "close_20_low", "close_10_low", "low_20_low", "low_10_low",
        ]
        for flag in flag_columns:
            flag_mask = data[flag] == 1
            flag_count = int(flag_mask.sum())
            hv_flag_count = int((hv_mask & flag_mask).sum())
            stats_data[f"{flag}_count"] = flag_count
            stats_data[f"hv_{flag}_count"] = hv_flag_count
            stats_data[f"hv_{flag}_ratio"] = self._safe_pct(hv_flag_count, flag_count)

        # huge volume while price sits at any high-extreme flag
        high_mask = (
            (data["close_80_high"] == 1)
            | (data["close_90_high"] == 1)
            | (data["high_80_high"] == 1)
            | (data["high_90_high"] == 1)
        )
        # huge volume while price sits at any low-extreme flag
        low_mask = (
            (data["close_20_low"] == 1)
            | (data["close_10_low"] == 1)
            | (data["low_20_low"] == 1)
            | (data["low_10_low"] == 1)
        )
        stats_data["hv_price_high_count"] = int((hv_mask & high_mask).sum())
        stats_data["hv_price_high_ratio"] = self._safe_pct(
            stats_data["hv_price_high_count"], stats_data["huge_volume_count"]
        )
        stats_data["hv_price_low_count"] = int((hv_mask & low_mask).sum())
        stats_data["hv_price_low_ratio"] = self._safe_pct(
            stats_data["hv_price_low_count"], stats_data["huge_volume_count"]
        )
        return stats_data

    def calculate_price_change_peak_valley_statistics(self, data):
        """Find swing peaks/valleys and describe the swing change ratio.

        :return: (peak_valley_df, stats_dict); stats_dict is None when no
                 peaks/valleys were found. (The original returned a bare
                 None on None input, which the caller could not unpack.)
        """
        if data is None:
            return None, None
        peak_valley_data = self.find_peaks_valleys(data)
        stats_data = self.base_statistics(peak_valley_data, "price_change_ratio")
        return peak_valley_data, stats_data

    def draw_price_change_peak_valley_chart(self, data: pd.DataFrame):
        """Draw one 2x2 bar-chart figure per bar interval and save PNGs to
        self.stats_chart_dir.

        Panels: swing up mean, swing down mean (abs), max swing gain,
        max swing loss (abs), one bar per symbol.
        :return: {"bar_peak_valley_chart": {chart_title: png_path}} or None
                 when there is nothing to draw.
        """
        if data is None or data.empty:
            return None
        sns.set_theme(style="whitegrid")
        plt.rcParams["font.sans-serif"] = ["SimHei"]  # Chinese glyph support
        plt.rcParams["font.size"] = 11
        plt.rcParams["axes.unicode_minus"] = False  # render minus signs correctly

        chart_dict = {"bar_peak_valley_chart": {}}
        # (axes position, column or (column, abs), palette, title, ylabel)
        panel_specs = [
            ((0, 0), "up_mean", False, "Blues_d", "波段上涨均值(%)", "up_mean"),
            ((0, 1), "down_mean", True, "Reds_d", "波段下跌均值(绝对值,%)", "down_mean(abs)"),
            ((1, 0), "max", False, "Greens_d", "波段最大涨幅(%)", "max"),
            ((1, 1), "min", True, "Purples_d", "波段最大跌幅(绝对值,%)", "min(abs)"),
        ]
        for bar in data["bar"].unique():
            bar_data = data[data["bar"] == bar]
            fig, axes = plt.subplots(2, 2, figsize=(14, 10))
            fig.suptitle(f"波段变化峰值和谷值统计 - {bar}", fontsize=18)
            for (r, c), column, use_abs, palette, title, ylabel in panel_specs:
                y = bar_data[column].abs() if use_abs else column
                sns.barplot(
                    ax=axes[r, c],
                    x="symbol",
                    y=y,
                    data=bar_data,
                    hue="symbol",
                    palette=palette,
                    legend=False,
                )
                axes[r, c].set_title(title)
                axes[r, c].set_ylabel(ylabel)
            for ax in axes.flat:
                ax.tick_params(axis="x", rotation=45)
            plt.tight_layout(rect=[0, 0, 1, 0.96])
            save_path = os.path.join(self.stats_chart_dir, f"peak_valley_{bar}.png")
            plt.savefig(save_path, dpi=150)
            plt.close(fig)
            chart_dict["bar_peak_valley_chart"][
                f"波段变化峰值和谷值统计 - {bar}"
            ] = save_path
        return chart_dict

    def output_chart_to_excel(self, excel_file_path: str, charts_dict: dict):
        """Append chart images to an existing Excel workbook.

        :param excel_file_path: workbook to extend (must already exist)
        :param charts_dict: {sheet_name: {chart_title: image_path}}; one new
               sheet per key, images stacked vertically with bold titles.
        """
        logging.info(f"将图表输出到{excel_file_path}")
        wb = openpyxl.load_workbook(excel_file_path)
        for sheet_name, chart_data_dict in charts_dict.items():
            try:
                ws = wb.create_sheet(title=sheet_name)
                row_offset = 1
                for chart_name, chart_path in chart_data_dict.items():
                    with PILImage.open(chart_path) as img:
                        _, height_px = img.size
                    # rows the image spans: default row height is 15pt and
                    # 1pt ~ 1.333px, so ~20px per row; at least 10 rows
                    pixels_per_row = 15 * 1.333
                    chart_rows = max(10, int(height_px / pixels_per_row))

                    # bold title cell above the image (openpyxl stores
                    # unicode natively; no encode/decode round-trip needed)
                    ws[f"A{row_offset}"] = chart_name
                    ws[f"A{row_offset}"].font = Font(bold=True, size=12)
                    row_offset += 2

                    ws.add_image(Image(chart_path), f"A{row_offset}")
                    row_offset += chart_rows + 5  # padding between charts
            except Exception as e:
                logging.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
                continue
        wb.save(excel_file_path)
        print(f"Chart saved as {excel_file_path}")

    @staticmethod
    def _collapse_runs(result_df: pd.DataFrame, kind: str) -> pd.DataFrame:
        """Within each run of consecutive rows of *kind*, keep only the best
        one ('peak' keeps the highest price, 'valley' the lowest; ties keep
        the later row, matching the original strict comparisons)."""
        keeps_current = (
            (lambda cur, nxt: cur > nxt) if kind == "peak" else (lambda cur, nxt: cur < nxt)
        )
        to_drop = []
        handled = set()
        for i in range(len(result_df) - 1):
            if i in handled or result_df.iloc[i]["type"] != kind:
                continue
            best_value = result_df.iloc[i]["price"]
            best_index = i
            for j in range(i + 1, len(result_df)):
                if result_df.iloc[j]["type"] != kind:
                    break
                candidate = result_df.iloc[j]["price"]
                if keeps_current(best_value, candidate):
                    to_drop.append(j)
                else:
                    to_drop.append(best_index)
                    best_value = candidate
                    best_index = j
                handled.add(j)
        # set() guards against any duplicated labels in the drop list
        return result_df.drop(list(set(to_drop))).reset_index(drop=True)

    def find_peaks_valleys(self, data: pd.DataFrame, window=10):
        """Detect swing peaks (on high) and valleys (on low).

        A bar is a peak when its high is the maximum over the +/- *window*
        neighbouring bars AND strictly above both immediate neighbours;
        valleys are symmetric on low. Runs of consecutive same-type extremes
        are collapsed to the single best one, then the price change between
        successive extremes is computed.

        :param data: frame with 'open', 'high', 'low', 'close' (plus
               symbol/bar/timestamp/date_time)
        :param window: bars to compare on each side (default 10)
        :return: DataFrame with symbol, bar, timestamp, date_time, price,
                 type ('peak'/'valley'), price_change, price_change_ratio
        :raises ValueError: when an OHLC column is missing
        """
        if not all(col in data.columns for col in ["open", "high", "low", "close"]):
            raise ValueError(
                "DataFrame must contain 'open', 'high', 'low', 'close' columns"
            )

        peaks_valleys = []

        def _record(i, price, kind):
            row = data.iloc[i]
            peaks_valleys.append(
                {
                    "symbol": row["symbol"],
                    "bar": row["bar"],
                    "timestamp": row["timestamp"],
                    "date_time": row["date_time"],
                    "price": price,
                    "type": kind,
                }
            )

        highs = data["high"]
        lows = data["low"]
        for i in range(window, len(data) - window):
            current_high = highs.iloc[i]
            if (
                current_high == highs.iloc[i - window : i + window + 1].max()
                and current_high > highs.iloc[i - 1]
                and current_high > highs.iloc[i + 1]
            ):
                _record(i, current_high, "peak")
            current_low = lows.iloc[i]
            if (
                current_low == lows.iloc[i - window : i + window + 1].min()
                and current_low < lows.iloc[i - 1]
                and current_low < lows.iloc[i + 1]
            ):
                _record(i, current_low, "valley")

        result_df = pd.DataFrame(peaks_valleys)
        if result_df.empty:
            result_df = pd.DataFrame(
                columns=["symbol", "timestamp", "date_time", "bar", "price", "type"]
            )
        else:
            result_df = result_df.sort_values(by="timestamp").reset_index(drop=True)
            # collapse e.g. peak prices 8 7 10 9 8 11 10 down to the single 11
            result_df = self._collapse_runs(result_df, "peak")
            result_df = self._collapse_runs(result_df, "valley")

        # change vs. previous extreme; first row stays 0.0 by definition
        result_df["price_change"] = 0.0
        result_df["price_change_ratio"] = 0.0
        if len(result_df) > 1:
            result_df["price_change"] = result_df["price"].diff().fillna(0.0)
            result_df["price_change_ratio"] = (
                result_df["price"].diff() / result_df["price"].shift(1) * 100
            ).fillna(0.0)
        return result_df


def main():
    """Script entry point (was statistics_main.py): run the full batch."""
    price_volume_stats = PriceVolumeStats()
    price_volume_stats.batch_price_volume_statistics()


if __name__ == "__main__":
    main()