# crypto_quant/huge_volume_main.py
#
# Note: some source viewers warn that this file contains ambiguous Unicode
# characters (characters that can be confused with others). This is presumably
# caused by the Chinese text used in log messages and WeChat report strings.

from core.biz.huge_volume import HugeVolume
from core.biz.huge_volume_chart import HugeVolumeChart
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
from market_data_main import MarketDataMain
from core.wechat import Wechat
import logging
from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from datetime import datetime, timedelta
import pandas as pd
import os
import re
# Root logger: INFO and above, rendered as "<time> - <level> - <message>".
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
class HugeVolumeMain:
    """Detect, store, analyse and report huge-volume bars.

    Market data is read through ``DBMarketData``, spikes are detected with
    ``HugeVolume`` and stored/queried through ``DBHugeVolumeData``. The symbols
    and bars iterated by the batch methods come from ``MarketDataMain``.
    """

    # Length of one bar in seconds. "1M" and "3M" are approximated as 30 and
    # 90 days respectively.
    _BAR_SECONDS = {
        "1s": 1,
        "1m": 60,
        "3m": 180,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "1H": 3600,
        "2H": 7200,
        "4H": 14400,
        "6H": 21600,
        "12H": 43200,
        "1D": 86400,
        "2D": 172800,
        "3D": 259200,
        "1W": 604800,
        "1M": 2592000,
        "3M": 7776000,
    }

    # Start time used when neither the caller nor MONITOR_CONFIG provides one.
    _FALLBACK_START = "2025-05-01 00:00:00"

    # Window sizes used when WINDOW_SIZE["window_sizes"] is missing, not a
    # list, or empty.
    _FALLBACK_WINDOW_SIZES = (50, 80, 100, 120)

    # Columns (in this order) handed to HugeVolume.next_periods_rise_or_fall.
    _STATISTICS_COLUMNS = [
        "symbol",
        "bar",
        "window_size",
        "timestamp",
        "date_time",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "huge_volume",
        "volume_ratio",
        "volume_80_20_price_spike",
        "price_80_high",
        "price_20_low",
        "volume_90_10_price_spike",
        "price_90_high",
        "price_10_low",
    ]

    def __init__(self, threshold: float = 2.0):
        """Build the DB accessors from MYSQL_CONFIG and create the output folder.

        :param threshold: passed as ``threshold`` to ``HugeVolume.detect_huge_volume``.
        :raises ValueError: if MYSQL_CONFIG has no password.
        """
        from urllib.parse import quote

        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")
        # Percent-encode the credentials so reserved characters such as "@",
        # ":" or "/" in the password cannot corrupt the connection URL.
        escaped_user = quote(str(mysql_user), safe="")
        escaped_password = quote(str(mysql_password), safe="")
        self.db_url = (
            f"mysql+pymysql://{escaped_user}:{escaped_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.huge_volume = HugeVolume()
        self.db_market_data = DBMarketData(self.db_url)
        self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.market_data_main = MarketDataMain()
        self.threshold = threshold
        self.output_folder = "./output/huge_volume_statistics/"
        os.makedirs(self.output_folder, exist_ok=True)

    @classmethod
    def _default_start(cls) -> str:
        """Return volume_monitor.initial_date from MONITOR_CONFIG, or the fallback start."""
        return MONITOR_CONFIG.get("volume_monitor", {}).get(
            "initial_date", cls._FALLBACK_START
        )

    @staticmethod
    def _now_str() -> str:
        """Return the current local time formatted as "%Y-%m-%d %H:%M:%S"."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _to_dataframe(data):
        """Normalise a DB query result to a DataFrame.

        A list becomes one row per element, a single dict becomes a one-row
        frame, and anything else (e.g. an existing DataFrame) is returned as is.
        """
        if isinstance(data, list):
            return pd.DataFrame(data)
        if isinstance(data, dict):
            return pd.DataFrame([data])
        return data

    @classmethod
    def _window_sizes(cls) -> list:
        """Return WINDOW_SIZE["window_sizes"] if it is a non-empty list, else the fallback."""
        window_sizes = WINDOW_SIZE.get("window_sizes", None)
        if isinstance(window_sizes, list) and len(window_sizes) > 0:
            return window_sizes
        return list(cls._FALLBACK_WINDOW_SIZES)

    def batch_initial_detect_volume_spike(
        self, window_size: int = 50, start: str = None
    ):
        """Run ``detect_volume_spike`` for every symbol/bar of MarketDataMain.

        :param window_size: rolling-window length passed to the detector.
        :param start: first date_time to process; None means
            volume_monitor.initial_date from MONITOR_CONFIG.
        """
        if start is None:
            start = self._default_start()
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                data = self.detect_volume_spike(
                    symbol,
                    bar,
                    window_size,
                    start,
                    only_output_huge_volume=False,
                    is_update=False,
                )
                if data is not None and len(data) > 0:
                    logging.info(f"此次初始化巨量交易数据: {len(data)}")
                else:
                    logging.info("此次初始化巨量交易数据为空")

    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        window_size: int = 50,
        start: str = "2025-05-01 00:00:00",
        end: str = None,
        only_output_huge_volume: bool = False,
        is_update: bool = False,
    ):
        """Detect volume spikes for one symbol/bar between ``start`` and ``end`` and store them.

        Market data from ``DBMarketData.query_market_data_by_symbol_bar`` is
        passed to ``HugeVolume.detect_huge_volume`` (with ``check_price=True``).
        The result is inserted with ``DBHugeVolumeData.insert_data_to_mysql``.

        :param start: first date_time; None means volume_monitor.initial_date
            from MONITOR_CONFIG.
        :param end: last date_time; None means now.
        :param only_output_huge_volume: forwarded to the detector.
        :param is_update: when True, rows whose timestamp is already stored for
            (symbol, bar, window_size) are dropped before insertion.
        :return: the rows handed to the database (possibly empty), or None when
            no market data was found or the detector returned None.
        """
        if start is None:
            start = self._default_start()
        if end is None:
            end = self._now_str()
        scope = f"{symbol} {bar} 窗口大小: {window_size}, {start} ~ {end}"
        logging.info(f"开始处理巨量交易数据: {scope}")
        data = self.db_market_data.query_market_data_by_symbol_bar(
            symbol, bar, start, end
        )
        if data is None:
            logging.warning(f"获取行情数据失败: {scope}")
            return None
        if len(data) == 0:
            logging.warning(f"获取行情数据为空: {scope}")
            return None
        data = self.huge_volume.detect_huge_volume(
            data=self._to_dataframe(data),
            window_size=window_size,
            threshold=self.threshold,
            check_price=True,
            only_output_huge_volume=only_output_huge_volume,
            output_excel=False,
        )
        if data is None:
            return None
        if is_update:
            data = self._drop_existing_rows(data, symbol, bar, window_size)
        if len(data) > 0:
            self.db_huge_volume_data.insert_data_to_mysql(data)
        else:
            logging.warning(
                f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}"
            )
        return data

    def _drop_existing_rows(self, data, symbol: str, bar: str, window_size: int):
        """Return ``data`` without rows whose timestamp is already stored for (symbol, bar, window_size)."""
        existing_timestamps = set()
        # One lookup per row; rows are removed in a single pass afterwards
        # instead of re-filtering the frame for every match.
        for timestamp in data["timestamp"].tolist():
            existing = self.db_huge_volume_data.query_data_by_symbol_bar_window_size_timestamp(
                symbol, bar, window_size, timestamp
            )
            if existing is not None:
                existing_timestamps.add(existing["timestamp"])
        if not existing_timestamps:
            return data
        return data[~data["timestamp"].isin(existing_timestamps)]

    def batch_update_volume_spike(self, window_size: int = 50):
        """Run ``update_volume_spike`` for every symbol/bar of MarketDataMain."""
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                self.update_volume_spike(symbol, bar, window_size)

    def update_volume_spike(self, symbol: str, bar: str, window_size: int = 50):
        """Refresh market data for symbol/bar and detect spikes for the new bars.

        If nothing is stored yet for (symbol, bar, window_size), a full detection
        is run. Otherwise detection restarts shortly before the latest stored
        record with ``is_update=True``. Any exception is logged, not raised.
        """
        # Bound before the try so the error log below can always reference it,
        # even when the failure happens before it is computed.
        earliest_date_time = None
        try:
            self.market_data_main.update_data(symbol, bar)
            latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
                symbol, bar, window_size
            )
            if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
                # Nothing stored yet: run a full detection for this same window size.
                self.detect_volume_spike(
                    symbol,
                    bar,
                    window_size=window_size,
                    only_output_huge_volume=False,
                )
                return
            earliest_date_time = latest_huge_volume_data["date_time"]
            seconds = self.get_seconds_by_bar(bar)
            # Timestamps are in milliseconds. Restart (window_size - 1) bars
            # before the latest stored record, presumably so the rolling window
            # of the first re-evaluated bar is fully populated.
            earliest_timestamp = latest_huge_volume_data["timestamp"] - (
                (window_size - 1) * seconds * 1000
            )
            earliest_date_time = timestamp_to_datetime(earliest_timestamp)
            data = self.detect_volume_spike(
                symbol=symbol,
                bar=bar,
                window_size=window_size,
                start=earliest_date_time,
                only_output_huge_volume=False,
                is_update=True,
            )
            logging.info(
                f"更新巨量交易数据: {symbol} {bar} 窗口大小: {window_size}, "
                f"{earliest_date_time} ~ {self._now_str()}"
            )
            if data is not None and len(data) > 0:
                logging.info(f"此次更新巨量交易数据: {len(data)}")
            else:
                logging.info("此次更新巨量交易数据为空")
        except Exception as e:
            logging.error(
                f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size}, "
                f"{earliest_date_time} ~ {self._now_str()}: {e}"
            )

    def get_seconds_by_bar(self, bar: str):
        """Return the length of one ``bar`` period in seconds.

        Supported bars: 1s/1m/3m/5m/15m/30m/1H/2H/4H/6H/12H/1D/2D/3D/1W/1M/3M
        ("1M" and "3M" count as 30 and 90 days).

        :param bar: bar period string.
        :return: number of seconds.
        :raises ValueError: if ``bar`` is not supported.
        """
        seconds = self._BAR_SECONDS.get(bar)
        if seconds is None:
            raise ValueError(f"不支持的bar: {bar}")
        return seconds

    def next_periods_rise_or_fall(
        self,
        symbol: str,
        bar: str,
        window_size: int = 50,
        start: str = None,
        end: str = None,
        periods: list = None,
    ):
        """Compute rise/fall statistics for the periods following each huge-volume record.

        :param periods: look-ahead period counts; None means ``[3, 5]``.
        :return: ``(huge_volume_data, result_data)`` as returned by
            ``HugeVolume.next_periods_rise_or_fall``, or None when no
            huge-volume data exists for the range.
        """
        if periods is None:
            periods = [3, 5]
        if start is None:
            start = self._default_start()
        if end is None:
            end = self._now_str()
        periods_text = ", ".join(str(period) for period in periods)
        scope = f"{symbol} {bar} 窗口大小: {window_size}, {start} ~ {end}"
        logging.info(
            f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {scope}"
        )
        volume_statistics_data = (
            self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
                symbol, bar, window_size, start, end
            )
        )
        if volume_statistics_data is None or len(volume_statistics_data) == 0:
            logging.warning(f"获取巨量交易数据为空: {scope}")
            return None
        volume_statistics_data = self._to_dataframe(volume_statistics_data)
        volume_statistics_data = volume_statistics_data.sort_values(
            by="timestamp", ascending=True
        )
        volume_statistics_data["window_size"] = window_size
        volume_statistics_data = volume_statistics_data[
            self._STATISTICS_COLUMNS
        ].reset_index(drop=True)
        huge_volume_data, result_data = self.huge_volume.next_periods_rise_or_fall(
            data=volume_statistics_data, window_size=window_size, periods=periods
        )
        return huge_volume_data, result_data

    @staticmethod
    def _price_position_text(row) -> str:
        """Label the price-quantile flag set on ``row``.

        Flags are checked in order 90% high, 80% high, 10% low, 20% low. The
        first one equal to 1 wins; "" is returned when none is set.
        """
        if row["price_90_high"] == 1:
            return "90%分位数高点"
        if row["price_80_high"] == 1:
            return "80%分位数高点"
        if row["price_10_low"] == 1:
            return "10%分位数低点"
        if row["price_20_low"] == 1:
            return "20%分位数低点"
        return ""

    @staticmethod
    def _format_record_lines(row, price_position_text: str) -> list:
        """Render one huge-volume record as the markdown lines of a WeChat message."""
        open_price = str(round(row["open"], 6))
        high = str(round(row["high"], 6))
        low = str(round(row["low"], 6))
        close = str(round(row["close"], 6))
        volume = str(round(row["volume"], 6))
        vol_ccy_quote = str(round(row["volCCyQuote"], 6))
        volume_ratio = str(round(row["volume_ratio"], 6))
        return [
            f"交易周期: {row['bar']}, 滑动窗口: {row['window_size']} , 发生时间: {row['date_time']}",
            f"开盘价: {open_price}, 最高价: {high}, 最低价: {low}, 收盘价: {close}",
            f"成交量: {volume}, 成交量USDT: {vol_ccy_quote}, 交易量比率: {volume_ratio}",
            f"价格分位: {price_position_text}",
            "--------------------------------",
        ]

    def send_huge_volume_data_to_wechat(self, start: str = None, end: str = None):
        """Send the strongest huge-volume records between ``start`` and ``end`` to WeChat.

        Only records are kept that satisfy all of:
        - ``huge_volume == 1``;
        - ``price_90_high == 1`` or ``price_10_low == 1``;
        - ``volume_ratio > 10``.
        One markdown message is sent per symbol.

        :param start: range start; None means volume_monitor.initial_date from MONITOR_CONFIG.
        :param end: range end; None means now.
        """
        if start is None:
            start = self._default_start()
        if end is None:
            end = self._now_str()
        start_timestamp = transform_date_time_to_timestamp(start)
        end_timestamp = transform_date_time_to_timestamp(end)
        start_date_time = timestamp_to_datetime(start_timestamp)
        end_date_time = timestamp_to_datetime(end_timestamp)
        logging.info(f"开始获取巨量交易数据: {start} ~ {end}")
        huge_volume_data = self.db_huge_volume_data.query_huge_volume_records(
            start=start_timestamp, end=end_timestamp
        )
        if huge_volume_data is None or len(huge_volume_data) == 0:
            logging.warning(f"获取巨量交易数据为空: {start} ~ {end}")
            return
        huge_volume_data = self._to_dataframe(huge_volume_data)
        huge_volume_data = huge_volume_data[
            (huge_volume_data["huge_volume"] == 1)
            & (
                (huge_volume_data["price_90_high"] == 1)
                | (huge_volume_data["price_10_low"] == 1)
            )
            & (huge_volume_data["volume_ratio"] > 10)
        ]
        huge_volume_data = huge_volume_data.sort_values(
            by=["symbol", "bar", "window_size", "timestamp"], ascending=True
        ).reset_index(drop=True)
        logging.info(f"获取巨量交易数据: {len(huge_volume_data)}")
        if len(huge_volume_data) == 0:
            return
        title = f"# 放量交易数据: {start_date_time} ~ {end_date_time}"
        wechat = Wechat()
        for symbol in sorted(huge_volume_data["symbol"].unique()):
            symbol_data = huge_volume_data[huge_volume_data["symbol"] == symbol]
            symbol_data = symbol_data.sort_values(
                by=["bar", "window_size", "timestamp"], ascending=True
            ).reset_index(drop=True)
            contents = [title, f"## 币种: {symbol}"]
            for _, row in symbol_data.iterrows():
                if row["huge_volume"] != 1:
                    continue
                price_position_text = self._price_position_text(row)
                if not price_position_text:
                    continue
                contents.extend(self._format_record_lines(row, price_position_text))
            text = "\n\n".join(contents)
            # Logged because the byte size of the markdown payload matters to
            # the WeChat sender.
            text_length = len(text.encode("utf-8"))
            logging.info(f"发送巨量交易数据到微信,字节数: {text_length}")
            wechat.send_markdown(text)

    def batch_next_periods_rise_or_fall(
        self,
        start: str = None,
        end: str = None,
        next_periods: list = None,
        output_excel: bool = False,
    ):
        """Run ``next_periods_rise_or_fall`` for every symbol/bar/window size and merge the results.

        :param next_periods: look-ahead period counts; None means ``[1, 2, 3, 5, 10]``.
        :param output_excel: also write both frames to an .xlsx file in ``self.output_folder``.
        :return: ``(total_huge_volume_data, total_result_data)``. Each is an
            empty DataFrame when nothing could be computed.
        """
        if next_periods is None:
            next_periods = [1, 2, 3, 5, 10]
        if start is None:
            start = self._default_start()
        if end is None:
            end = self._now_str()
        huge_volume_data_list = []
        result_data_list = []
        window_size_list = self._window_sizes()
        for symbol in self.market_data_main.symbols:
            for bar in self.market_data_main.bars:
                for window_size in window_size_list:
                    statistics = self.next_periods_rise_or_fall(
                        symbol, bar, window_size, start, end, next_periods
                    )
                    # None means no huge-volume data for this combination.
                    if statistics is None:
                        continue
                    huge_volume_data, result_data = statistics
                    if huge_volume_data is not None:
                        huge_volume_data_list.append(huge_volume_data)
                    if result_data is not None:
                        result_data_list.append(result_data)
        # pd.concat raises on an empty list, so fall back to empty frames.
        total_huge_volume_data = (
            pd.concat(huge_volume_data_list) if huge_volume_data_list else pd.DataFrame()
        )
        total_result_data = (
            pd.concat(result_data_list) if result_data_list else pd.DataFrame()
        )
        if output_excel:
            total_huge_volume_data = total_huge_volume_data.reset_index(drop=True)
            total_result_data = total_result_data.reset_index(drop=True)
            current_date = datetime.now().strftime("%Y%m%d%H%M%S")
            file_name = f"next_periods_rise_or_fall_{current_date}.xlsx"
            try:
                with pd.ExcelWriter(
                    os.path.join(self.output_folder, file_name)
                ) as writer:
                    total_huge_volume_data.to_excel(
                        writer, sheet_name="details", index=False
                    )
                    total_result_data.to_excel(
                        writer, sheet_name="next_periods_statistics", index=False
                    )
            except Exception as e:
                logging.error(f"导出Excel文件失败: {e}")
        return total_huge_volume_data, total_result_data

    def plot_huge_volume_data(
        self,
        data_file_path: str,
        sheet_name: str = "next_periods_statistics",
        output_folder: str = "./output/huge_volume_statistics/",
    ):
        """Load one sheet of an Excel file and plot it with HugeVolumeChart.

        Only the heatmap is requested (``include_heatmap=True``, ``include_line=False``).

        :param data_file_path: path of the Excel file to read.
        :param sheet_name: sheet to load.
        :param output_folder: created if missing. NOTE(review): it is not
            passed to HugeVolumeChart; confirm where the chart writes its files.
        """
        os.makedirs(output_folder, exist_ok=True)
        huge_volume_data = pd.read_excel(data_file_path, sheet_name=sheet_name)
        huge_volume_chart = HugeVolumeChart(huge_volume_data)
        huge_volume_chart.plot_entrance(include_heatmap=True, include_line=False)
def batch_initial_detect_volume_spike(threshold: float = 2.0):
    """Run the initial volume-spike detection once per configured window size.

    Window sizes come from ``WINDOW_SIZE["window_sizes"]``. When that entry is
    missing, not a list, or empty, ``[50, 80, 100, 120]`` is used. Every run
    starts at "2025-05-01 00:00:00".

    :param threshold: detection threshold passed to HugeVolumeMain.
    """
    configured_sizes = WINDOW_SIZE.get("window_sizes", None)
    has_sizes = isinstance(configured_sizes, list) and len(configured_sizes) > 0
    window_sizes = configured_sizes if has_sizes else [50, 80, 100, 120]
    detector = HugeVolumeMain(threshold)
    for current_size in window_sizes:
        detector.batch_initial_detect_volume_spike(
            window_size=current_size, start="2025-05-01 00:00:00"
        )
def batch_update_volume_spike(threshold: float = 2.0):
    """Incrementally update stored volume spikes once per configured window size.

    Window sizes come from ``WINDOW_SIZE["window_sizes"]``. A missing, non-list
    or empty value falls back to ``[50, 80, 100, 120]``.

    :param threshold: detection threshold passed to HugeVolumeMain.
    """
    window_sizes = WINDOW_SIZE.get("window_sizes", None)
    if not (isinstance(window_sizes, list) and window_sizes):
        window_sizes = [50, 80, 100, 120]
    updater = HugeVolumeMain(threshold)
    for size in window_sizes:
        updater.batch_update_volume_spike(window_size=size)
def test_send_huge_volume_data_to_wechat():
    """Send the huge-volume records from yesterday to today to WeChat.

    Both bounds are date-only strings ("%Y-%m-%d"). How they map to times of
    day is decided by send_huge_volume_data_to_wechat's timestamp conversion.
    """
    runner = HugeVolumeMain(threshold=2.0)
    day_format = "%Y-%m-%d"
    yesterday = (datetime.now() - timedelta(days=1)).strftime(day_format)
    logging.info(f"昨天日期: {yesterday}")
    today = datetime.now().strftime(day_format)
    logging.info(f"今天日期: {today}")
    runner.send_huge_volume_data_to_wechat(start=yesterday, end=today)
if __name__ == "__main__":
test_send_huge_volume_data_to_wechat()
# batch_initial_detect_volume_spike(threshold=2.0)
# batch_update_volume_spike(threshold=2.0)
# huge_volume_main = HugeVolumeMain(threshold=2.0)
# huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True)
# data_file_path = "./output/huge_volume_statistics/next_periods_rise_or_fall_stat_20250731200304.xlsx"
# sheet_name = "next_periods_statistics"
# output_folder = "./output/huge_volume_statistics/"
# huge_volume_main.plot_huge_volume_data(
# data_file_path=data_file_path,
# sheet_name=sheet_name,
# output_folder=output_folder,
# )