From c6ff3adc167452825345e4a5d713a6f2f12850e6 Mon Sep 17 00:00:00 2001 From: blade <8019068@qq.com> Date: Mon, 28 Jul 2025 16:14:40 +0800 Subject: [PATCH] 1. support update huge volume data a. update market data at first b. According to the first updated market data timestamp, get earlier window-size records. c. Calculate huge volume data by the latest market data d. Only insert new data to huge volume table 2. rename code file to be more proper. --- config.py | 10 +- core/db_huge_volume_data.py | 378 ++++++++++++++++++ core/db_statistics_data.py | 0 core/{statistics.py => huge_volume.py} | 24 +- ...data_monitor.py => market_data_monitor.py} | 2 +- huge_volume_main.py | 199 +++++++++ monitor_main.py => market_data_main.py | 67 ++-- sql/query/sql_playground.sql | 5 +- sql/table/crypto_huge_volume.sql | 40 ++ statistics_main.py | 77 ---- 10 files changed, 679 insertions(+), 123 deletions(-) create mode 100644 core/db_huge_volume_data.py delete mode 100644 core/db_statistics_data.py rename core/{statistics.py => huge_volume.py} (85%) rename core/{data_monitor.py => market_data_monitor.py} (99%) create mode 100644 huge_volume_main.py rename monitor_main.py => market_data_main.py (54%) create mode 100644 sql/table/crypto_huge_volume.sql delete mode 100644 statistics_main.py diff --git a/config.py b/config.py index 93b267b..8e252ca 100644 --- a/config.py +++ b/config.py @@ -52,15 +52,15 @@ MONITOR_CONFIG = { "volume_monitor":{ "symbols": ["XCH-USDT", "BTC-USDT", "ETH-USDT", "SOL-USDT", "DOGE-USDT", "XCH-USDT-SWAP", "BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP", "DOGE-USDT-SWAP"], - "intervals": ["5m", "15m", "1H", "4H", "1D"], + "bars": ["5m", "15m", "1H", "4H", "1D"], "initial_date": "2025-05-01 00:00:00" }, "price_monitor":{ "symbols": ["XCH-USDT"], - "intervals": [ - {"interval": "5m", "threshold": 0.025}, - {"interval": "15m", "threshold": 0.5}, - {"interval": "1H", "threshold": 0.1} + "bats": [ + {"bar": "5m", "threshold": 0.025}, + {"bar": "15m", "threshold": 
class DBHugeVolumeData:
    """Query and insert helper for the ``crypto_huge_volume`` MySQL table.

    Every database call is delegated to the ``DBData`` manager created in
    ``__init__``; this class only assembles SQL text and bind parameters.
    """

    def __init__(self, db_url: str):
        """Create the underlying ``DBData`` manager for ``crypto_huge_volume``.

        :param db_url: database URL handed to ``DBData`` unchanged.
        """
        self.db_url = db_url
        self.table_name = "crypto_huge_volume"
        self.columns = [
            "symbol",
            "bar",
            "timestamp",
            "date_time",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "volCcy",
            "volCCyQuote",
            "volume_ma",
            "volume_std",
            "volume_threshold",
            "huge_volume",
            "volume_ratio",
            "spike_intensity",
            "close_80_percentile",
            "close_20_percentile",
            "price_high",
            "price_low",
            "volume_price_spike",
            "create_time",
        ]
        self.db_manager = DBData(db_url, self.table_name, self.columns)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_empty(df) -> bool:
        """Return True (and log a warning) when there is nothing to insert."""
        if df is None or df.empty:
            logging.warning("DataFrame为空,无需写入数据库。")
            return True
        return False

    @staticmethod
    def _to_timestamp(value):
        """Normalize a time bound to the value stored in the ``timestamp`` column.

        * non-string values are returned unchanged;
        * all-digit strings are converted with ``int``;
        * any other string is validated with ``check_date_time_format`` and
          converted with ``datetime_to_timestamp``.

        :raises ValueError: when ``check_date_time_format`` rejects the string.
            The exception carries the original input so callers can log it.
        """
        if not isinstance(value, str):
            return value
        if value.isdigit():
            return int(value)
        formatted = check_date_time_format(value)
        if formatted is None:
            raise ValueError(value)
        return datetime_to_timestamp(formatted)

    @staticmethod
    def _add_symbol_bar_filters(conditions: list, params: dict, symbol, bar) -> None:
        """Append ``symbol`` / ``bar`` equality filters for the values that are set."""
        if symbol:
            conditions.append("symbol = :symbol")
            params["symbol"] = symbol
        if bar:
            conditions.append("bar = :bar")
            params["bar"] = bar

    def _add_time_filters(self, conditions: list, params: dict, start, end) -> bool:
        """Append ``timestamp >= :start`` / ``timestamp <= :end`` filters.

        Falsy bounds are skipped, matching the behavior of the public methods
        that use this helper.

        :return: False when a bound is a malformed date-time string (a warning
            naming the bad input is logged), True otherwise.
        """
        for name, value, operator in (("start", start, ">="), ("end", end, "<=")):
            if not value:
                continue
            try:
                params[name] = self._to_timestamp(value)
            except ValueError:
                # Log the caller's input, not the None returned by the validator.
                logging.warning(f"日期时间格式错误: {value}")
                return False
            conditions.append(f"timestamp {operator} :{name}")
        return True

    @staticmethod
    def _select(conditions: list, order_by: str) -> str:
        """Build ``SELECT * FROM crypto_huge_volume WHERE ... ORDER BY ...``."""
        where_clause = " AND ".join(conditions)
        return f"""
            SELECT * FROM crypto_huge_volume
            WHERE {where_clause}
            ORDER BY {order_by}
        """

    # ------------------------------------------------------------------
    # Inserts (thin wrappers around the DBData manager)
    # ------------------------------------------------------------------
    def insert_data_to_mysql(self, df: pd.DataFrame):
        """Insert ``df`` through ``DBData.insert_data_to_mysql``.

        The original author recommends this variant for small and medium
        volumes (under roughly 100k rows). Empty or missing frames are
        skipped with a warning.

        :param df: huge-volume rows to store.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql(df)

    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
        """Insert ``df`` through ``DBData.insert_data_to_mysql_fast``.

        Described by the original author as an ``executemany`` batch insert
        for medium volumes. Empty or missing frames are skipped with a warning.

        :param df: huge-volume rows to store.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_fast(df)

    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
        """Insert ``df`` through ``DBData.insert_data_to_mysql_chunk``.

        Described by the original author as the low-memory choice for large
        volumes (over roughly 100k rows). Empty or missing frames are skipped
        with a warning.

        :param df: huge-volume rows to store.
        :param chunk_size: chunk size forwarded to the manager.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)

    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
        """Insert ``df`` through ``DBData.insert_data_to_mysql_simple``.

        NOTE(review): the original docstring says this path uses ``to_sql``
        directly and raises on duplicate keys, so callers must handle that
        themselves -- confirm against ``DBData``.

        :param df: huge-volume rows to store.
        """
        if self._is_empty(df):
            return
        self.db_manager.insert_data_to_mysql_simple(df)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def query_latest_data(self, symbol: str, bar: str):
        """Return the row with the greatest ``timestamp`` for ``symbol``/``bar``.

        :param symbol: trading pair.
        :param bar: K-line period.
        """
        sql = """
            SELECT * FROM crypto_huge_volume
            WHERE symbol = :symbol AND bar = :bar
            ORDER BY timestamp DESC
            LIMIT 1
        """
        condition_dict = {"symbol": symbol, "bar": bar}
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def query_data_by_symbol_bar_timestamp(self, symbol: str, bar: str, timestamp: int):
        """Return the row stored for exactly ``symbol``/``bar``/``timestamp``.

        :param symbol: trading pair.
        :param bar: K-line period.
        :param timestamp: value of the ``timestamp`` column to match.
        """
        sql = """
            SELECT * FROM crypto_huge_volume
            WHERE symbol = :symbol AND bar = :bar AND timestamp = :timestamp
        """
        condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp}
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def query_huge_volume_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None):
        """Return all rows for ``symbol``/``bar``, oldest first, optionally bounded in time.

        ``start`` and ``end`` may each be a timestamp (int or digit string) or
        a date-time string. Either bound may be given on its own; when both
        are given in the wrong order they are swapped.

        :param symbol: trading pair.
        :param bar: K-line period.
        :param start: inclusive lower bound, or None for no lower bound.
        :param end: inclusive upper bound, or None for no upper bound.
        :return: the manager's multi-row result, or None when a bound is a
            malformed date-time string.
        """
        try:
            start_ts = None if start is None else self._to_timestamp(start)
            end_ts = None if end is None else self._to_timestamp(end)
        except ValueError as err:
            logging.warning(f"日期时间格式错误: {err}")
            return None

        conditions = ["symbol = :symbol", "bar = :bar"]
        condition_dict = {"symbol": symbol, "bar": bar}
        # A single bound must still restrict the query; previously it was
        # ignored unless both bounds were supplied.
        if start_ts is not None and end_ts is not None:
            if start_ts > end_ts:
                start_ts, end_ts = end_ts, start_ts
            conditions.append("timestamp BETWEEN :start AND :end")
            condition_dict["start"] = start_ts
            condition_dict["end"] = end_ts
        elif start_ts is not None:
            conditions.append("timestamp >= :start")
            condition_dict["start"] = start_ts
        elif end_ts is not None:
            conditions.append("timestamp <= :end")
            condition_dict["end"] = end_ts

        sql = self._select(conditions, "timestamp ASC")
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def query_huge_volume_records(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Return rows flagged ``huge_volume = 1``, newest first.

        :param symbol: trading pair filter (optional).
        :param bar: K-line period filter (optional).
        :param start: inclusive lower time bound (optional).
        :param end: inclusive upper time bound (optional).
        :return: the manager's multi-row result, or None when a bound is a
            malformed date-time string.
        """
        conditions = ["huge_volume = 1"]
        condition_dict = {}
        self._add_symbol_bar_filters(conditions, condition_dict, symbol, bar)
        if not self._add_time_filters(conditions, condition_dict, start, end):
            return None
        sql = self._select(conditions, "timestamp DESC")
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def query_volume_price_spike_records(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Return rows flagged ``volume_price_spike = 1``, newest first.

        :param symbol: trading pair filter (optional).
        :param bar: K-line period filter (optional).
        :param start: inclusive lower time bound (optional).
        :param end: inclusive upper time bound (optional).
        :return: the manager's multi-row result, or None when a bound is a
            malformed date-time string.
        """
        conditions = ["volume_price_spike = 1"]
        condition_dict = {}
        self._add_symbol_bar_filters(conditions, condition_dict, symbol, bar)
        if not self._add_time_filters(conditions, condition_dict, start, end):
            return None
        sql = self._select(conditions, "timestamp DESC")
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def get_statistics_summary(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Return one aggregate row (counts, averages, maxima) over the matching rows.

        :param symbol: trading pair filter (optional).
        :param bar: K-line period filter (optional).
        :param start: inclusive lower time bound (optional).
        :param end: inclusive upper time bound (optional).
        :return: the manager's single-row result, or None when a bound is a
            malformed date-time string.
        """
        conditions = []
        condition_dict = {}
        self._add_symbol_bar_filters(conditions, condition_dict, symbol, bar)
        if not self._add_time_filters(conditions, condition_dict, start, end):
            return None

        # "1=1" keeps the WHERE clause valid when no filter was requested.
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        sql = f"""
            SELECT
                COUNT(*) as total_records,
                SUM(huge_volume) as huge_volume_count,
                SUM(volume_price_spike) as volume_price_spike_count,
                SUM(price_high) as price_high_count,
                SUM(price_low) as price_low_count,
                AVG(volume_ratio) as avg_volume_ratio,
                MAX(volume_ratio) as max_volume_ratio,
                AVG(spike_intensity) as avg_spike_intensity,
                MAX(spike_intensity) as max_spike_intensity
            FROM crypto_huge_volume
            WHERE {where_clause}
        """
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def get_top_volume_spikes(self, symbol: str = None, bar: str = None, limit: int = 10):
        """Return the ``limit`` huge-volume rows with the highest ``volume_ratio``.

        :param symbol: trading pair filter (optional).
        :param bar: K-line period filter (optional).
        :param limit: maximum number of rows; coerced to ``int`` so the bound
            value is always rendered as a number in the LIMIT clause.
        """
        conditions = ["huge_volume = 1"]
        condition_dict = {}
        self._add_symbol_bar_filters(conditions, condition_dict, symbol, bar)
        where_clause = " AND ".join(conditions)
        sql = f"""
            SELECT * FROM crypto_huge_volume
            WHERE {where_clause}
            ORDER BY volume_ratio DESC
            LIMIT :limit
        """
        condition_dict["limit"] = int(limit)
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)
b/core/db_statistics_data.py deleted file mode 100644 index e69de29..0000000 diff --git a/core/statistics.py b/core/huge_volume.py similarity index 85% rename from core/statistics.py rename to core/huge_volume.py index 657480c..976beec 100644 --- a/core/statistics.py +++ b/core/huge_volume.py @@ -1,25 +1,25 @@ -from core.db_manager import query_market_data_by_symbol_bar from pandas import DataFrame import logging import os import re import pandas as pd +from datetime import datetime logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -class Statistics: +class HugeVolume: def __init__(self, output_folder: str = "./output"): self.output_folder = output_folder os.makedirs(self.output_folder, exist_ok=True) - def detect_volume_spike( + def detect_huge_volume( self, data: DataFrame, + window_size: int = 50, threshold: float = 2.0, - window: int = 50, check_price: bool = False, only_output_huge_volume: bool = False, output_excel: bool = False, @@ -52,8 +52,8 @@ class Statistics: data = data.sort_values(by="timestamp", ascending=True).copy() # 计算移动窗口的成交量均值和标准差 - data["volume_ma"] = data["volume"].rolling(window=window, min_periods=1).mean() - data["volume_std"] = data["volume"].rolling(window=window, min_periods=1).std() + data["volume_ma"] = data["volume"].rolling(window=window_size, min_periods=1).mean() + data["volume_std"] = data["volume"].rolling(window=window_size, min_periods=1).std() # 计算成交量阈值(均值 + threshold倍标准差) data["volume_threshold"] = data["volume_ma"] + threshold * data["volume_std"] @@ -75,10 +75,10 @@ class Statistics: # 计算移动窗口的收盘价分位数 data["close_80_percentile"] = ( - data["close"].rolling(window=window, min_periods=1).quantile(0.8) + data["close"].rolling(window=window_size, min_periods=1).quantile(0.8) ) data["close_20_percentile"] = ( - data["close"].rolling(window=window, min_periods=1).quantile(0.2) + data["close"].rolling(window=window_size, min_periods=1).quantile(0.2) ) # 检查收盘价是否在80%分位数及以上或20%分位数及以下 @@ 
class HugeVolumeMain:
    """Drive huge-volume detection: read market data, score it, store new rows.

    Market data comes from ``DBMarketData``, scoring is done by
    ``HugeVolume.detect_huge_volume`` and results are written through
    ``DBHugeVolumeData``. Symbols and bars are taken from ``MarketDataMain``.
    """

    # Length of one bar in seconds. "1M" and "3M" use 30 and 90 days.
    _BAR_SECONDS = {
        "1s": 1,
        "1m": 60,
        "3m": 180,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "1H": 3600,
        "2H": 7200,
        "4H": 14400,
        "6H": 21600,
        "12H": 43200,
        "1D": 86400,
        "2D": 172800,
        "3D": 259200,
        "1W": 604800,
        "1M": 2592000,
        "3M": 7776000,
    }

    def __init__(self, window_size: int = 50, threshold: float = 2.0):
        """Build the database URL from ``MYSQL_CONFIG`` and create the collaborators.

        :param window_size: rolling-window length passed to the detector.
        :param threshold: standard-deviation multiplier passed to the detector.
        :raises ValueError: when no MySQL password is configured.
        """
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")

        self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
        self.huge_volume = HugeVolume()
        self.db_market_data = DBMarketData(self.db_url)
        self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.monitor_main = MarketDataMain()
        self.window_size = window_size
        self.threshold = threshold

    def batch_initial_detect_volume_spike(self, start: str = None):
        """Run the initial (non-update) detection for every configured symbol/bar.

        :param start: start date-time; defaults to the configured
            ``volume_monitor.initial_date``.
        """
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        for symbol in self.monitor_main.symbols:
            for bar in self.monitor_main.bars:
                data = self.detect_volume_spike(
                    symbol, bar, start, only_output_huge_volume=True, is_update=False
                )
                if data is not None and len(data) > 0:
                    logging.info(f"此次初始化巨量交易数据: {len(data)}条")
                else:
                    logging.info("此次初始化巨量交易数据为空")

    def _drop_existing_records(self, symbol: str, bar: str, data: pd.DataFrame) -> pd.DataFrame:
        """Return ``data`` without the rows already present in the huge-volume table."""
        # Plain ints keep numpy scalars out of the SQL bind parameters.
        candidates = [int(ts) for ts in data["timestamp"]]
        existing = {
            ts
            for ts in candidates
            if self.db_huge_volume_data.query_data_by_symbol_bar_timestamp(symbol, bar, ts)
            is not None
        }
        if not existing:
            return data
        return data[~data["timestamp"].isin(existing)]

    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        start: str = "2025-05-01 00:00:00",
        end: str = None,
        only_output_huge_volume: bool = False,
        is_update: bool = False,
        min_timestamp: int = None,
    ):
        """Score market data for one symbol/bar and insert the resulting rows.

        :param symbol: trading pair.
        :param bar: K-line period.
        :param start: start of the market-data range; None uses the configured
            ``volume_monitor.initial_date``.
        :param end: end of the market-data range; None means "now".
        :param only_output_huge_volume: forwarded to the detector.
        :param is_update: when True, rows that already exist in the
            huge-volume table are dropped before inserting.
        :param min_timestamp: when given, only rows with a strictly greater
            ``timestamp`` are kept. The update path uses it to discard the
            warm-up rows that were loaded only to fill the rolling window.
        :return: the rows that were inserted (possibly an empty DataFrame), or
            None when no market data was available.
        """
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logging.info(f"开始处理巨量交易数据: {symbol} {bar} {start} {end}")

        data = self.db_market_data.query_market_data_by_symbol_bar(
            symbol, bar, start, end
        )
        if data is None:
            logging.warning(f"获取行情数据失败: {symbol} {bar} {start} {end}")
            return None
        if len(data) == 0:
            logging.warning(f"获取行情数据为空: {symbol} {bar} {start} {end}")
            return None
        if isinstance(data, list):
            data = pd.DataFrame(data)
        elif isinstance(data, dict):
            data = pd.DataFrame([data])

        data = self.huge_volume.detect_huge_volume(
            data=data,
            window_size=self.window_size,
            threshold=self.threshold,
            check_price=True,
            only_output_huge_volume=only_output_huge_volume,
            output_excel=True,
        )
        if data is None:
            return None

        if min_timestamp is not None:
            # Warm-up rows are scored on a partially filled window, so they
            # can be flagged spuriously; they must never reach the table.
            data = data[data["timestamp"] > int(min_timestamp)]
        if is_update and len(data) > 0:
            data = self._drop_existing_records(symbol, bar, data)

        if len(data) > 0:
            self.db_huge_volume_data.insert_data_to_mysql(data)
        else:
            logging.warning(f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}")
        return data

    def batch_update_volume_spike(self):
        """Run ``update_volume_spike`` for every configured symbol/bar."""
        for symbol in self.monitor_main.symbols:
            for bar in self.monitor_main.bars:
                self.update_volume_spike(symbol, bar)

    def update_volume_spike(self, symbol: str, bar: str):
        """Refresh market data, then score and store only rows newer than the last stored one.

        The detection range starts ``window_size - 1`` bars before the latest
        stored huge-volume row so the rolling window is filled; those warm-up
        rows are excluded from the insert. Failures are logged, not raised, so
        one symbol/bar cannot abort a batch run.

        :param symbol: trading pair.
        :param bar: K-line period.
        """
        try:
            self.monitor_main.update_data(symbol, bar)
            latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
                symbol, bar
            )
            if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
                # Nothing stored yet: start=None selects the configured
                # initial date, the same as the initial batch run.
                self.detect_volume_spike(
                    symbol, bar, start=None, only_output_huge_volume=True
                )
                return

            latest_timestamp = int(latest_huge_volume_data["timestamp"])
            # The stored timestamp is treated as milliseconds, hence * 1000.
            warmup_ms = (self.window_size - 1) * self.get_seconds_by_bar(bar) * 1000
            earliest_date_time = timestamp_to_datetime(latest_timestamp - warmup_ms)

            data = self.detect_volume_spike(
                symbol=symbol,
                bar=bar,
                start=earliest_date_time,
                only_output_huge_volume=True,
                is_update=True,
                min_timestamp=latest_timestamp,
            )
            logging.info(
                f"更新巨量交易数据: {symbol} {bar} from {earliest_date_time}"
            )
            if data is not None and len(data) > 0:
                logging.info(f"此次更新巨量交易数据: {len(data)}条")
            else:
                logging.info("此次更新巨量交易数据为空")
        except Exception as e:
            # Top-level boundary of a batch job: keep the traceback in the log.
            logging.exception(f"更新巨量交易数据失败: {symbol} {bar} {e}")

    def get_seconds_by_bar(self, bar: str):
        """Return the length of ``bar`` in seconds.

        Supported bars: 1s/1m/3m/5m/15m/30m/1H/2H/4H/6H/12H/1D/2D/3D/1W/1M/3M.

        :param bar: K-line period.
        :return: number of seconds in one bar.
        :raises ValueError: when ``bar`` is not supported.
        """
        try:
            return self._BAR_SECONDS[bar]
        except (KeyError, TypeError):
            raise ValueError(f"不支持的bar: {bar}") from None
["XCH-USDT"] ) - self.intervals = MONITOR_CONFIG.get("volume_monitor", {}).get( - "intervals", ["5m", "15m", "1H", "4H", "1D"] + self.bars = MONITOR_CONFIG.get("volume_monitor", {}).get( + "bars", ["5m", "15m", "1H", "4H", "1D"] ) self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-07-01 00:00:00" @@ -47,26 +47,28 @@ class MonitorMain: 初始化数据 """ for symbol in self.symbols: - for interval in self.intervals: - latest_data = self.db_market_data.query_latest_data(symbol, interval) + for bar in self.bars: + logging.info(f"开始初始化行情数据: {symbol} {bar}") + latest_data = self.db_market_data.query_latest_data(symbol, bar) if latest_data: logging.info( - f"已初始化{symbol}, {interval} 最新数据,请使用update_data()更新数据" + f"已初始化{symbol}, {bar} 最新行情数据,请使用update_data()更新行情数据" ) continue - self.fetch_save_data(symbol, interval, self.initial_date) + self.fetch_save_data(symbol, bar, self.initial_date) - def fetch_save_data(self, symbol: str, interval: str, start: str): + def fetch_save_data(self, symbol: str, bar: str, start: str): """ 获取保存数据 """ - data = self.data_monitor.get_historical_kline_data( - symbol=symbol, start=start, bar=interval + data = self.market_data_monitor.get_historical_kline_data( + symbol=symbol, start=start, bar=bar ) if data is not None and len(data) > 0: self.db_market_data.insert_data_to_mysql(data) + return data - def update_data(self): + def batch_update_data(self): """ 更新数据 1. 获取最新数据 @@ -75,22 +77,29 @@ class MonitorMain: 4. 
将最新数据保存到数据库 """ for symbol in self.symbols: - for interval in self.intervals: - latest_data = self.db_market_data.query_latest_data(symbol, interval) - if not latest_data: - self.fetch_save_data(symbol, interval, self.initial_date) - continue - else: - latest_timestamp = latest_data.get("timestamp") - if latest_timestamp: - latest_timestamp = int(latest_timestamp) - else: - logging.warning(f"获取{symbol}, {interval} 最新数据失败") - continue - self.fetch_save_data(symbol, interval, latest_timestamp + 1) + for bar in self.bars: + self.update_data(symbol, bar) + + def update_data(self, symbol: str, bar: str): + """ + 更新数据 + """ + logging.info(f"开始更新行情数据: {symbol} {bar}") + latest_data = self.db_market_data.query_latest_data(symbol, bar) + if not latest_data: + data = self.fetch_save_data(symbol, bar, self.initial_date) + else: + latest_timestamp = latest_data.get("timestamp") + if latest_timestamp: + latest_timestamp = int(latest_timestamp) + else: + logging.warning(f"获取{symbol}, {bar} 最新数据失败") + return + data = self.fetch_save_data(symbol, bar, latest_timestamp + 1) + return data if __name__ == "__main__": - monitor_main = MonitorMain() - monitor_main.update_data() - # monitor_main.initial_data() + market_data_main = MarketDataMain() + market_data_main.batch_update_data() + # market_data_main.initial_data() diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql index ba6f3e8..61bffa1 100644 --- a/sql/query/sql_playground.sql +++ b/sql/query/sql_playground.sql @@ -1,3 +1,6 @@ select * from crypto_market_data -WHERE symbol='XCH-USDT-SWAP' and bar='5m' and date_time > '2025-07-26' +WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-07-26' order by timestamp desc; + +SET SQL_SAFE_UPDATES = 0; +delete from crypto_market_data where create_time is NULL; \ No newline at end of file diff --git a/sql/table/crypto_huge_volume.sql b/sql/table/crypto_huge_volume.sql new file mode 100644 index 0000000..ecc89bc --- /dev/null +++ b/sql/table/crypto_huge_volume.sql 
@@ -0,0 +1,40 @@ +CREATE TABLE IF NOT EXISTS crypto_huge_volume ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + symbol VARCHAR(50) NOT NULL COMMENT '交易对', + bar VARCHAR(20) NOT NULL COMMENT 'K线周期', + timestamp BIGINT NOT NULL COMMENT '时间戳', + date_time VARCHAR(50) NOT NULL COMMENT '日期时间', + open DECIMAL(20,5) NOT NULL COMMENT '开盘价', + high DECIMAL(20,5) NOT NULL COMMENT '最高价', + low DECIMAL(20,5) NOT NULL COMMENT '最低价', + close DECIMAL(20,5) NOT NULL COMMENT '收盘价', + volume DECIMAL(30,8) NOT NULL COMMENT '交易量', + volCcy DECIMAL(30,8) NOT NULL COMMENT '交易量(基础货币)', + volCCyQuote DECIMAL(30,8) NOT NULL COMMENT '交易量(计价货币)', + volume_ma DECIMAL(30,8) NOT NULL COMMENT '交易量移动平均', + volume_std DECIMAL(30,8) NOT NULL COMMENT '交易量标准差', + volume_threshold DECIMAL(30,8) NOT NULL COMMENT '交易量阈值', + huge_volume TINYINT NOT NULL DEFAULT 0 COMMENT '是否为巨量(0:否,1:是)', + volume_ratio DECIMAL(20,8) NOT NULL COMMENT '交易量比率', + spike_intensity DECIMAL(20,8) NOT NULL COMMENT '尖峰强度', + close_80_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价80%分位数', + close_20_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价20%分位数', + price_high TINYINT NOT NULL DEFAULT 0 COMMENT '价格是否达到高点(0:否,1:是)', + price_low TINYINT NOT NULL DEFAULT 0 COMMENT '价格是否达到低点(0:否,1:是)', + volume_price_spike TINYINT NOT NULL DEFAULT 0 COMMENT '是否出现量价尖峰(0:否,1:是)', + create_time VARCHAR(50) NOT NULL COMMENT '创建时间', + UNIQUE KEY uniq_symbol_bar_timestamp (symbol, bar, timestamp), + INDEX idx_symbol_bar (symbol, bar), + INDEX idx_timestamp (timestamp), + INDEX idx_huge_volume (huge_volume), + INDEX idx_volume_price_spike (volume_price_spike), + INDEX idx_date_time (date_time) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='加密货币巨量交易数据表'; + +-- 添加注释说明 +-- 该表用于存储加密货币市场K线数据以及相关的巨量交易分析指标 +-- 主要功能: +-- 1. 存储基础K线数据(价格、成交量等) +-- 2. 计算并存储巨量交易相关指标 +-- 3. 识别价格和成交量的异常波动 +-- 4. 
为交易策略提供数据支持 diff --git a/statistics_main.py b/statistics_main.py deleted file mode 100644 index 4f24c3a..0000000 --- a/statistics_main.py +++ /dev/null @@ -1,77 +0,0 @@ -from core.statistics import Statistics -from core.db_market_data import DBMarketData -from monitor_main import MonitorMain -import logging -from config import MONITOR_CONFIG, MYSQL_CONFIG -from datetime import datetime -import pandas as pd - -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - - -class StatisticsMain: - def __init__(self): - mysql_user = MYSQL_CONFIG.get("user", "xch") - mysql_password = MYSQL_CONFIG.get("password", "") - if not mysql_password: - raise ValueError("MySQL password is not set") - mysql_host = MYSQL_CONFIG.get("host", "localhost") - mysql_port = MYSQL_CONFIG.get("port", 3306) - mysql_database = MYSQL_CONFIG.get("database", "okx") - - self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" - self.statistics = Statistics() - self.db_market_data = DBMarketData(self.db_url) - self.monitor_main = MonitorMain() - - def batch_detect_volume_spike(self, start: str, end: str): - pass - - def detect_volume_spike( - self, - symbol: str = "XCH-USDT", - bar: str = "5m", - start: str = "2025-05-01 00:00:00", - end: str = None, - only_output_huge_volume: bool = False, - ): - if start is None: - start = MONITOR_CONFIG.get("volume_monitor", {}).get( - "initial_date", "2025-05-01 00:00:00" - ) - if end is None: - end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data = self.db_market_data.query_market_data_by_symbol_bar(symbol, bar, start, end) - if data is None: - logging.warning(f"获取数据失败: {symbol} {bar} {start} {end}") - return None - else: - if len(data) == 0: - logging.warning(f"获取数据为空: {symbol} {bar} {start} {end}") - return None - else: - if isinstance(data, list): - data = pd.DataFrame(data) - elif isinstance(data, dict): - data = pd.DataFrame([data]) - return 
self.statistics.detect_volume_spike( - data=data, - check_price=True, - only_output_huge_volume=only_output_huge_volume, - output_excel=True, - ) - - def update_volume_spike(self): - self.monitor_main.update_data() - - -if __name__ == "__main__": - statistics_main = StatisticsMain() - statistics_main.detect_volume_spike( - symbol="XCH-USDT", - bar="5m", - start="2025-05-01 00:00:00", - only_output_huge_volume=True, - )