From 1dd829e2cfd29942d6e940299cffcb03d83df8b9 Mon Sep 17 00:00:00 2001 From: blade <8019068@qq.com> Date: Sat, 26 Jul 2025 14:41:50 +0800 Subject: [PATCH] support statistics huge volume based on market data --- .gitignore | 1 + core/db_manager.py | 65 ++++++++++++++++++++++++- core/statistics.py | 118 +++++++++++++++++++++++++++++++++++++++++++++ core/utils.py | 17 ++++++- monitor_main.py | 4 +- statistics_main.py | 71 +++++++++++++++++++++++++++ 6 files changed, 272 insertions(+), 4 deletions(-) create mode 100644 statistics_main.py diff --git a/.gitignore b/.gitignore index a943f5a..c407bbe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /core/__pycache__ /__pycache__/*.pyc +/output diff --git a/core/db_manager.py b/core/db_manager.py index d9744e2..689bdb0 100644 --- a/core/db_manager.py +++ b/core/db_manager.py @@ -1,7 +1,8 @@ import pandas as pd from sqlalchemy import create_engine, exc, text +import re, datetime import logging -from core.utils import transform_data_type +from core.utils import transform_data_type, datetime_to_timestamp, check_date_time_format def insert_market_data_to_mysql(df: pd.DataFrame, db_url: str): """ @@ -72,6 +73,68 @@ def query_latest_data(symbol: str, bar: str, db_url: str): """ condition_dict = {"symbol": symbol, "bar": bar} return query(sql, condition_dict, db_url, return_multi=False) + + +def query_data_by_symbol_bar(symbol: str, bar: str, start: str, end: str, db_url: str): + """ + 根据交易对和K线周期查询数据 + :param symbol: 交易对 + :param bar: K线周期 + :param db_url: 数据库连接URL + """ + if start is None or end is None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar} + else: + if start is not None: + if isinstance(start, str): + if start.isdigit(): + start = int(start) + else: + start = check_date_time_format(start) + # 判断是否是日期时间格式 + if start is None: + logging.warning(f"日期时间格式错误: {start}") + return None + start = datetime_to_timestamp(start) + if end is not None: + if isinstance(end, str): + if end.isdigit(): + end = int(end) + else: + end = check_date_time_format(end) + if end is None: + logging.warning(f"日期时间格式错误: {end}") + return None + end = datetime_to_timestamp(end) + if start is not None and end is not None: + if start > end: + start, end = end, start + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} + elif start is not None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start} + elif end is not None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "end": end} + return query(sql, condition_dict, db_url, return_multi=True) def query(sql: str, condition_dict: dict, db_url: str, return_multi: bool = True): """ diff --git a/core/statistics.py b/core/statistics.py index e69de29..14e18ea 100644 --- a/core/statistics.py +++ b/core/statistics.py @@ -0,0 +1,118 @@ +from core.db_manager import query_data_by_symbol_bar +from pandas import DataFrame +import logging +import os +import re +import pandas as pd + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + + +class Statistics: + def __init__(self, output_folder: str = "./output"): + self.output_folder = output_folder + os.makedirs(self.output_folder, exist_ok=True) + + def detect_volume_spike( + self, + data: DataFrame, + threshold: float = 2.0, + window: int = 50, + check_price: bool = False, + only_output_huge_volume: bool = False, + output_excel: bool = False, + ): + """ + detect_volume_spike的函数逻辑: + 1. 根据window滑动行情数据 + 2. 每一个window的最新的volume是否高于该window的volume的均值+2倍标准差,如果满足条件,则增加一列:huge_volume,值为1 + 3. 如果check_price为True,则检查: + a. 每一个window的close是否处于该window的80%分位数及以上 + b. 每一个window的close是否处于该window的20%分位数及以上 + + Args: + data: 包含成交量数据的DataFrame + threshold: 标准差倍数,默认为2.0(即成交量超过均值+2倍标准差) + window: 计算移动窗口的大小,默认50个周期 + check_price: 是否检查价格处于windows内的80%分位数以上,或20%分位数以下,默认False + Returns: + DataFrame: 包含异常检测结果的DataFrame + """ + if data is None or len(data) == 0: + logging.warning("数据为空,无法进行成交量异常检测") + return None + + if "volume" not in data.columns: + logging.error("数据中缺少volume列") + return None + + # 按时间戳排序 + data = data.sort_values(by="timestamp", ascending=True).copy() + + # 计算移动窗口的成交量均值和标准差 + data["volume_ma"] = data["volume"].rolling(window=window, min_periods=1).mean() + data["volume_std"] = data["volume"].rolling(window=window, min_periods=1).std() + + # 计算成交量阈值(均值 + threshold倍标准差) + data["volume_threshold"] = data["volume_ma"] + threshold * data["volume_std"] + + # 判断当前成交量是否超过阈值 + data["huge_volume"] = (data["volume"] > data["volume_threshold"]).astype(int) + + # 计算成交量比率 + data["volume_ratio"] = data["volume"] / data["volume_ma"] + + # 计算异常强度 + data["spike_intensity"] = data["volume_ratio"] - 1 + + # 如果check_price为True,检查价格分位数 + if check_price: + if "close" not in data.columns: + logging.error("数据中缺少close列,无法进行价格检查") + return data + + # 计算移动窗口的收盘价分位数 + data["close_80_percentile"] = ( + data["close"].rolling(window=window, min_periods=1).quantile(0.8) + ) + data["close_20_percentile"] = ( + data["close"].rolling(window=window, min_periods=1).quantile(0.2) + ) + + # 检查收盘价是否在80%分位数及以上或20%分位数及以下 + data["price_high"] = (data["close"] >= data["close_80_percentile"]).astype( + int + ) + data["price_low"] = (data["close"] <= data["close_20_percentile"]).astype( + int + ) + + # 综合判断:成交量异常且价格处于极端位置 + data["volume_price_spike"] = ( + (data["huge_volume"] == 1) + & ((data["price_high"] == 1) | (data["price_low"] == 1)) + ).astype(int) + + if only_output_huge_volume: + data = data[data["huge_volume"] == 1] + + if output_excel: + # 检查数据是否为空 + if len(data) == 0: + logging.warning("数据为空,无法导出Excel文件") + return data + + start_date = data["date_time"].iloc[0] + end_date = data["date_time"].iloc[-1] + # remove punctuation from start_date and end_date + start_date = re.sub(r"[\:\-\s]", "", str(start_date)) + end_date = re.sub(r"[\:\-\s]", "", str(end_date)) + symbol = data["symbol"].iloc[0] + bar = data["bar"].iloc[0] + file_name = f"volume_spike_{symbol}_{bar}_{start_date}_{end_date}.xlsx" + with pd.ExcelWriter(os.path.join(self.output_folder, file_name)) as writer: + data.to_excel(writer, sheet_name="volume_spike", index=False) + + return data diff --git a/core/utils.py b/core/utils.py index 5fa9056..8b3bd21 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,5 +1,6 @@ from datetime import datetime, timezone, timedelta from decimal import Decimal +import re def datetime_to_timestamp(date_str: str) -> int: """ @@ -27,4 +28,18 @@ def transform_data_type(data: dict): for key, value in data.items(): if isinstance(value, Decimal): data[key] = float(value) - return data \ No newline at end of file + return data + + +def check_date_time_format(date_time: str) -> bool: + """ + 检查日期时间格式是否正确 + """ + if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', date_time): + return date_time + elif re.match(r'^\d{4}-\d{2}-\d{2}$', date_time): + return date_time + " 00:00:00" + elif re.match(r'^\d{4}\d{2}\d{2}$', date_time): + return f"{date_time[0:4]}-{date_time[4:6]}-{date_time[6:8]} 00:00:00" + else: + return None \ No newline at end of file diff --git a/monitor_main.py b/monitor_main.py index 2dd5229..82f16e0 100644 --- a/monitor_main.py +++ b/monitor_main.py @@ -76,7 +76,7 @@ class MonitorMain: if __name__ == "__main__": monitor_main = MonitorMain() - # monitor_main.update_data() - monitor_main.initial_data() + monitor_main.update_data() + # monitor_main.initial_data() diff --git a/statistics_main.py b/statistics_main.py new file mode 100644 index 0000000..cc769a3 --- /dev/null +++ b/statistics_main.py @@ -0,0 +1,71 @@ +from core.statistics import Statistics +from core.db_manager import query_data_by_symbol_bar +import logging +from config import MONITOR_CONFIG, MYSQL_CONFIG +from datetime import datetime +import pandas as pd + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + + +class StatisticsMain: + def __init__(self): + mysql_user = MYSQL_CONFIG.get("user", "xch") + mysql_password = MYSQL_CONFIG.get("password", "") + if not mysql_password: + raise ValueError("MySQL password is not set") + mysql_host = MYSQL_CONFIG.get("host", "localhost") + mysql_port = MYSQL_CONFIG.get("port", 3306) + mysql_database = MYSQL_CONFIG.get("database", "okx") + + self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" + self.statistics = Statistics() + + def batch_detect_volume_spike(self, start: str, end: str): + pass + + def detect_volume_spike( + self, + symbol: str = "XCH-USDT", + bar: str = "5m", + start: str = "2025-05-01 00:00:00", + end: str = None, + only_output_huge_volume: bool = False, + ): + if start is None: + start = MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2025-05-01 00:00:00" + ) + if end is None: + end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + data = query_data_by_symbol_bar(symbol, bar, start, end, self.db_url) + if data is None: + logging.warning(f"获取数据失败: {symbol} {bar} {start} {end}") + return None + else: + if len(data) == 0: + logging.warning(f"获取数据为空: {symbol} {bar} {start} {end}") + return None + else: + if isinstance(data, list): + data = pd.DataFrame(data) + elif isinstance(data, dict): + data = pd.DataFrame([data]) + return self.statistics.detect_volume_spike( + data=data, + check_price=True, + only_output_huge_volume=only_output_huge_volume, + output_excel=True, + ) + + +if __name__ == "__main__": + statistics_main = StatisticsMain() + statistics_main.detect_volume_spike( + symbol="XCH-USDT", + bar="5m", + start="2025-05-01 00:00:00", + only_output_huge_volume=True, + )