from pandas import DataFrame
import logging
import os
import re
import pandas as pd
from datetime import datetime
from copy import deepcopy
from typing import Optional, List, Dict, Any, Tuple

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class HugeVolume:
    """Detect abnormal ("huge") volume bars and summarize what prices do afterwards."""

    # Percentile suffixes that are treated as "high" markers; every other
    # suffix is treated as a "low" marker.
    _HIGH_SUFFIXES = ("80", "90")

    # Upper bounds (inclusive) of the decile buckets and the bucket label each
    # one maps to. Anything above the last bound (or NaN) falls into bucket 100.
    _DECILE_BOUNDS = (
        (0.1, 10),
        (0.2, 20),
        (0.3, 30),
        (0.4, 40),
        (0.5, 50),
        (0.6, 60),
        (0.7, 70),
        (0.8, 80),
        (0.9, 90),
    )

    # Marker columns used to slice the statistics in next_periods_rise_or_fall.
    _PRICE_TYPES = ("price_80_high", "price_20_low", "price_90_high", "price_10_low")

    # Flag columns that next_periods_rise_or_fall normalizes to int when present.
    _FLAG_COLUMNS = (
        "huge_volume",
        "volume_80_20_price_spike",
        "volume_90_10_price_spike",
        "price_80_high",
        "price_20_low",
        "price_90_high",
        "price_10_low",
    )

    def __init__(self, output_folder: str = "./output"):
        """Remember the export folder and create it if it does not exist."""
        self.output_folder = output_folder
        os.makedirs(self.output_folder, exist_ok=True)

    def _calculate_percentile_indicators(
        self,
        data: pd.DataFrame,
        window_size: int = 50,
        price_column: str = "close",
        percentiles: Optional[List[Tuple[float, str]]] = None,
    ) -> pd.DataFrame:
        """Add rolling percentile columns and 0/1 marker columns for one price column.

        For every ``(percentile, suffix)`` pair this adds
        ``{price_column}_{suffix}_percentile`` (rolling quantile over
        ``window_size`` rows, ``min_periods=1``) plus a marker column:
        ``{price_column}_{suffix}_high`` (price >= percentile) when the suffix
        is "80" or "90", otherwise ``{price_column}_{suffix}_low``
        (price <= percentile).

        Args:
            data: Frame containing ``price_column``. It is modified in place.
            window_size: Rolling window length.
            price_column: Name of the price column to analyse.
            percentiles: ``[(quantile, suffix), ...]``. Defaults to
                80/20/90/10 when omitted.

        Returns:
            The same ``data`` object with the new columns added.
        """
        if percentiles is None:
            percentiles = [(0.8, "80"), (0.2, "20"), (0.9, "90"), (0.1, "10")]

        for percentile, suffix in percentiles:
            percentile_column = f"{price_column}_{suffix}_percentile"
            data[percentile_column] = (
                data[price_column]
                .rolling(window=window_size, min_periods=1)
                .quantile(percentile)
            )
            if suffix in self._HIGH_SUFFIXES:
                data[f"{price_column}_{suffix}_high"] = (
                    data[price_column] >= data[percentile_column]
                ).astype(int)
            else:
                data[f"{price_column}_{suffix}_low"] = (
                    data[price_column] <= data[percentile_column]
                ).astype(int)
        return data

    def _export_to_excel(self, data: DataFrame, window_size: int) -> None:
        """Write ``data`` to an .xlsx file in ``self.output_folder``.

        The file name is built from the first row's ``symbol`` and ``bar`` and
        from the first/last ``date_time`` values (with ':', '-' and whitespace
        stripped). A failure while writing is logged, not raised.
        """
        start_date = re.sub(r"[\:\-\s]", "", str(data["date_time"].iloc[0]))
        end_date = re.sub(r"[\:\-\s]", "", str(data["date_time"].iloc[-1]))
        symbol = data["symbol"].iloc[0]
        bar = data["bar"].iloc[0]
        file_name = f"volume_spike_{symbol}_{bar}_{window_size}_{start_date}_{end_date}.xlsx"
        try:
            with pd.ExcelWriter(
                os.path.join(self.output_folder, file_name)
            ) as writer:
                data.to_excel(writer, sheet_name="volume_spike", index=False)
        except Exception as e:
            # Export is best-effort: the detection result is still returned.
            logging.error(f"导出Excel文件失败: {e}")

    def detect_huge_volume(
        self,
        data: DataFrame,
        window_size: int = 50,
        threshold: float = 2.0,
        check_price: bool = False,
        only_output_huge_volume: bool = False,
        output_excel: bool = False,
    ) -> Optional[DataFrame]:
        """Flag rows whose volume exceeds rolling mean + ``threshold`` * rolling std.

        Steps:
            1. Sort a copy of ``data`` by ``timestamp``.
            2. Compute rolling ``volume_ma`` / ``volume_std`` over
               ``window_size`` rows and set ``huge_volume`` to 1 where
               ``volume > volume_ma + threshold * volume_std``.
            3. Add ``volume_ratio`` (volume / volume_ma) and
               ``spike_intensity`` (volume_ratio - 1).
            4. If ``check_price`` is set, add 80/20/90/10 percentile and marker
               columns for ``close``, ``high`` and ``low``.

        Args:
            data: Frame with at least ``timestamp`` and ``volume`` columns.
                The caller's frame is not modified.
            window_size: Rolling window length (default 50).
            threshold: Standard-deviation multiplier (default 2.0).
            check_price: Also compute price percentile markers. Requires
                ``close``, ``high`` and ``low``; if one is missing an error is
                logged and the frame computed so far is returned.
            only_output_huge_volume: Keep only rows with ``huge_volume == 1``.
            output_excel: Export the result to Excel. Reads the
                ``date_time``, ``symbol`` and ``bar`` columns.

        Returns:
            The annotated frame (with a ``create_time`` column), or ``None``
            when ``data`` is ``None``/empty or has no ``volume`` column.

        Raises:
            KeyError: If ``timestamp`` is missing, or if ``output_excel`` is
                set and ``date_time``/``symbol``/``bar`` is missing.
        """
        if data is None or len(data) == 0:
            logging.warning("数据为空,无法进行成交量异常检测")
            return None

        if "volume" not in data.columns:
            logging.error("数据中缺少volume列")
            return None

        # Work on a sorted copy so the caller's frame is left untouched.
        data = data.sort_values(by="timestamp", ascending=True).copy()
        data["window_size"] = window_size

        rolling_volume = data["volume"].rolling(window=window_size, min_periods=1)
        data["volume_ma"] = rolling_volume.mean()
        data["volume_std"] = rolling_volume.std()

        # Threshold = mean + threshold * std; a NaN std yields a NaN threshold,
        # which compares False and therefore never flags the row.
        data["volume_threshold"] = data["volume_ma"] + threshold * data["volume_std"]
        data["huge_volume"] = (data["volume"] > data["volume_threshold"]).astype(int)

        data["volume_ratio"] = data["volume"] / data["volume_ma"]
        data["spike_intensity"] = data["volume_ratio"] - 1

        if check_price:
            price_columns = ["close", "high", "low"]
            for price_column in price_columns:
                if price_column not in data.columns:
                    logging.error(f"数据中缺少{price_column}列,无法进行价格检查")
                    return data
            for price_column in price_columns:
                data = self._calculate_percentile_indicators(
                    data, window_size, price_column
                )

        if only_output_huge_volume:
            # Copy the filtered slice so the column assignment below writes to
            # an independent frame instead of a view of the unfiltered one.
            data = data[data["huge_volume"] == 1].copy()

        data["create_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        if output_excel:
            if len(data) == 0:
                logging.warning("数据为空,无法导出Excel文件")
                return data
            self._export_to_excel(data, window_size)

        return data

    @staticmethod
    def _classify_change(change: Any) -> Any:
        """Map a percentage change to "rise"/"fall"/"draw"; missing values pass through."""
        if pd.isna(change):
            return change
        if change > 0:
            return "rise"
        if change < 0:
            return "fall"
        return "draw"

    @classmethod
    def _decile_bucket(cls, percentile_rank: float) -> int:
        """Map a percentile rank in [0, 1] to its decile label 10, 20, ..., 100."""
        for upper_bound, label in cls._DECILE_BOUNDS:
            if percentile_rank <= upper_bound:
                return label
        return 100

    def next_periods_rise_or_fall(
        self,
        data: pd.DataFrame,
        window_size: int = 50,
        periods: Optional[List[int]] = None,
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Measure how ``close`` moves N periods after each huge-volume bar.

        1. For every period ``n`` in ``periods`` add ``next_{n}_close``
           (close ``n`` rows later), ``next_{n}_change`` (percentage change)
           and ``next_{n}_result`` ("rise", "fall", "draw", or NaN when there
           is no row ``n`` periods ahead).

           Example with closes 100, 101, 102, 103, 100, 98: for the first row
           ``next_3_close`` is 103 so ``next_3_result`` is "rise";
           ``next_5_close`` is 98 so ``next_5_result`` is "fall".
        2. Keep only rows with ``huge_volume == 1``, rank them by
           ``volume_ratio`` and assign each to a decile bucket
           (``volume_ratio_percentile_10`` in 10, 20, ..., 100).
        3. For the whole huge-volume set (labelled "all") and then for each
           decile bucket, and for each price marker column present
           (``price_80_high``, ``price_20_low``, ``price_90_high``,
           ``price_10_low``), count rise/fall/draw outcomes among rows where
           the marker is 1 and report ratios (in percent) plus mean/max/min
           change. Combinations with no matching rows are skipped.

        Args:
            data: Frame with ``timestamp``, ``close``, ``huge_volume``,
                ``volume_ratio``, ``symbol`` and ``bar`` columns, plus any of
                the ``price_*`` marker columns. The caller's frame is not
                modified. NOTE(review): ``detect_huge_volume`` emits
                ``close_80_high``-style names, not ``price_80_high`` — the
                caller is presumably expected to provide the ``price_*``
                columns; confirm against the call site.
            window_size: Copied verbatim into every statistics row.
            periods: Look-ahead distances in rows. Defaults to
                ``[1, 2, 3, 5, 10]`` when omitted.

        Returns:
            ``(huge_volume_data, result_data)``: the huge-volume rows with the
            look-ahead and bucket columns, and one statistics row per
            (bucket, period, price marker) combination.
        """
        if periods is None:
            periods = [1, 2, 3, 5, 10]

        # Copy first: the casts below must not leak into the caller's frame.
        data = data.copy()
        for flag_column in self._FLAG_COLUMNS:
            # Only normalize the flag columns that were actually supplied.
            if flag_column in data.columns:
                data[flag_column] = data[flag_column].astype(int)

        data = data.sort_values(by="timestamp", ascending=True)
        data = data.reset_index(drop=True)

        for period in periods:
            data[f"next_{period}_close"] = data["close"].shift(-period)
            data[f"next_{period}_change"] = (
                (data[f"next_{period}_close"] / data["close"] - 1) * 100
            )
            data[f"next_{period}_result"] = data[f"next_{period}_change"].apply(
                self._classify_change
            )

        huge_volume_data = data[data["huge_volume"] == 1].copy()
        huge_volume_data = huge_volume_data.reset_index(drop=True)
        huge_volume_data["volume_ratio_percentile"] = huge_volume_data[
            "volume_ratio"
        ].rank(pct=True)
        huge_volume_data["volume_ratio_percentile_10"] = huge_volume_data[
            "volume_ratio_percentile"
        ].apply(self._decile_bucket)

        # The "all" group comes first, followed by the decile buckets in
        # ascending order (groupby iterates its keys sorted).
        overall_mean = round(float(huge_volume_data["volume_ratio"].mean()), 4)
        groups = [("all", huge_volume_data, overall_mean)]
        for bucket, bucket_frame in huge_volume_data.groupby(
            "volume_ratio_percentile_10"
        ):
            bucket_mean = round(float(bucket_frame["volume_ratio"].mean()), 4)
            groups.append(
                (str(bucket), bucket_frame.reset_index(drop=True), bucket_mean)
            )

        price_types = [
            price_type
            for price_type in self._PRICE_TYPES
            if price_type in huge_volume_data.columns
        ]

        results = []
        for bucket_label, group_frame, ratio_mean in groups:
            # Every row here already has huge_volume == 1, so the marker
            # column alone selects the rows of interest.
            matched = {
                price_type: group_frame[group_frame[price_type] == 1]
                for price_type in price_types
            }
            for period in periods:
                for price_type, subset in matched.items():
                    count = len(subset)
                    if count == 0:
                        continue

                    outcomes = subset[f"next_{period}_result"]
                    changes = subset[f"next_{period}_change"]
                    rise_count = int((outcomes == "rise").sum())
                    fall_count = int((outcomes == "fall").sum())
                    draw_count = int((outcomes == "draw").sum())

                    results.append(
                        {
                            "symbol": group_frame["symbol"].iloc[0],
                            "bar": group_frame["bar"].iloc[0],
                            "window_size": window_size,
                            "huge_volume": 1,
                            "volume_ratio_percentile_10": bucket_label,
                            "volume_ratio_percentile_10_mean": ratio_mean,
                            "price_type": price_type,
                            "next_period": period,
                            "average_return": float(changes.mean()),
                            "max_return": float(changes.max()),
                            "min_return": float(changes.min()),
                            "rise_count": rise_count,
                            "rise_ratio": round((rise_count / count) * 100, 4),
                            "fall_count": fall_count,
                            "fall_ratio": round((fall_count / count) * 100, 4),
                            "draw_count": draw_count,
                            "draw_ratio": round((draw_count / count) * 100, 4),
                            "total_count": count,
                        }
                    )

        result_data = pd.DataFrame(results)
        return huge_volume_data, result_data