1. Support updating huge-volume data:

a. Update market data first.
b. Starting from the first updated market-data timestamp, fetch the earlier window-size records.
c. Calculate huge-volume data from the latest market data.
d. Insert only new data into the huge-volume table.
2. Rename code files to be more appropriate.
This commit is contained in:
blade 2025-07-28 16:14:40 +08:00
parent 8a798fee03
commit c6ff3adc16
10 changed files with 679 additions and 123 deletions

View File

@ -52,15 +52,15 @@ MONITOR_CONFIG = {
"volume_monitor":{ "volume_monitor":{
"symbols": ["XCH-USDT", "BTC-USDT", "ETH-USDT", "SOL-USDT", "DOGE-USDT", "symbols": ["XCH-USDT", "BTC-USDT", "ETH-USDT", "SOL-USDT", "DOGE-USDT",
"XCH-USDT-SWAP", "BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP", "DOGE-USDT-SWAP"], "XCH-USDT-SWAP", "BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP", "DOGE-USDT-SWAP"],
"intervals": ["5m", "15m", "1H", "4H", "1D"], "bars": ["5m", "15m", "1H", "4H", "1D"],
"initial_date": "2025-05-01 00:00:00" "initial_date": "2025-05-01 00:00:00"
}, },
"price_monitor":{ "price_monitor":{
"symbols": ["XCH-USDT"], "symbols": ["XCH-USDT"],
"intervals": [ "bats": [
{"interval": "5m", "threshold": 0.025}, {"bar": "5m", "threshold": 0.025},
{"interval": "15m", "threshold": 0.5}, {"bar": "15m", "threshold": 0.5},
{"interval": "1H", "threshold": 0.1} {"bar": "1H", "threshold": 0.1}
] ]
} }
} }

378
core/db_huge_volume_data.py Normal file
View File

