diff --git a/core/biz/__pycache__/huge_volume.cpython-312.pyc b/core/biz/__pycache__/huge_volume.cpython-312.pyc new file mode 100644 index 0000000..8f72f6f Binary files /dev/null and b/core/biz/__pycache__/huge_volume.cpython-312.pyc differ diff --git a/core/biz/__pycache__/market_data_monitor.cpython-312.pyc b/core/biz/__pycache__/market_data_monitor.cpython-312.pyc new file mode 100644 index 0000000..98427ee Binary files /dev/null and b/core/biz/__pycache__/market_data_monitor.cpython-312.pyc differ diff --git a/core/biz/__pycache__/trade_data.cpython-312.pyc b/core/biz/__pycache__/trade_data.cpython-312.pyc new file mode 100644 index 0000000..4d0e21c Binary files /dev/null and b/core/biz/__pycache__/trade_data.cpython-312.pyc differ diff --git a/core/huge_volume.py b/core/biz/huge_volume.py similarity index 53% rename from core/huge_volume.py rename to core/biz/huge_volume.py index fd5f6a2..e7ee399 100644 --- a/core/huge_volume.py +++ b/core/biz/huge_volume.py @@ -4,6 +4,7 @@ import os import re import pandas as pd from datetime import datetime +from copy import deepcopy from typing import Optional, List, Dict, Any, Tuple logging.basicConfig( @@ -175,8 +176,7 @@ class HugeVolume: self, data: pd.DataFrame, window_size: int = 50, - periods: List[int] = [3, 5], - output_excel: bool = False + periods: List[int] = [1, 2, 3, 5, 10], ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ 1. 根据period_count,计算每个timestamp的下一个periods的rise_or_fall @@ -195,9 +195,8 @@ class HugeVolume: 1000000000 100 1 103 rise 98 fall 因为之后第3个periods的close是103,所以next_3_result为rise 因为之后第5个periods的close是98,所以next_3_result为fall - - 2. 如果output_excel为True,则输出到excel - 3. 新建一个列表: result,计算huge_volume为1时,之后3或5个周期,close上涨或下跌的比例 + 2. 根据volume_ratio_percentile_10,将data分成10份,然后计算每一份的上涨与下跌次数,以及平均收益率 + 3. 新建一个列表: result,计算之后n个周期,close上涨或下跌的比例 a. 计算huge_volume为1时,且price_80_high为1时的数量,如100, 并且计算next_3_result为fall的次数,如50, 然后计算fall_ratio, 如50/100=0.5 b. 计算huge_volume为1时,且price_80_high为1时的数量,如100, 并且计算next_5_result为fall的次数,如30, 然后计算fall_ratio, 如30/100=0.3 c. 计算huge_volume为1时,且price_20_low为1时的数量,如100, 并且计算next_3_result为rise的次数,如50, 然后计算rise_ratio, 如50/100=0.5 @@ -206,21 +205,32 @@ class HugeVolume: Args: data: 包含巨量交易数据的DataFrame - periods: 计算周期列表,默认[3, 5] + periods: 计算周期列表,默认[1, 2, 3, 5, 10] output_excel: 是否输出到Excel文件,默认False Returns: Tuple[pd.DataFrame, pd.DataFrame]: (处理后的数据, 统计结果) """ + # 将huge_volume, volume_80_20_price_spike, volume_90_10_price_spike, + # price_80_high, price_20_low, price_90_high, price_10_low设置为整型 + data["huge_volume"] = data["huge_volume"].astype(int) + data["volume_80_20_price_spike"] = data["volume_80_20_price_spike"].astype(int) + data["volume_90_10_price_spike"] = data["volume_90_10_price_spike"].astype(int) + data["price_80_high"] = data["price_80_high"].astype(int) + data["price_20_low"] = data["price_20_low"].astype(int) + data["price_90_high"] = data["price_90_high"].astype(int) + data["price_10_low"] = data["price_10_low"].astype(int) + data = data.sort_values(by="timestamp", ascending=True) data = data.reset_index(drop=True) - + # 计算未来价格变化 for period in periods: data[f"next_{period}_close"] = data["close"].shift(-period) - data[f"next_{period}_result"] = ( - data[f"next_{period}_close"] / data["close"] - 1 + data[f"next_{period}_change"] = ( + (data[f"next_{period}_close"] / data["close"] - 1) * 100 ) - data[f"next_{period}_result"] = data[f"next_{period}_result"].apply( + # 添加一列next_{period}_result,如果next_{period}_change大于0,则值为rise,如果next_{period}_change小于0,则值为fall,如果next_{period}_change等于0,则值为draw + data[f"next_{period}_result"] = data[f"next_{period}_change"].apply( lambda x: ( "rise" if pd.notna(x) and x > 0 @@ -231,68 +241,126 @@ class HugeVolume: ) ) ) - - # 过滤data, 只获取huge_volume为1,且价格处于分位数位置的行 - price_conditions = [] - if "price_80_high" in data.columns: - price_conditions.append(data["price_80_high"] == 1) - if "price_20_low" in data.columns: - price_conditions.append(data["price_20_low"] == 1) - if "price_90_high" in data.columns: - price_conditions.append(data["price_90_high"] == 1) - if "price_10_low" in data.columns: - price_conditions.append(data["price_10_low"] == 1) - - if price_conditions: - combined_condition = data["huge_volume"] == 1 - for condition in price_conditions: - combined_condition = combined_condition | condition - data = data[combined_condition] - - data = data.reset_index(drop=True) - - # 统计各种分位数情况的数量 - price_stats = {} - for price_type in ["price_80_high", "price_20_low", "price_90_high", "price_10_low"]: - if price_type in data.columns: - price_stats[price_type] = len(data[(data["huge_volume"] == 1) & (data[price_type] == 1)]) - + + # 将volume_ratio按照百分位分成十份 + huge_volume_data = deepcopy(data[data["huge_volume"] == 1]) + huge_volume_data = huge_volume_data.sort_values(by="timestamp", ascending=True) + huge_volume_data = huge_volume_data.reset_index(drop=True) + huge_volume_data["volume_ratio_percentile"] = huge_volume_data["volume_ratio"].rank(pct=True) + huge_volume_data["volume_ratio_percentile_10"] = huge_volume_data["volume_ratio_percentile"].apply( + lambda x: 10 if x <= 0.1 else 20 + if x <= 0.2 else 30 if x <= 0.3 else 40 + if x <= 0.4 else 50 if x <= 0.5 else 60 + if x <= 0.6 else 70 if x <= 0.7 else 80 + if x <= 0.8 else 90 if x <= 0.9 else 100 + ) + huge_volume_ratio_percentile_10_mean = huge_volume_data.groupby("volume_ratio_percentile_10")["volume_ratio"].mean() + percentile_10_mean = round(float(huge_volume_data["volume_ratio"].mean()), 4) + # insert one row to huge_volume_ratio_percentile_10_mean, key is -1, value is percentile_10_mean + huge_volume_ratio_percentile_10_mean.loc["-1"] = percentile_10_mean + # transform huge_volume_ratio_percentile_10_mean index to int + huge_volume_ratio_percentile_10_mean.index = huge_volume_ratio_percentile_10_mean.index.astype(int) + # sort huge_volume_ratio_percentile_10_mean by index + huge_volume_ratio_percentile_10_mean = huge_volume_ratio_percentile_10_mean.sort_index() + results = [] - for period in periods: - for price_type, count in price_stats.items(): - if count > 0: - # 计算下跌次数 - fall_count = len( - data[ - (data["huge_volume"] == 1) & - (data[price_type] == 1) & - (data[f"next_{period}_result"] == "fall") - ] - ) - # 计算上涨次数 - rise_count = len( - data[ - (data["huge_volume"] == 1) & - (data[price_type] == 1) & - (data[f"next_{period}_result"] == "rise") - ] - ) - - results.append( - { - "symbol": data["symbol"].iloc[0] if len(data) > 0 else "", - "bar": data["bar"].iloc[0] if len(data) > 0 else "", - "window_size": window_size, - "huge_volume": 1, - "price_type": price_type, - "next_period": period, - "fall_count": fall_count, - "rise_count": rise_count, - "fall_ratio": fall_count / count, - "rise_ratio": rise_count / count, - "total_count": count, - } - ) + # iterate huge_volume_ratio_percentile_10_mean + for index, value in huge_volume_ratio_percentile_10_mean.items(): + if index == -1: + data_temp = deepcopy(huge_volume_data) + volume_ratio_percentile_10 = "all" + current_percentile_10_mean = percentile_10_mean + else: + data_temp = deepcopy(huge_volume_data[huge_volume_data["volume_ratio_percentile_10"] == index]) + volume_ratio_percentile_10 = str(index) + current_percentile_10_mean = round(value, 4) + + data_temp = data_temp.reset_index(drop=True) + data_temp = data_temp.sort_values(by="timestamp", ascending=True) + data_temp = data_temp.reset_index(drop=True) + + # 过滤data, 只获取huge_volume为1,且价格处于分位数位置的行 + price_conditions = [] + if "price_80_high" in data_temp.columns: + price_conditions.append(data_temp["price_80_high"] == 1) + if "price_20_low" in data_temp.columns: + price_conditions.append(data_temp["price_20_low"] == 1) + if "price_90_high" in data_temp.columns: + price_conditions.append(data_temp["price_90_high"] == 1) + if "price_10_low" in data_temp.columns: + price_conditions.append(data_temp["price_10_low"] == 1) + + if price_conditions: + combined_condition = data_temp["huge_volume"] == 1 + for condition in price_conditions: + combined_condition = combined_condition | condition + data_temp = data_temp[combined_condition] + + data_temp = data_temp.reset_index(drop=True) + + # 统计各种分位数情况的数量 + price_stats = {} + for price_type in ["price_80_high", "price_20_low", "price_90_high", "price_10_low"]: + if price_type in data.columns: + price_stats[price_type] = len(data_temp[(data_temp["huge_volume"] == 1) & (data_temp[price_type] == 1)]) + + for period in periods: + for price_type, count in price_stats.items(): + if count > 0: + # 计算下跌次数 + fall_count = len( + data_temp[ + (data_temp["huge_volume"] == 1) & + (data_temp[price_type] == 1) & + (data_temp[f"next_{period}_result"] == "fall") + ] + ) + # 计算上涨次数 + rise_count = len( + data_temp[ + (data_temp["huge_volume"] == 1) & + (data_temp[price_type] == 1) & + (data_temp[f"next_{period}_result"] == "rise") + ] + ) + + draw_count = len( + data_temp[ + (data_temp["huge_volume"] == 1) & + (data_temp[price_type] == 1) & + (data_temp[f"next_{period}_result"] == "draw") + ] + ) + # 根据data[f"next_{period}_result"]获得平均收益率 + average_return = float(data_temp[(data_temp["huge_volume"] == 1) & (data_temp[price_type] == 1)] + [f"next_{period}_change"].mean()) + max_return = float(data_temp[(data_temp["huge_volume"] == 1) & (data_temp[price_type] == 1)] + [f"next_{period}_change"].max()) + min_return = float(data_temp[(data_temp["huge_volume"] == 1) & (data_temp[price_type] == 1)] + [f"next_{period}_change"].min()) + + results.append( + { + "symbol": data_temp["symbol"].iloc[0] if len(data_temp) > 0 else "", + "bar": data_temp["bar"].iloc[0] if len(data_temp) > 0 else "", + "window_size": window_size, + "huge_volume": 1, + "volume_ratio_percentile_10": volume_ratio_percentile_10, + "volume_ratio_percentile_10_mean": current_percentile_10_mean, + "price_type": price_type, + "next_period": period, + "average_return": average_return, + "max_return": max_return, + "min_return": min_return, + "rise_count": rise_count, + "rise_ratio": round((rise_count / count) * 100, 4), + "fall_count": fall_count, + "fall_ratio": round((fall_count / count) * 100, 4), + "draw_count": draw_count, + "draw_ratio": round((draw_count / count) * 100, 4), + "total_count": count, + } + ) result_data = pd.DataFrame(results) - return data, result_data + return huge_volume_data, result_data diff --git a/core/market_data_monitor.py b/core/biz/market_data_monitor.py similarity index 92% rename from core/market_data_monitor.py rename to core/biz/market_data_monitor.py index 73c0773..069ef29 100644 --- a/core/market_data_monitor.py +++ b/core/biz/market_data_monitor.py @@ -5,7 +5,7 @@ from typing import Optional import pandas as pd import okx.MarketData as Market import okx.TradingData as TradingData -from core.utils import datetime_to_timestamp +from core.utils import transform_date_time_to_timestamp logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') class MarketDataMonitor: @@ -46,18 +46,9 @@ class MarketDataMonitor: two_months_ago = datetime.now() - timedelta(days=60) start_time = int(two_months_ago.timestamp() * 1000) else: - try: - # 判断是否就是timestamp整型数据 - if isinstance(start, int): - start_time = start - # 判断是否为纯数字(UTC毫秒级timestamp) - elif start.isdigit(): - start_time = int(start) - else: - # 按北京时间字符串处理,转换为毫秒级timestamp - start_time = datetime_to_timestamp(start) - except Exception as e: - logging.error(f"start参数解析失败: {e}") + start_time = transform_date_time_to_timestamp(start) + if start_time is None: + logging.error(f"start参数解析失败: {start}") return None columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"] all_data = [] diff --git a/core/quant_trader.py b/core/biz/quant_trader.py similarity index 100% rename from core/quant_trader.py rename to core/biz/quant_trader.py diff --git a/core/strategy.py b/core/biz/strategy.py similarity index 99% rename from core/strategy.py rename to core/biz/strategy.py index 547cd71..fc4305f 100644 --- a/core/strategy.py +++ b/core/biz/strategy.py @@ -3,7 +3,7 @@ from datetime import datetime import logging from typing import Optional import pandas as pd -from core.base import QuantTrader +from core.biz.quant_trader import QuantTrader logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') diff --git a/core/trade_data.py b/core/biz/trade_data.py similarity index 97% rename from core/trade_data.py rename to core/biz/trade_data.py index 5d24a92..7a4fcad 100644 --- a/core/trade_data.py +++ b/core/biz/trade_data.py @@ -4,8 +4,8 @@ import logging from typing import Optional import pandas as pd import okx.MarketData as Market -from core.utils import datetime_to_timestamp, timestamp_to_datetime -from core.db_trade_data import DBTradeData +from core.utils import timestamp_to_datetime +from core.db.db_trade_data import DBTradeData logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') diff --git a/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc new file mode 100644 index 0000000..8669d47 Binary files /dev/null and b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_manager.cpython-312.pyc b/core/db/__pycache__/db_manager.cpython-312.pyc new file mode 100644 index 0000000..8339bb8 Binary files /dev/null and b/core/db/__pycache__/db_manager.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_market_data.cpython-312.pyc b/core/db/__pycache__/db_market_data.cpython-312.pyc new file mode 100644 index 0000000..0fb8c83 Binary files /dev/null and b/core/db/__pycache__/db_market_data.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_trade_data.cpython-312.pyc b/core/db/__pycache__/db_trade_data.cpython-312.pyc new file mode 100644 index 0000000..956f79e Binary files /dev/null and b/core/db/__pycache__/db_trade_data.cpython-312.pyc differ diff --git a/core/db_huge_volume_data.py b/core/db/db_huge_volume_data.py similarity index 96% rename from core/db_huge_volume_data.py rename to core/db/db_huge_volume_data.py index 1e6e56b..abaa2da 100644 --- a/core/db_huge_volume_data.py +++ b/core/db/db_huge_volume_data.py @@ -1,8 +1,8 @@ import pandas as pd import logging from typing import Optional, List, Dict, Any, Union -from core.db_manager import DBData -from core.utils import check_date_time_format, datetime_to_timestamp +from core.db.db_manager import DBData +from core.utils import transform_date_time_to_timestamp logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") @@ -55,21 +55,10 @@ class DBHugeVolumeData: """ if time_param is None: return None - - if isinstance(time_param, int): - return time_param - - if isinstance(time_param, str): - if time_param.isdigit(): - return int(time_param) - else: - parsed_time = check_date_time_format(time_param) - if parsed_time is None: - logging.warning(f"日期时间格式错误: {time_param}") - return None - return datetime_to_timestamp(parsed_time) - - return None + time_param = transform_date_time_to_timestamp(time_param) + if time_param is None: + return None + return time_param def _build_query_conditions( self, diff --git a/core/db_manager.py b/core/db/db_manager.py similarity index 98% rename from core/db_manager.py rename to core/db/db_manager.py index c1af412..7447b5f 100644 --- a/core/db_manager.py +++ b/core/db/db_manager.py @@ -2,11 +2,7 @@ import pandas as pd from sqlalchemy import create_engine, exc, text import re, datetime import logging -from core.utils import ( - transform_data_type, - datetime_to_timestamp, - check_date_time_format, -) +from core.utils import transform_data_type logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") diff --git a/core/db_market_data.py b/core/db/db_market_data.py similarity index 81% rename from core/db_market_data.py rename to core/db/db_market_data.py index 7f7742f..ac0b364 100644 --- a/core/db_market_data.py +++ b/core/db/db_market_data.py @@ -1,7 +1,7 @@ import pandas as pd import logging -from core.db_manager import DBData -from core.utils import check_date_time_format, datetime_to_timestamp +from core.db.db_manager import DBData +from core.utils import transform_date_time_to_timestamp logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") @@ -119,34 +119,13 @@ class DBMarketData: condition_dict = {"symbol": symbol, "bar": bar} else: if start is not None: - if isinstance(start, str): - if start.isdigit(): - start = int(start) - else: - start = check_date_time_format(start) - # 判断是否是日期时间格式 - if start is None: - logging.warning(f"日期时间格式错误: {start}") - return None - start = datetime_to_timestamp(start) - elif isinstance(start, int): - start = int(start) - else: + start = transform_date_time_to_timestamp(start) + if start is None: logging.warning(f"开始时间格式错误: {start}") return None if end is not None: - if isinstance(end, str): - if end.isdigit(): - end = int(end) - else: - end = check_date_time_format(end) - if end is None: - logging.warning(f"日期时间格式错误: {end}") - return None - end = datetime_to_timestamp(end) - elif isinstance(end, int): - end = int(end) - else: + end = transform_date_time_to_timestamp(end) + if end is None: logging.warning(f"结束时间格式错误: {end}") return None if start is not None and end is not None: diff --git a/core/db_trade_data.py b/core/db/db_trade_data.py similarity index 95% rename from core/db_trade_data.py rename to core/db/db_trade_data.py index 9d278e8..ead917f 100644 --- a/core/db_trade_data.py +++ b/core/db/db_trade_data.py @@ -1,8 +1,8 @@ import pandas as pd import logging from typing import Optional, List, Dict, Any, Union -from core.db_manager import DBData -from core.utils import check_date_time_format, datetime_to_timestamp +from core.db.db_manager import DBData +from core.utils import transform_date_time_to_timestamp logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") @@ -34,21 +34,10 @@ class DBTradeData: """ if time_param is None: return None - - if isinstance(time_param, int): - return time_param - - if isinstance(time_param, str): - if time_param.isdigit(): - return int(time_param) - else: - parsed_time = check_date_time_format(time_param) - if parsed_time is None: - logging.warning(f"日期时间格式错误: {time_param}") - return None - return datetime_to_timestamp(parsed_time) - - return None + time_param = transform_date_time_to_timestamp(time_param) + if time_param is None: + return None + return time_param def _build_query_conditions( self, diff --git a/core/utils.py b/core/utils.py index 8b3bd21..919a1c5 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,6 +1,9 @@ from datetime import datetime, timezone, timedelta from decimal import Decimal import re +import logging + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") def datetime_to_timestamp(date_str: str) -> int: """ @@ -31,7 +34,7 @@ def transform_data_type(data: dict): return data -def check_date_time_format(date_time: str) -> bool: +def check_date_time_format(date_time: str) -> str | None: """ 检查日期时间格式是否正确 """ @@ -42,4 +45,27 @@ def check_date_time_format(date_time: str) -> bool: elif re.match(r'^\d{4}\d{2}\d{2}$', date_time): return f"{date_time[0:4]}-{date_time[4:6]}-{date_time[6:8]} 00:00:00" else: + return None + +def transform_date_time_to_timestamp(date_time: int | str): + """ + 将日期时间转换为毫秒级timestamp + """ + try: + # 判断是否就是timestamp整型数据 + if isinstance(date_time, int): + date_time = date_time + # 判断是否为纯数字(UTC毫秒级timestamp) + elif date_time.isdigit(): + date_time = int(date_time) + else: + date_time = check_date_time_format(date_time) + if date_time is None: + logging.error(f"日期时间格式错误: {date_time}") + return None + # 按北京时间字符串处理,转换为毫秒级timestamp + date_time = datetime_to_timestamp(date_time) + return date_time + except Exception as e: + logging.error(f"start参数解析失败: {e}") return None \ No newline at end of file diff --git a/DB_HUGE_VOLUME_UPDATE_SUMMARY.md b/doc/DB_HUGE_VOLUME_UPDATE_SUMMARY.md similarity index 100% rename from DB_HUGE_VOLUME_UPDATE_SUMMARY.md rename to doc/DB_HUGE_VOLUME_UPDATE_SUMMARY.md diff --git a/DB_TRADE_DATA_SUMMARY.md b/doc/DB_TRADE_DATA_SUMMARY.md similarity index 100% rename from DB_TRADE_DATA_SUMMARY.md rename to doc/DB_TRADE_DATA_SUMMARY.md diff --git a/HUGE_VOLUME_UPDATE_SUMMARY.md b/doc/HUGE_VOLUME_UPDATE_SUMMARY.md similarity index 100% rename from HUGE_VOLUME_UPDATE_SUMMARY.md rename to doc/HUGE_VOLUME_UPDATE_SUMMARY.md diff --git a/huge_volume_main.py b/huge_volume_main.py index c744bf4..6479ee5 100644 --- a/huge_volume_main.py +++ b/huge_volume_main.py @@ -1,7 +1,7 @@ -from core.huge_volume import HugeVolume -from core.db_market_data import DBMarketData -from core.db_huge_volume_data import DBHugeVolumeData -from core.utils import timestamp_to_datetime +from core.biz.huge_volume import HugeVolume +from core.db.db_market_data import DBMarketData +from core.db.db_huge_volume_data import DBHugeVolumeData +from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp from market_data_main import MarketDataMain import logging from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE @@ -43,7 +43,7 @@ class HugeVolumeMain: "initial_date", "2025-05-01 00:00:00" ) data = self.detect_volume_spike( - symbol, bar, window_size, start, only_output_huge_volume=True, is_update=False + symbol, bar, window_size, start, only_output_huge_volume=False, is_update=False ) if data is not None and len(data) > 0: logging.info(f"此次初始化巨量交易数据: {len(data)}条") @@ -88,7 +88,7 @@ class HugeVolumeMain: threshold=self.threshold, check_price=True, only_output_huge_volume=only_output_huge_volume, - output_excel=True, + output_excel=False, ) if data is not None: if is_update: @@ -125,7 +125,7 @@ class HugeVolumeMain: symbol, bar, window_size ) if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0: - self.detect_volume_spike(symbol, bar, only_output_huge_volume=True) + self.detect_volume_spike(symbol, bar, only_output_huge_volume=False) return else: earliest_date_time = latest_huge_volume_data["date_time"] @@ -141,7 +141,7 @@ class HugeVolumeMain: bar=bar, window_size=window_size, start=earliest_date_time, - only_output_huge_volume=True, + only_output_huge_volume=False, is_update=True, ) logging.info( @@ -206,7 +206,6 @@ class HugeVolumeMain: start: str = None, end: str = None, periods: list = [3, 5], - output_excel: bool = False, ): if start is None: start = MONITOR_CONFIG.get("volume_monitor", {}).get( @@ -214,78 +213,29 @@ class HugeVolumeMain: ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - logging.info(f"开始计算巨量出现后,之后3或5个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}") - huge_volume_data = ( + periods_text = ", ".join([str(period) for period in periods]) + logging.info(f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}") + volume_statistics_data = ( self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size( symbol, bar, window_size, start, end ) ) - if huge_volume_data is None or len(huge_volume_data) == 0: + if volume_statistics_data is None or len(volume_statistics_data) == 0: logging.warning(f"获取巨量交易数据为空: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}") return None else: - if isinstance(huge_volume_data, list): - huge_volume_data = pd.DataFrame(huge_volume_data) - elif isinstance(huge_volume_data, dict): - huge_volume_data = pd.DataFrame([huge_volume_data]) - market_data = self.db_market_data.query_market_data_by_symbol_bar( - symbol, bar, start, end - ) - if market_data is None or len(market_data) == 0: - logging.warning(f"获取行情数据为空: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}") - return None - else: - if isinstance(market_data, list): - market_data = pd.DataFrame(market_data) - elif isinstance(market_data, dict): - market_data = pd.DataFrame([market_data]) + if isinstance(volume_statistics_data, list): + volume_statistics_data = pd.DataFrame(volume_statistics_data) + elif isinstance(volume_statistics_data, dict): + volume_statistics_data = pd.DataFrame([volume_statistics_data]) if ( - huge_volume_data is not None - and len(huge_volume_data) > 0 - and market_data is not None - and len(market_data) > 0 + volume_statistics_data is not None + and len(volume_statistics_data) > 0 ): - # 将huge_volume_data和market_data合并 - # market_data移除id列 - market_data = market_data.drop(columns=["id"]) - # huge_volume_data移除id列 - huge_volume_data = huge_volume_data.drop(columns=["id"]) - data = pd.merge(market_data, huge_volume_data, on="timestamp", how="left") - # 同名的列,只是后缀为_x和_y,需要合并 - data = data.rename( - columns={ - "symbol_x": "symbol", - "bar_x": "bar", - "date_time_x": "date_time", - "open_x": "open", - "high_x": "high", - "low_x": "low", - "close_x": "close", - "volume_x": "volume", - "volCcy_x": "volCcy", - "volCCyQuote_x": "volCCyQuote", - "create_time_x": "create_time", - } - ) - data = data.drop( - columns=[ - "symbol_y", - "bar_y", - "date_time_y", - "open_y", - "high_y", - "low_y", - "close_y", - "volume_y", - "volCcy_y", - "volCCyQuote_y", - "create_time_y", - ] - ) # 根据timestamp排序 - data = data.sort_values(by="timestamp", ascending=True) - data["window_size"] = window_size - data = data[ + volume_statistics_data = volume_statistics_data.sort_values(by="timestamp", ascending=True) + volume_statistics_data["window_size"] = window_size + volume_statistics_data = volume_statistics_data[ [ "symbol", "bar", @@ -299,24 +249,25 @@ class HugeVolumeMain: "volume", "huge_volume", "volume_ratio", - "volume_price_spike", - "price_high", - "price_low", + "volume_80_20_price_spike", + "price_80_high", + "price_20_low", + "volume_90_10_price_spike", + "price_90_high", + "price_10_low", ] ] - data = data.dropna() - data = data.reset_index(drop=True) - data, result_data = self.huge_volume.next_periods_rise_or_fall( - data=data, window_size=window_size, periods=periods, output_excel=output_excel + volume_statistics_data = volume_statistics_data.reset_index(drop=True) + huge_volume_data, result_data = self.huge_volume.next_periods_rise_or_fall( + data=volume_statistics_data, window_size=window_size, periods=periods ) - return data, result_data + return huge_volume_data, result_data def batch_next_periods_rise_or_fall( self, - window_size: int = 50, start: str = None, end: str = None, - periods: list = [3, 5], + next_periods: list = [1, 2, 3, 5, 10], output_excel: bool = False, ): if start is None: @@ -325,20 +276,25 @@ class HugeVolumeMain: ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data_list = [] + huge_volume_data_list = [] result_data_list = [] + window_size_list = WINDOW_SIZE.get("window_sizes", None) + if window_size_list is None or not isinstance(window_size_list, list) or len(window_size_list) == 0: + window_size_list = [50, 80, 100, 120] + for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: - data, result_data = self.next_periods_rise_or_fall( - symbol, bar, window_size, start, end, periods, output_excel - ) - data_list.append(data) - result_data_list.append(result_data) - data = pd.concat(data_list) - result_data = pd.concat(result_data_list) + for window_size in window_size_list: + huge_volume_data, result_data = self.next_periods_rise_or_fall( + symbol, bar, window_size, start, end, next_periods + ) + huge_volume_data_list.append(huge_volume_data) + result_data_list.append(result_data) + total_huge_volume_data = pd.concat(huge_volume_data_list) + total_result_data = pd.concat(result_data_list) if output_excel: - data = data.reset_index(drop=True) - result_data = result_data.reset_index(drop=True) + total_huge_volume_data = total_huge_volume_data.reset_index(drop=True) + total_result_data = total_result_data.reset_index(drop=True) current_date = datetime.now().strftime("%Y%m%d%H%M%S") file_name = ( f"next_periods_rise_or_fall_{current_date}.xlsx" @@ -347,13 +303,13 @@ class HugeVolumeMain: with pd.ExcelWriter( os.path.join(self.output_folder, file_name) ) as writer: - data.to_excel(writer, sheet_name="details", index=False) - result_data.to_excel( + total_huge_volume_data.to_excel(writer, sheet_name="details", index=False) + total_result_data.to_excel( writer, sheet_name="next_periods_statistics", index=False ) except Exception as e: logging.error(f"导出Excel文件失败: {e}") - return data, result_data + return total_huge_volume_data, total_result_data def batch_initial_detect_volume_spike(threshold: float = 2.0): @@ -379,4 +335,6 @@ def batch_update_volume_spike(threshold: float = 2.0): if __name__ == "__main__": # batch_initial_detect_volume_spike(threshold=2.0) - batch_update_volume_spike(threshold=2.0) + # batch_update_volume_spike(threshold=2.0) + huge_volume_main = HugeVolumeMain(threshold=2.0) + huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True) diff --git a/market_data_main.py b/market_data_main.py index f63d93b..904acf2 100644 --- a/market_data_main.py +++ b/market_data_main.py @@ -1,9 +1,9 @@ import logging from datetime import datetime from time import sleep -from core.market_data_monitor import MarketDataMonitor -from core.db_market_data import DBMarketData -from core.utils import datetime_to_timestamp, timestamp_to_datetime +from core.biz.market_data_monitor import MarketDataMonitor +from core.db.db_market_data import DBMarketData +from core.utils import datetime_to_timestamp, timestamp_to_datetime, transform_date_time_to_timestamp from trade_data_main import TradeDataMain from config import ( API_KEY, @@ -72,16 +72,15 @@ class MarketDataMain: 获取保存数据 """ end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - end_time_ts = datetime_to_timestamp(end_time) - if isinstance(start, str): - if start.isdigit(): - start_time_ts = int(start) - else: - start_time_ts = datetime_to_timestamp(start) - elif isinstance(start, int): - start_time_ts = start - else: - raise ValueError(f"开始时间格式错误: {start}") + end_time_ts = transform_date_time_to_timestamp(end_time) + if end_time_ts is None: + logging.error(f"结束时间格式错误: {end_time}") + return None + + start_time_ts = transform_date_time_to_timestamp(start) + if start_time_ts is None: + logging.error(f"开始时间格式错误: {start}") + return None # 如果bar为5m, 15m: # end_time_ts与start_time_ts相差超过1天,则按照1天为单位 @@ -91,39 +90,26 @@ class MarketDataMain: # 获取数据,直到end_time_ts threshold = None if bar in ["5m", "15m"]: - threshold = 86400000 - 1 + threshold = 86400000 elif bar in ["1H", "4H"]: - threshold = 432000000 - 1 + threshold = 432000000 elif bar == "1D": - threshold = 864000000 - 1 + threshold = 864000000 while start_time_ts < end_time_ts: - current_end_time_ts = start_time_ts + threshold - if current_end_time_ts >= end_time_ts: - current_end_time_ts = end_time_ts - start_date_time = timestamp_to_datetime(start_time_ts) - end_date_time = timestamp_to_datetime(current_end_time_ts) + current_start_time_ts = end_time_ts - threshold + if current_start_time_ts < start_time_ts: + current_start_time_ts = start_time_ts + start_date_time = timestamp_to_datetime(current_start_time_ts) + end_date_time = timestamp_to_datetime(end_time_ts) logging.info( f"获取行情数据: {symbol} {bar} 从 {start_date_time} 到 {end_date_time}" ) - # 首先判断是否存在 > current_end_time_ts + 1 的数据 - # 如果存在,则跳过此次循环 - data = self.db_market_data.query_market_data_by_symbol_bar( - symbol=symbol, - bar=bar, - start=start_time_ts, - end=current_end_time_ts, - ) - if data is not None and len(data) > 0: - logging.info(f"已存在{symbol}, {bar} 从 {start_date_time} 到 {end_date_time} 的数据,跳过此次循环") - start_time_ts = current_end_time_ts - continue - # current_end_time_ts + 1, 目的是为了避免缺少最后一条数据 data = self.market_data_monitor.get_historical_kline_data( symbol=symbol, - start=start_time_ts, + start=current_start_time_ts, bar=bar, - end_time=current_end_time_ts + 1, + end_time=end_time_ts, ) if data is not None and len(data) > 0: data["buy_sz"] = -1 @@ -133,9 +119,8 @@ class MarketDataMain: # 比特币的数据获取过慢,暂时不获取交易数据 # if not symbol.endswith("-SWAP"): # # trade_data的end_time需要比market_data的end_time大一个周期 - # trade_end_time_ts = current_end_time_ts + BAR_THRESHOLD[bar] + 1 # trade_data = self.trade_data_main.get_trade_data( - # symbol=symbol, start_time=start_time_ts, end_time=trade_end_time_ts + # symbol=symbol, start_time=current_start_time_ts, end_time=end_time_ts # ) # for index, row in data.iterrows(): # try: @@ -180,8 +165,9 @@ class MarketDataMain: ] ] self.db_market_data.insert_data_to_mysql(data) - - start_time_ts = current_end_time_ts + if current_start_time_ts == start_time_ts: + break + end_time_ts = current_start_time_ts return data def batch_update_data(self): @@ -203,11 +189,14 @@ class MarketDataMain: logging.info(f"开始更新行情数据: {symbol} {bar}") latest_data = self.db_market_data.query_latest_data(symbol, bar) if not latest_data: + logging.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据") data = self.fetch_save_data(symbol, bar, self.initial_date) else: latest_timestamp = latest_data.get("timestamp") if latest_timestamp: latest_timestamp = int(latest_timestamp) + latest_date_time = timestamp_to_datetime(latest_timestamp) + logging.info(f"{symbol}, {bar} 上次获取的最新数据时间: {latest_date_time}") else: logging.warning(f"获取{symbol}, {bar} 最新数据失败") return @@ -217,5 +206,5 @@ class MarketDataMain: if __name__ == "__main__": market_data_main = MarketDataMain() - # market_data_main.batch_update_data() - market_data_main.initial_data() + market_data_main.batch_update_data() + # market_data_main.initial_data() diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql index 60c5878..558dfac 100644 --- a/sql/query/sql_playground.sql +++ b/sql/query/sql_playground.sql @@ -1,6 +1,6 @@ select * from crypto_market_data -WHERE symbol='DOGE-USDT-SWAP' and bar='1D' #and date_time > '2025-07-01' -order by timestamp; +WHERE symbol='XCH-USDT-SWAP' and bar='5m' #and date_time > '2025-07-01' +order by timestamp desc; delete FROM crypto_market_data where symbol != 'XCH-USDT'; diff --git a/sql/table/crypto_huge_volume.sql b/sql/table/crypto_huge_volume.sql index b3192ae..63db711 100644 --- a/sql/table/crypto_huge_volume.sql +++ b/sql/table/crypto_huge_volume.sql @@ -12,12 +12,12 @@ CREATE TABLE IF NOT EXISTS crypto_huge_volume ( volume DECIMAL(30,8) NOT NULL COMMENT '交易量', volCcy DECIMAL(30,8) NOT NULL COMMENT '交易量(基础货币)', volCCyQuote DECIMAL(30,8) NOT NULL COMMENT '交易量(计价货币)', - volume_ma DECIMAL(30,8) NOT NULL COMMENT '交易量移动平均', - volume_std DECIMAL(30,8) NOT NULL COMMENT '交易量标准差', - volume_threshold DECIMAL(30,8) NOT NULL COMMENT '交易量阈值', + volume_ma DECIMAL(30,8) NULL COMMENT '交易量移动平均', + volume_std DECIMAL(30,8) NULL COMMENT '交易量标准差', + volume_threshold DECIMAL(30,8) NULL COMMENT '交易量阈值', huge_volume TINYINT NOT NULL DEFAULT 0 COMMENT '是否为巨量(0:否,1:是)', - volume_ratio DECIMAL(20,8) NOT NULL COMMENT '交易量比率', - spike_intensity DECIMAL(20,8) NOT NULL COMMENT '尖峰强度', + volume_ratio DECIMAL(20,8) NULL COMMENT '交易量比率', + spike_intensity DECIMAL(20,8) NULL COMMENT '尖峰强度', close_80_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价80%分位数', close_20_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价20%分位数', price_80_high TINYINT NOT NULL DEFAULT 0 COMMENT '价格是否达到80%分位数高点(0:否,1:是)', diff --git a/test_db_huge_volume.py b/test_db_huge_volume.py index 24ed835..1c73555 100644 --- a/test_db_huge_volume.py +++ b/test_db_huge_volume.py @@ -9,7 +9,7 @@ import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) -from core.db_huge_volume_data import DBHugeVolumeData +from core.db.db_huge_volume_data import DBHugeVolumeData import logging # 配置日志 diff --git a/test_db_trade_data.py b/test_db_trade_data.py index 2f74a9b..14d0136 100644 --- a/test_db_trade_data.py +++ b/test_db_trade_data.py @@ -9,7 +9,7 @@ import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) -from core.db_trade_data import DBTradeData +from core.db.db_trade_data import DBTradeData import logging # 配置日志 diff --git a/test_huge_volume.py b/test_huge_volume.py index bbe7f77..e2d8179 100644 --- a/test_huge_volume.py +++ b/test_huge_volume.py @@ -12,7 +12,7 @@ sys.path.append(os.path.dirname(os.path.abspath(__file__))) import pandas as pd import numpy as np from datetime import datetime, timedelta -from core.huge_volume import HugeVolume +from core.biz.huge_volume import HugeVolume import logging # 配置日志 diff --git a/trade_data_main.py b/trade_data_main.py index 06d6e81..c602570 100644 --- a/trade_data_main.py +++ b/trade_data_main.py @@ -2,8 +2,8 @@ import logging import time from datetime import datetime, timedelta import pandas as pd -from core.utils import datetime_to_timestamp, timestamp_to_datetime -from core.trade_data import TradeData +from core.utils import datetime_to_timestamp, timestamp_to_datetime, transform_date_time_to_timestamp +from core.biz.trade_data import TradeData from config import ( API_KEY, SECRET_KEY, @@ -44,14 +44,14 @@ class TradeDataMain: if end_time is None: end_time = int(time.time() * 1000) else: - end_time = self.transform_date_time(end_time) + end_time = transform_date_time_to_timestamp(end_time) # 处理start参数 if start_time is None: # 默认两个月前 start_time_str = MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2025-05-01 00:00:00") - start_time = self.transform_date_time(start_time_str) + start_time = transform_date_time_to_timestamp(start_time_str) else: - start_time = self.transform_date_time(start_time) + start_time = transform_date_time_to_timestamp(start_time) # 从数据库获取最早数据 earliest_data = self.trade_data.db_trade_data.query_earliest_data(symbol) db_earliest_time = None @@ -87,25 +87,6 @@ class TradeDataMain: logging.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}") return None - def transform_date_time(self, date_time: str): - """ - 将日期时间转换为毫秒级timestamp - """ - try: - # 判断是否就是timestamp整型数据 - if isinstance(date_time, int): - date_time = date_time - # 判断是否为纯数字(UTC毫秒级timestamp) - elif date_time.isdigit(): - date_time = int(date_time) - else: - # 按北京时间字符串处理,转换为毫秒级timestamp - date_time = datetime_to_timestamp(date_time) - return date_time - except Exception as e: - logging.error(f"start参数解析失败: {e}") - return None - if __name__ == "__main__": trade_data_main = TradeDataMain() diff --git a/trade_main.py b/trade_main.py index b1b654e..5fdcb81 100644 --- a/trade_main.py +++ b/trade_main.py @@ -1,7 +1,7 @@ import logging from time import sleep -from core.base import QuantTrader -from core.strategy import QuantStrategy +from core.biz.quant_trader import QuantTrader +from core.biz.strategy import QuantStrategy from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, TRADING_CONFIG, TIME_CONFIG logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')