@ -0,0 +1,378 @@
import pandas as pd
import logging
from core.db_manager import DBData
from core.utils import check_date_time_format, datetime_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
class DBHugeVolumeData:
    """Data-access layer for the ``crypto_huge_volume`` table.

    Wraps :class:`DBData` with several insert strategies (tuned for different
    data volumes) and query helpers over the huge-volume analysis records
    produced by the volume-spike detector.
    """

    def __init__(
        self,
        db_url: str
    ):
        """
        :param db_url: SQLAlchemy-style URL of the target MySQL database
        """
        self.db_url = db_url
        self.table_name = "crypto_huge_volume"
        # Column order must match the crypto_huge_volume table definition.
        self.columns = [
            "symbol",
            "bar",
            "timestamp",
            "date_time",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "volCcy",
            "volCCyQuote",
            "volume_ma",
            "volume_std",
            "volume_threshold",
            "huge_volume",
            "volume_ratio",
            "spike_intensity",
            "close_80_percentile",
            "close_20_percentile",
            "price_high",
            "price_low",
            "volume_price_spike",
            "create_time",
        ]
        self.db_manager = DBData(db_url, self.table_name, self.columns)

    @staticmethod
    def _normalize_time(value):
        """Normalize a user-supplied time argument to an integer timestamp.

        Accepts an int (returned unchanged), a digit string (cast to int), or a
        date-time string validated via ``check_date_time_format`` and converted
        with ``datetime_to_timestamp``.

        :param value: int, digit string, date-time string, or None
        :return: integer timestamp, or None when ``value`` is None or invalid
        """
        if value is None:
            return None
        if isinstance(value, str):
            if value.isdigit():
                return int(value)
            checked = check_date_time_format(value)
            if checked is None:
                # Log the original input (the previous code logged None here,
                # because the variable had already been overwritten).
                logging.warning(f"日期时间格式错误: {value}")
                return None
            return datetime_to_timestamp(checked)
        return value

    def _build_filters(self, symbol=None, bar=None, start=None, end=None, base_conditions=None):
        """Build the WHERE clause and bind dict shared by the query helpers.

        :param base_conditions: fixed SQL predicates to prepend,
            e.g. ``["huge_volume = 1"]``
        :return: ``(where_clause, condition_dict)``, or None when a time
            argument is present but invalid
        """
        conditions = list(base_conditions) if base_conditions else []
        condition_dict = {}
        if symbol:
            conditions.append("symbol = :symbol")
            condition_dict["symbol"] = symbol
        if bar:
            conditions.append("bar = :bar")
            condition_dict["bar"] = bar
        if start:
            start = self._normalize_time(start)
            if start is None:
                return None
            conditions.append("timestamp >= :start")
            condition_dict["start"] = start
        if end:
            end = self._normalize_time(end)
            if end is None:
                return None
            conditions.append("timestamp <= :end")
            condition_dict["end"] = end
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        return where_clause, condition_dict

    def insert_data_to_mysql(self, df: pd.DataFrame):
        """Insert huge-volume rows (pandas ``to_sql``-based strategy).

        Speed: fastest. Memory: medium. Suited to small/medium volumes
        (< 100k rows).

        :param df: huge-volume data DataFrame
        """
        if df is None or df.empty:
            logging.warning("DataFrame为空无需写入数据库。")
            return
        self.db_manager.insert_data_to_mysql(df)

    def insert_data_to_mysql_fast(self, df: pd.DataFrame):
        """Insert huge-volume rows using batched ``executemany``.

        Speed: very fast. Memory: low. Suited to medium volumes.

        :param df: huge-volume data DataFrame
        """
        if df is None or df.empty:
            logging.warning("DataFrame为空无需写入数据库。")
            return
        self.db_manager.insert_data_to_mysql_fast(df)

    def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
        """Insert huge-volume rows in chunks.

        Speed: medium. Memory: lowest. Suited to large volumes (> 100k rows).

        :param df: huge-volume data DataFrame
        :param chunk_size: number of rows per chunk
        """
        if df is None or df.empty:
            logging.warning("DataFrame为空无需写入数据库。")
            return
        self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)

    def insert_data_to_mysql_simple(self, df: pd.DataFrame):
        """Insert huge-volume rows via plain ``to_sql``.

        Speed: fastest, but duplicate-key errors propagate to the caller and
        must be handled there.

        :param df: huge-volume data DataFrame
        """
        if df is None or df.empty:
            logging.warning("DataFrame为空无需写入数据库。")
            return
        self.db_manager.insert_data_to_mysql_simple(df)

    def query_latest_data(self, symbol: str, bar: str):
        """Return the most recent huge-volume row for a symbol/bar pair.

        :param symbol: trading pair, e.g. "XCH-USDT"
        :param bar: candle period, e.g. "5m"
        """
        sql = """
            SELECT * FROM crypto_huge_volume
            WHERE symbol = :symbol AND bar = :bar
            ORDER BY timestamp DESC
            LIMIT 1
            """
        condition_dict = {"symbol": symbol, "bar": bar}
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def query_data_by_symbol_bar_timestamp(self, symbol: str, bar: str, timestamp: int):
        """Return the single row matching (symbol, bar, timestamp), if any.

        :param symbol: trading pair
        :param bar: candle period
        :param timestamp: millisecond timestamp
        """
        sql = """
            SELECT * FROM crypto_huge_volume
            WHERE symbol = :symbol AND bar = :bar AND timestamp = :timestamp
            """
        condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp}
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def query_huge_volume_data_by_symbol_bar(self, symbol: str, bar: str, start: str = None, end: str = None):
        """Query huge-volume rows for a symbol/bar, optionally time-bounded.

        Bug fix: the previous version fell back to an UNBOUNDED query whenever
        only one of start/end was supplied (``if start is None or end is
        None``), leaving its single-bound branches unreachable; a single bound
        is now honoured.

        :param symbol: trading pair
        :param bar: candle period
        :param start: start time (timestamp, digit string, or date-time string)
        :param end: end time (same accepted formats)
        :return: rows ordered by timestamp ascending, or None on invalid input
        """
        if start is not None:
            start = self._normalize_time(start)
            if start is None:
                return None
        if end is not None:
            end = self._normalize_time(end)
            if end is None:
                return None
        if start is not None and end is not None and start > end:
            # Tolerate swapped bounds.
            start, end = end, start
        conditions = ["symbol = :symbol", "bar = :bar"]
        condition_dict = {"symbol": symbol, "bar": bar}
        if start is not None:
            conditions.append("timestamp >= :start")
            condition_dict["start"] = start
        if end is not None:
            conditions.append("timestamp <= :end")
            condition_dict["end"] = end
        sql = f"""
            SELECT * FROM crypto_huge_volume
            WHERE {" AND ".join(conditions)}
            ORDER BY timestamp ASC
            """
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def query_huge_volume_records(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Query rows flagged as huge volume (huge_volume = 1), newest first.

        :param symbol: trading pair (optional filter)
        :param bar: candle period (optional filter)
        :param start: start time (optional; timestamp/digit/date-time string)
        :param end: end time (optional; same formats)
        """
        filters = self._build_filters(symbol, bar, start, end, ["huge_volume = 1"])
        if filters is None:
            return None
        where_clause, condition_dict = filters
        sql = f"""
            SELECT * FROM crypto_huge_volume
            WHERE {where_clause}
            ORDER BY timestamp DESC
            """
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def query_volume_price_spike_records(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Query rows flagged as volume-price spikes (volume_price_spike = 1),
        newest first.

        :param symbol: trading pair (optional filter)
        :param bar: candle period (optional filter)
        :param start: start time (optional)
        :param end: end time (optional)
        """
        filters = self._build_filters(symbol, bar, start, end, ["volume_price_spike = 1"])
        if filters is None:
            return None
        where_clause, condition_dict = filters
        sql = f"""
            SELECT * FROM crypto_huge_volume
            WHERE {where_clause}
            ORDER BY timestamp DESC
            """
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

    def get_statistics_summary(self, symbol: str = None, bar: str = None, start: str = None, end: str = None):
        """Aggregate summary statistics over the (optionally filtered) table.

        Returns counts of huge-volume / spike / price-high / price-low rows and
        avg/max of volume_ratio and spike_intensity.

        :param symbol: trading pair (optional filter)
        :param bar: candle period (optional filter)
        :param start: start time (optional)
        :param end: end time (optional)
        """
        filters = self._build_filters(symbol, bar, start, end)
        if filters is None:
            return None
        where_clause, condition_dict = filters
        sql = f"""
            SELECT
                COUNT(*) as total_records,
                SUM(huge_volume) as huge_volume_count,
                SUM(volume_price_spike) as volume_price_spike_count,
                SUM(price_high) as price_high_count,
                SUM(price_low) as price_low_count,
                AVG(volume_ratio) as avg_volume_ratio,
                MAX(volume_ratio) as max_volume_ratio,
                AVG(spike_intensity) as avg_spike_intensity,
                MAX(spike_intensity) as max_spike_intensity
            FROM crypto_huge_volume
            WHERE {where_clause}
            """
        return self.db_manager.query_data(sql, condition_dict, return_multi=False)

    def get_top_volume_spikes(self, symbol: str = None, bar: str = None, limit: int = 10):
        """Return the huge-volume rows with the highest volume_ratio.

        :param symbol: trading pair (optional filter)
        :param bar: candle period (optional filter)
        :param limit: maximum number of rows to return
        """
        where_clause, condition_dict = self._build_filters(
            symbol, bar, base_conditions=["huge_volume = 1"]
        )
        condition_dict["limit"] = limit
        sql = f"""
            SELECT * FROM crypto_huge_volume
            WHERE {where_clause}
            ORDER BY volume_ratio DESC
            LIMIT :limit
            """
        return self.db_manager.query_data(sql, condition_dict, return_multi=True)

View File

@ -1,25 +1,25 @@
from core.db_manager import query_market_data_by_symbol_bar
from pandas import DataFrame from pandas import DataFrame
import logging import logging
import os import os
import re import re
import pandas as pd import pandas as pd
from datetime import datetime
logging.basicConfig( logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
) )
class Statistics: class HugeVolume:
def __init__(self, output_folder: str = "./output"): def __init__(self, output_folder: str = "./output"):
self.output_folder = output_folder self.output_folder = output_folder
os.makedirs(self.output_folder, exist_ok=True) os.makedirs(self.output_folder, exist_ok=True)
def detect_volume_spike( def detect_huge_volume(
self, self,
data: DataFrame, data: DataFrame,
window_size: int = 50,
threshold: float = 2.0, threshold: float = 2.0,
window: int = 50,
check_price: bool = False, check_price: bool = False,
only_output_huge_volume: bool = False, only_output_huge_volume: bool = False,
output_excel: bool = False, output_excel: bool = False,
@ -52,8 +52,8 @@ class Statistics:
data = data.sort_values(by="timestamp", ascending=True).copy() data = data.sort_values(by="timestamp", ascending=True).copy()
# 计算移动窗口的成交量均值和标准差 # 计算移动窗口的成交量均值和标准差
data["volume_ma"] = data["volume"].rolling(window=window, min_periods=1).mean() data["volume_ma"] = data["volume"].rolling(window=window_size, min_periods=1).mean()
data["volume_std"] = data["volume"].rolling(window=window, min_periods=1).std() data["volume_std"] = data["volume"].rolling(window=window_size, min_periods=1).std()
# 计算成交量阈值(均值 + threshold倍标准差 # 计算成交量阈值(均值 + threshold倍标准差
data["volume_threshold"] = data["volume_ma"] + threshold * data["volume_std"] data["volume_threshold"] = data["volume_ma"] + threshold * data["volume_std"]
@ -75,10 +75,10 @@ class Statistics:
# 计算移动窗口的收盘价分位数 # 计算移动窗口的收盘价分位数
data["close_80_percentile"] = ( data["close_80_percentile"] = (
data["close"].rolling(window=window, min_periods=1).quantile(0.8) data["close"].rolling(window=window_size, min_periods=1).quantile(0.8)
) )
data["close_20_percentile"] = ( data["close_20_percentile"] = (
data["close"].rolling(window=window, min_periods=1).quantile(0.2) data["close"].rolling(window=window_size, min_periods=1).quantile(0.2)
) )
# 检查收盘价是否在80%分位数及以上或20%分位数及以下 # 检查收盘价是否在80%分位数及以上或20%分位数及以下
@ -97,6 +97,7 @@ class Statistics:
if only_output_huge_volume: if only_output_huge_volume:
data = data[data["huge_volume"] == 1] data = data[data["huge_volume"] == 1]
data["create_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if output_excel: if output_excel:
# 检查数据是否为空 # 检查数据是否为空
@ -112,7 +113,10 @@ class Statistics:
symbol = data["symbol"].iloc[0] symbol = data["symbol"].iloc[0]
bar = data["bar"].iloc[0] bar = data["bar"].iloc[0]
file_name = f"volume_spike_{symbol}_{bar}_{start_date}_{end_date}.xlsx" file_name = f"volume_spike_{symbol}_{bar}_{start_date}_{end_date}.xlsx"
try:
with pd.ExcelWriter(os.path.join(self.output_folder, file_name)) as writer: with pd.ExcelWriter(os.path.join(self.output_folder, file_name)) as writer:
data.to_excel(writer, sheet_name="volume_spike", index=False) data.to_excel(writer, sheet_name="volume_spike", index=False)
except Exception as e:
logging.error(f"导出Excel文件失败: {e}")
return data return data

View File

@ -7,7 +7,7 @@ import okx.MarketData as Market
from core.utils import datetime_to_timestamp from core.utils import datetime_to_timestamp
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
class DataMonitor: class MarketDataMonitor:
def __init__(self, def __init__(self,
api_key: str, api_key: str,
secret_key: str, secret_key: str,

199
huge_volume_main.py Normal file
View File

@ -0,0 +1,199 @@
from core.huge_volume import HugeVolume
from core.db_market_data import DBMarketData
from core.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime
from market_data_main import MarketDataMain
import logging
from config import MONITOR_CONFIG, MYSQL_CONFIG
from datetime import datetime
import pandas as pd
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class HugeVolumeMain:
    """Orchestrates huge-volume detection.

    Refreshes market data, runs the rolling-window detector, and persists
    only the huge-volume rows that are not already stored.
    """

    # Seconds spanned by one candle of each supported OKX bar size
    # ("1M"/"3M" use 30-day months, matching the original constants).
    _BAR_SECONDS = {
        "1s": 1,
        "1m": 60,
        "3m": 180,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "1H": 3600,
        "2H": 7200,
        "4H": 14400,
        "6H": 21600,
        "12H": 43200,
        "1D": 86400,
        "2D": 172800,
        "3D": 259200,
        "1W": 604800,
        "1M": 2592000,
        "3M": 7776000,
    }

    def __init__(self, window_size: int = 50, threshold: float = 2.0):
        """
        :param window_size: rolling-window length used by the detector
        :param threshold: std-dev multiplier for the volume threshold
        :raises ValueError: when the MySQL password is not configured
        """
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")
        self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
        self.huge_volume = HugeVolume()
        self.db_market_data = DBMarketData(self.db_url)
        self.db_huge_volume_data = DBHugeVolumeData(self.db_url)
        self.monitor_main = MarketDataMain()
        self.window_size = window_size
        self.threshold = threshold

    def batch_initial_detect_volume_spike(self, start: str = None):
        """Run the initial detection for every configured symbol/bar.

        :param start: start date-time; defaults to the configured initial_date
        """
        # Resolve the default once, instead of re-checking inside the loops.
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        for symbol in self.monitor_main.symbols:
            for bar in self.monitor_main.bars:
                data = self.detect_volume_spike(
                    symbol, bar, start, only_output_huge_volume=True, is_update=False
                )
                if data is not None and len(data) > 0:
                    logging.info(f"此次初始化巨量交易数据: {len(data)}")
                else:
                    logging.info(f"此次初始化巨量交易数据为空")

    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        start: str = "2025-05-01 00:00:00",
        end: str = None,
        only_output_huge_volume: bool = False,
        is_update: bool = False,
    ):
        """Detect huge-volume candles for one symbol/bar over [start, end]
        and insert the results into the huge-volume table.

        :param symbol: trading pair
        :param bar: candle period
        :param start: start time; defaults to the configured initial_date
        :param end: end time; defaults to now
        :param only_output_huge_volume: keep only rows flagged huge_volume=1
        :param is_update: when True, rows whose timestamps already exist in
            the huge-volume table are dropped so only new rows are inserted
        :return: detected rows (DataFrame), or None when no market data exists
        """
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logging.info(f"开始处理巨量交易数据: {symbol} {bar} {start} {end}")
        data = self.db_market_data.query_market_data_by_symbol_bar(
            symbol, bar, start, end
        )
        if data is None:
            logging.warning(f"获取行情数据失败: {symbol} {bar} {start} {end}")
            return None
        if len(data) == 0:
            logging.warning(f"获取行情数据为空: {symbol} {bar} {start} {end}")
            return None
        # query_data may return a list of dicts or a single dict.
        if isinstance(data, list):
            data = pd.DataFrame(data)
        elif isinstance(data, dict):
            data = pd.DataFrame([data])
        data = self.huge_volume.detect_huge_volume(
            data=data,
            window_size=self.window_size,
            threshold=self.threshold,
            check_price=True,
            only_output_huge_volume=only_output_huge_volume,
            output_excel=True,
        )
        if data is None:
            return None
        if is_update:
            data = self._drop_existing_rows(data, symbol, bar)
        if data is not None and len(data) > 0:
            self.db_huge_volume_data.insert_data_to_mysql(data)
        else:
            logging.warning(f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}")
        return data

    def _drop_existing_rows(self, data, symbol: str, bar: str):
        """Remove rows whose (symbol, bar, timestamp) already exist in the
        huge-volume table, so re-runs insert only new records.

        Replaces the previous per-row re-filtering of the whole DataFrame
        (O(n^2)) with one boolean-mask filter.
        """
        existing = [
            ts
            for ts in data["timestamp"]
            if self.db_huge_volume_data.query_data_by_symbol_bar_timestamp(
                symbol, bar, ts
            )
            is not None
        ]
        if existing:
            data = data[~data["timestamp"].isin(existing)]
        return data

    def batch_update_volume_spike(self):
        """Incrementally update huge-volume data for every configured
        symbol/bar."""
        for symbol in self.monitor_main.symbols:
            for bar in self.monitor_main.bars:
                self.update_volume_spike(symbol, bar)

    def update_volume_spike(self, symbol: str, bar: str):
        """Incrementally update huge-volume data for one symbol/bar.

        Market data is refreshed first. Detection then restarts
        ``window_size - 1`` candles BEFORE the latest stored huge-volume row,
        so the rolling statistics of the first new candle are computed with a
        full window of history.
        """
        try:
            self.monitor_main.update_data(symbol, bar)
            latest = self.db_huge_volume_data.query_latest_data(symbol, bar)
            if latest is None or len(latest) == 0:
                # Nothing stored yet: fall back to a full initial detection.
                self.detect_volume_spike(symbol, bar, only_output_huge_volume=True)
                return
            seconds = self.get_seconds_by_bar(bar)
            # Rewind so the first new candle has a full rolling window behind
            # it (timestamps are in milliseconds).
            start_timestamp = latest["timestamp"] - (
                (self.window_size - 1) * seconds * 1000
            )
            start_date_time = timestamp_to_datetime(start_timestamp)
            data = self.detect_volume_spike(
                symbol=symbol,
                bar=bar,
                start=start_date_time,
                only_output_huge_volume=True,
                is_update=True,
            )
            logging.info(
                f"更新巨量交易数据: {symbol} {bar} from {start_date_time}"
            )
            if data is not None and len(data) > 0:
                logging.info(f"此次更新巨量交易数据: {len(data)}")
            else:
                logging.info(f"此次更新巨量交易数据为空")
        except Exception as e:
            logging.error(f"更新巨量交易数据失败: {symbol} {bar} {e}")

    def get_seconds_by_bar(self, bar: str):
        """Return the number of seconds one candle of ``bar`` spans.

        bar: 1s/1m/3m/5m/15m/30m/1H/2H/4H/6H/12H/1D/2D/3D/1W/1M/3M
        :param bar: candle period
        :return: seconds per candle
        :raises ValueError: for unsupported bar values
        """
        try:
            return self._BAR_SECONDS[bar]
        except KeyError:
            raise ValueError(f"不支持的bar: {bar}") from None
if __name__ == "__main__":
    # Incremental update entry point: refresh market data and append any new
    # huge-volume rows for every configured symbol/bar.
    huge_volume_main = HugeVolumeMain()
    # One-off initial back-fill (run once to seed the table); note the stale
    # "statistics_main" name predates this file's rename.
    # statistics_main.batch_initial_detect_volume_spike(
    #     start="2025-05-01 00:00:00",
    # )
    huge_volume_main.batch_update_volume_spike()

View File

@ -1,6 +1,6 @@
import logging import logging
from time import sleep from time import sleep
from core.data_monitor import DataMonitor from core.market_data_monitor import MarketDataMonitor
from core.db_market_data import DBMarketData from core.db_market_data import DBMarketData
from config import ( from config import (
API_KEY, API_KEY,
@ -14,9 +14,9 @@ from config import (
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
class MonitorMain: class MarketDataMain:
def __init__(self): def __init__(self):
self.data_monitor = DataMonitor( self.market_data_monitor = MarketDataMonitor(
api_key=API_KEY, api_key=API_KEY,
secret_key=SECRET_KEY, secret_key=SECRET_KEY,
passphrase=PASSPHRASE, passphrase=PASSPHRASE,
@ -25,8 +25,8 @@ class MonitorMain:
self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get( self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["XCH-USDT"] "symbols", ["XCH-USDT"]
) )
self.intervals = MONITOR_CONFIG.get("volume_monitor", {}).get( self.bars = MONITOR_CONFIG.get("volume_monitor", {}).get(
"intervals", ["5m", "15m", "1H", "4H", "1D"] "bars", ["5m", "15m", "1H", "4H", "1D"]
) )
self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get( self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-07-01 00:00:00" "initial_date", "2025-07-01 00:00:00"
@ -47,26 +47,28 @@ class MonitorMain:
初始化数据 初始化数据
""" """
for symbol in self.symbols: for symbol in self.symbols:
for interval in self.intervals: for bar in self.bars:
latest_data = self.db_market_data.query_latest_data(symbol, interval) logging.info(f"开始初始化行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
if latest_data: if latest_data:
logging.info( logging.info(
f"已初始化{symbol}, {interval} 最新数据请使用update_data()更新数据" f"已初始化{symbol}, {bar} 最新行情数据请使用update_data()更新行情数据"
) )
continue continue
self.fetch_save_data(symbol, interval, self.initial_date) self.fetch_save_data(symbol, bar, self.initial_date)
def fetch_save_data(self, symbol: str, interval: str, start: str): def fetch_save_data(self, symbol: str, bar: str, start: str):
""" """
获取保存数据 获取保存数据
""" """
data = self.data_monitor.get_historical_kline_data( data = self.market_data_monitor.get_historical_kline_data(
symbol=symbol, start=start, bar=interval symbol=symbol, start=start, bar=bar
) )
if data is not None and len(data) > 0: if data is not None and len(data) > 0:
self.db_market_data.insert_data_to_mysql(data) self.db_market_data.insert_data_to_mysql(data)
return data
def update_data(self): def batch_update_data(self):
""" """
更新数据 更新数据
1. 获取最新数据 1. 获取最新数据
@ -75,22 +77,29 @@ class MonitorMain:
4. 将最新数据保存到数据库 4. 将最新数据保存到数据库
""" """
for symbol in self.symbols: for symbol in self.symbols:
for interval in self.intervals: for bar in self.bars:
latest_data = self.db_market_data.query_latest_data(symbol, interval) self.update_data(symbol, bar)
def update_data(self, symbol: str, bar: str):
"""
更新数据
"""
logging.info(f"开始更新行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
if not latest_data: if not latest_data:
self.fetch_save_data(symbol, interval, self.initial_date) data = self.fetch_save_data(symbol, bar, self.initial_date)
continue
else: else:
latest_timestamp = latest_data.get("timestamp") latest_timestamp = latest_data.get("timestamp")
if latest_timestamp: if latest_timestamp:
latest_timestamp = int(latest_timestamp) latest_timestamp = int(latest_timestamp)
else: else:
logging.warning(f"获取{symbol}, {interval} 最新数据失败") logging.warning(f"获取{symbol}, {bar} 最新数据失败")
continue return
self.fetch_save_data(symbol, interval, latest_timestamp + 1) data = self.fetch_save_data(symbol, bar, latest_timestamp + 1)
return data
if __name__ == "__main__": if __name__ == "__main__":
monitor_main = MonitorMain() market_data_main = MarketDataMain()
monitor_main.update_data() market_data_main.batch_update_data()
# monitor_main.initial_data() # market_data_main.initial_data()

View File

@ -1,3 +1,6 @@
select * from crypto_market_data select * from crypto_market_data
WHERE symbol='XCH-USDT-SWAP' and bar='5m' and date_time > '2025-07-26' WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-07-26'
order by timestamp desc; order by timestamp desc;
SET SQL_SAFE_UPDATES = 0;
delete from crypto_market_data where create_time is NULL;

View File

@ -0,0 +1,40 @@
-- Schema for huge-volume analysis results produced by the volume-spike
-- detector; column set must stay in sync with the application's column list.
CREATE TABLE IF NOT EXISTS crypto_huge_volume (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    symbol VARCHAR(50) NOT NULL COMMENT '交易对',
    bar VARCHAR(20) NOT NULL COMMENT 'K线周期',
    timestamp BIGINT NOT NULL COMMENT '时间戳',
    -- NOTE(review): date_time/create_time are VARCHAR, not DATETIME — kept for
    -- compatibility with the application's string-formatted inserts.
    date_time VARCHAR(50) NOT NULL COMMENT '日期时间',
    open DECIMAL(20,5) NOT NULL COMMENT '开盘价',
    high DECIMAL(20,5) NOT NULL COMMENT '最高价',
    low DECIMAL(20,5) NOT NULL COMMENT '最低价',
    close DECIMAL(20,5) NOT NULL COMMENT '收盘价',
    volume DECIMAL(30,8) NOT NULL COMMENT '交易量',
    volCcy DECIMAL(30,8) NOT NULL COMMENT '交易量(基础货币)',
    -- NOTE(review): the casing "volCCyQuote" is inconsistent with "volCcy" but
    -- matches the application's column list; rename in both places or neither.
    volCCyQuote DECIMAL(30,8) NOT NULL COMMENT '交易量(计价货币)',
    volume_ma DECIMAL(30,8) NOT NULL COMMENT '交易量移动平均',
    volume_std DECIMAL(30,8) NOT NULL COMMENT '交易量标准差',
    volume_threshold DECIMAL(30,8) NOT NULL COMMENT '交易量阈值',
    huge_volume TINYINT NOT NULL DEFAULT 0 COMMENT '是否为巨量(0:否,1:是)',
    volume_ratio DECIMAL(20,8) NOT NULL COMMENT '交易量比率',
    spike_intensity DECIMAL(20,8) NOT NULL COMMENT '尖峰强度',
    close_80_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价80%分位数',
    close_20_percentile DECIMAL(20,5) NOT NULL COMMENT '收盘价20%分位数',
    price_high TINYINT NOT NULL DEFAULT 0 COMMENT '价格是否达到高点(0:否,1:是)',
    price_low TINYINT NOT NULL DEFAULT 0 COMMENT '价格是否达到低点(0:否,1:是)',
    volume_price_spike TINYINT NOT NULL DEFAULT 0 COMMENT '是否出现量价尖峰(0:否,1:是)',
    create_time VARCHAR(50) NOT NULL COMMENT '创建时间',
    -- Uniqueness key used by the app to avoid duplicate inserts per candle.
    UNIQUE KEY uniq_symbol_bar_timestamp (symbol, bar, timestamp),
    INDEX idx_symbol_bar (symbol, bar),
    INDEX idx_timestamp (timestamp),
    INDEX idx_huge_volume (huge_volume),
    INDEX idx_volume_price_spike (volume_price_spike),
    INDEX idx_date_time (date_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='加密货币巨量交易数据表';
-- 添加注释说明
-- 该表用于存储加密货币市场K线数据以及相关的巨量交易分析指标
-- 主要功能:
-- 1. 存储基础K线数据价格、成交量等
-- 2. 计算并存储巨量交易相关指标
-- 3. 识别价格和成交量的异常波动
-- 4. 为交易策略提供数据支持

View File

@ -1,77 +0,0 @@
from core.statistics import Statistics
from core.db_market_data import DBMarketData
from monitor_main import MonitorMain
import logging
from config import MONITOR_CONFIG, MYSQL_CONFIG
from datetime import datetime
import pandas as pd
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class StatisticsMain:
    """Driver for volume-spike detection (deleted in this commit; superseded
    by HugeVolumeMain in huge_volume_main.py)."""
    def __init__(self):
        # Build the MySQL connection URL from config; a password is mandatory.
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")
        self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
        self.statistics = Statistics()
        self.db_market_data = DBMarketData(self.db_url)
        self.monitor_main = MonitorMain()
    def batch_detect_volume_spike(self, start: str, end: str):
        # Never implemented; the replacement lives in HugeVolumeMain.
        pass
    def detect_volume_spike(
        self,
        symbol: str = "XCH-USDT",
        bar: str = "5m",
        start: str = "2025-05-01 00:00:00",
        end: str = None,
        only_output_huge_volume: bool = False,
    ):
        """Fetch market data for [start, end] and run spike detection.

        :param symbol: trading pair
        :param bar: candle period
        :param start: start time; defaults to the configured initial_date
        :param end: end time; defaults to now
        :param only_output_huge_volume: keep only rows flagged huge_volume=1
        :return: detection result DataFrame, or None when no data was found
        """
        if start is None:
            start = MONITOR_CONFIG.get("volume_monitor", {}).get(
                "initial_date", "2025-05-01 00:00:00"
            )
        if end is None:
            end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        data = self.db_market_data.query_market_data_by_symbol_bar(symbol, bar, start, end)
        if data is None:
            logging.warning(f"获取数据失败: {symbol} {bar} {start} {end}")
            return None
        else:
            if len(data) == 0:
                logging.warning(f"获取数据为空: {symbol} {bar} {start} {end}")
                return None
            else:
                # query results may arrive as a list of dicts or a single dict
                if isinstance(data, list):
                    data = pd.DataFrame(data)
                elif isinstance(data, dict):
                    data = pd.DataFrame([data])
                return self.statistics.detect_volume_spike(
                    data=data,
                    check_price=True,
                    only_output_huge_volume=only_output_huge_volume,
                    output_excel=True,
                )
    def update_volume_spike(self):
        # Only refreshes market data; did not re-run detection.
        self.monitor_main.update_data()
if __name__ == "__main__":
    # Ad-hoc entry point: one detection pass for XCH-USDT 5m candles starting
    # 2025-05-01, keeping only huge-volume rows.
    statistics_main = StatisticsMain()
    statistics_main.detect_volume_spike(
        symbol="XCH-USDT",
        bar="5m",
        start="2025-05-01 00:00:00",
        only_output_huge_volume=True,
    )