complete market data monitor main logic

This commit is contained in:
blade 2025-08-05 15:30:50 +08:00
parent cd0b64c5c9
commit 2073f3966f
14 changed files with 1182 additions and 82 deletions

Binary file not shown.

View File

@ -20,7 +20,8 @@ class HugeVolume:
def _calculate_percentile_indicators(
self,
data: pd.DataFrame,
window_size: int,
window_size: int = 50,
price_column: str = "close",
percentiles: List[Tuple[float, str]] = [(0.8, "80"), (0.2, "20"), (0.9, "90"), (0.1, "10")]
) -> pd.DataFrame:
"""
@ -32,44 +33,24 @@ class HugeVolume:
"""
for percentile, suffix in percentiles:
# 计算分位数
data[f"close_{suffix}_percentile"] = (
data["close"].rolling(window=window_size, min_periods=1).quantile(percentile)
data[f"{price_column}_{suffix}_percentile"] = (
data[price_column].rolling(window=window_size, min_periods=1).quantile(percentile)
)
# 判断价格是否达到分位数
if suffix in ["80", "90"]:
# 高点分位数
data[f"price_{suffix}_high"] = (
data["close"] >= data[f"close_{suffix}_percentile"]
data[f"{price_column}_{suffix}_high"] = (
data[price_column] >= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
else:
# 低点分位数
data[f"price_{suffix}_low"] = (
data["close"] <= data[f"close_{suffix}_percentile"]
data[f"{price_column}_{suffix}_low"] = (
data[price_column] <= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
return data
def _calculate_volume_price_spikes(self, data: pd.DataFrame) -> pd.DataFrame:
"""
计算量价尖峰指标
:param data: 数据DataFrame
:return: 包含量价尖峰指标的DataFrame
"""
# 80/20量价尖峰
data["volume_80_20_price_spike"] = (
(data["huge_volume"] == 1)
& ((data["price_80_high"] == 1) | (data["price_20_low"] == 1))
).astype(int)
# 90/10量价尖峰
data["volume_90_10_price_spike"] = (
(data["huge_volume"] == 1)
& ((data["price_90_high"] == 1) | (data["price_10_low"] == 1))
).astype(int)
return data
def detect_huge_volume(
self,
data: DataFrame,
@ -136,12 +117,17 @@ class HugeVolume:
if "close" not in data.columns:
logging.error("数据中缺少close列无法进行价格检查")
return data
if "high" not in data.columns:
logging.error("数据中缺少high列无法进行价格检查")
return data
if "low" not in data.columns:
logging.error("数据中缺少low列无法进行价格检查")
return data
# 计算分位数指标80/20和90/10
data = self._calculate_percentile_indicators(data, window_size)
for price_column in ["close", "high", "low"]:
# 计算分位数指标80/20和90/10
data = self._calculate_percentile_indicators(data, window_size, price_column)
# 计算量价尖峰指标
data = self._calculate_volume_price_spikes(data)
if only_output_huge_volume:
data = data[(data["huge_volume"] == 1)]
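A minimal runnable sketch (not part of this commit) of how the rolling-quantile flags computed above behave on a toy close series; column names follow _calculate_percentile_indicators and the values are invented.

import pandas as pd

df = pd.DataFrame({"close": [1.0, 2.0, 3.0, 4.0, 5.0, 4.5, 2.0]})
window_size = 5
for percentile, suffix in [(0.8, "80"), (0.2, "20")]:
    # rolling quantile over the last `window_size` bars, same as the method above
    df[f"close_{suffix}_percentile"] = (
        df["close"].rolling(window=window_size, min_periods=1).quantile(percentile)
    )
df["close_80_high"] = (df["close"] >= df["close_80_percentile"]).astype(int)
df["close_20_low"] = (df["close"] <= df["close_20_percentile"]).astype(int)
print(df)  # e.g. the 5.0 bar is flagged close_80_high=1, the final 2.0 bar close_20_low=1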

View File

@ -8,7 +8,7 @@ import okx.TradingData as TradingData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
class MarketDataMonitor:
class MarketData:
def __init__(self,
api_key: str,
secret_key: str,
@ -24,7 +24,54 @@ class MarketDataMonitor:
flag=flag
)
def get_historical_kline_data(self, symbol: str = None, start: str = None, bar: str = '1m', limit: int = 100, end_time: int = None) -> Optional[pd.DataFrame]:
def get_realtime_kline_data(self, symbol: str = None, bar: str = '5m', end_time: int = None, limit: int = 50) -> Optional[pd.DataFrame]:
"""
获取实时K线数据
"""
if symbol is None:
symbol = "XCH-USDT"
if bar is None:
bar = "5m"
if end_time is None:
end_time = int(time.time() * 1000) # 当前时间(毫秒)
else:
parsed_end_time = transform_date_time_to_timestamp(end_time)
if parsed_end_time is None:
logging.error(f"end_time参数解析失败: {end_time}")
return None
end_time = parsed_end_time
response = self.get_realtime_candlesticks_from_api(symbol, bar, end_time, limit)
if response:
candles = response["data"]
from_time = int(candles[-1][0])
to_time = int(candles[0][0])
from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
candles_pd = pd.DataFrame(candles, columns=columns)
for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']:
candles_pd[col] = pd.to_numeric(candles_pd[col], errors='coerce')
dt_series = pd.to_datetime(candles_pd['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('Asia/Shanghai')
candles_pd['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S')
# 将timestamp转换为整型
candles_pd['timestamp'] = candles_pd['timestamp'].astype(int)
# 添加虚拟货币名称列内容为symbol
candles_pd['symbol'] = symbol
# 添加bar列内容为bar
candles_pd['bar'] = bar
candles_pd['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
candles_pd = candles_pd[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote', 'create_time']]
candles_pd.sort_values('timestamp', inplace=True)
candles_pd.reset_index(drop=True, inplace=True)
return candles_pd
else:
logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
return None
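As a hedged illustration (not part of the commit) of the candle-to-DataFrame conversion above, the same transformation applied to one hard-coded sample row; OKX returns string fields, newest first, and the numbers here are made up.

import pandas as pd

candles = [["1754379000000", "25.10", "25.40", "24.90", "25.30", "1200", "30180", "30180", "1"]]
columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
sample = pd.DataFrame(candles, columns=columns)
for col in ["open", "high", "low", "close", "volume", "volCcy", "volCCyQuote"]:
    sample[col] = pd.to_numeric(sample[col], errors="coerce")
sample["timestamp"] = sample["timestamp"].astype(int)
sample["date_time"] = (
    pd.to_datetime(sample["timestamp"], unit="ms", utc=True)
    .dt.tz_convert("Asia/Shanghai")
    .dt.strftime("%Y-%m-%d %H:%M:%S")
)
print(sample[["timestamp", "date_time", "open", "close", "volume"]])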
def get_historical_kline_data(self, symbol: str = None, start: str = None, bar: str = '5m', limit: int = 100, end_time: int = None) -> Optional[pd.DataFrame]:
"""
获取历史K线数据支持start为北京时间字符串%Y-%m-%d %H:%M:%S或UTC毫秒级时间戳
:param symbol: 交易对
@ -56,7 +103,7 @@ class MarketDataMonitor:
while start_time < end_time:
try:
# after真实逻辑是获得指定时间之前的数据
response = self.get_candlesticks_from_api(symbol, end_time, bar, limit)
response = self.get_historical_candlesticks_from_api(symbol, bar, end_time, limit)
if response is None:
logging.warning(f"请求失败,请稍后再试")
break
@ -151,15 +198,15 @@ class MarketDataMonitor:
df.loc[index, "sell_sz"] = sell_sz
return df
def get_candlesticks_from_api(self, symbol, end_time, bar, limit):
def get_historical_candlesticks_from_api(self, symbol, bar, end_time, limit):
response = None
count = 0
while True:
try:
response = self.market_api.get_history_candlesticks(
instId=symbol,
after=end_time, # 获取指定时间之前的数据,
bar=bar,
after=end_time, # 获取指定时间之前的数据,
limit=str(limit)
)
if response:
@ -172,6 +219,27 @@ class MarketDataMonitor:
time.sleep(10)
return response
def get_realtime_candlesticks_from_api(self, symbol, bar, end_time, limit):
response = None
count = 0
while True:
try:
response = self.market_api.get_candlesticks(
instId=symbol,
bar=bar,
after=end_time,
limit=str(limit)
)
if response:
break
except Exception as e:
logging.error(f"请求出错: {e}")
count += 1
if count > 3:
break
time.sleep(10)
return response
def get_data_from_db(self, symbol, bar, db_url):
sql = """
SELECT * FROM crypto_market_data

249
core/biz/market_monitor.py Normal file
View File

@ -0,0 +1,249 @@
import pandas as pd
import numpy as np
from metrics_config import METRICS_CONFIG
from time import time
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
def create_metrics_report(row: pd.Series, only_output_rise: bool = False):
"""
创建指标报告
"""
contents = []
huge_volume = row["huge_volume"]
symbol = row["symbol"]
bar = row["bar"]
window_size = row["window_size"]
date_time = row["date_time"]
if huge_volume == 1:
logging.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 巨量"
)
else:
logging.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 非巨量,此次不发送相关数据"
)
return
# fill NaN values with 1 so the int() casts and comparisons below do not fail
row = row.fillna(1)
close = row["close"]
open = row["open"]
high = row["high"]
low = row["low"]
pct_chg = row["pct_chg"]
if only_output_rise and pct_chg < 0:
logging.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 下跌,不发送相关数据"
)
return
contents.append(f"# 交易巨量报告")
contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 时间: {date_time}")
contents.append(f"### 价格信息")
contents.append(f"当前价格: {close}, 开盘价: {open}, 最高价: {high}, 最低价: {low}")
contents.append(f"涨跌幅: {pct_chg}")
volume = row["volume"]
volCcy = row["volCcy"]
volCCyQuote = row["volCCyQuote"]
volume_ratio = row["volume_ratio"]
spike_intensity = row["spike_intensity"]
close_80_high = int(row["close_80_high"])
close_20_low = int(row["close_20_low"])
close_90_high = int(row["close_90_high"])
close_10_low = int(row["close_10_low"])
high_80_high = int(row["high_80_high"])
high_90_high = int(row["high_90_high"])
low_20_low = int(row["low_20_low"])
low_10_low = int(row["low_10_low"])
contents.append(f"### 交易量信息")
contents.append(
f"交易量(张): {volume}, 交易量(币): {volCcy}, 交易量(货币): {volCCyQuote}"
)
contents.append(f"交易量比率: {volume_ratio}, 尖峰强度: {spike_intensity}")
if close_90_high:
contents.append(f"当前价格处于滑动窗口期90%分位数高点")
elif close_80_high:
contents.append(f"当前价格处于滑动窗口期80%分位数高点")
elif close_20_low:
contents.append(f"当前价格处于滑动窗口期20%分位数低点")
elif close_10_low:
contents.append(f"当前价格处于滑动窗口期10%分位数低点")
long_short_info = {"多": [], "空": []}
ma_long_short = str(row["ma_long_short"])
ma_long_short_value = METRICS_CONFIG.get("ma_long_short", {}).get(ma_long_short, 1)
if ma_long_short_value > 1:
long_short_info["多"].append(f"均线势头: {ma_long_short}")
if ma_long_short_value < 1:
long_short_info["空"].append(f"均线势头: {ma_long_short}")
macd_signal = str(row["macd_signal"])
macd_divergence = str(row["macd_divergence"])
kdj_signal = str(row["kdj_signal"])
kdj_pattern = str(row["kdj_pattern"])
rsi_signal = str(row["rsi_signal"])
boll_signal = str(row["boll_signal"])
boll_pattern = str(row["boll_pattern"])
is_long = False
is_short = False
is_over_buy = False
is_over_sell = False
if (
macd_divergence == "顶背离"
or kdj_pattern in ["超超买", "超买"]
or rsi_signal in ["超超买", "超买"]
or boll_pattern in ["超超买", "超买"]
):
is_over_buy = True
if (
macd_divergence == "底背离"
or kdj_pattern in ["超超卖", "超卖"]
or rsi_signal in ["超超卖", "超卖"]
or boll_pattern in ["超超卖", "超卖"]
):
is_over_sell = True
if ma_long_short == "多":
is_long = True
if ma_long_short == "空":
is_short = True
ma_divergence = str(row["ma_divergence"])
if is_long:
check_long_short = "多"
if is_over_buy:
check_over_buy = "超买"
else:
check_over_buy = "非超买"
ma_divergence_value = (
METRICS_CONFIG.get("ma_divergence", {})
.get(check_long_short, {})
.get(check_over_buy, {})
.get(ma_divergence, 1)
)
if ma_divergence_value > 1:
long_short_info[""].append(f"均线形态: {ma_divergence}")
if ma_divergence_value < 1:
long_short_info[""].append(f"均线形态: {ma_divergence}")
if is_short:
check_long_short = "空"
if is_over_sell:
check_over_sell = "超卖"
else:
check_over_sell = "非超卖"
ma_divergence_value = (
METRICS_CONFIG.get("ma_divergence", {})
.get(check_long_short, {})
.get(check_over_sell, {})
.get(ma_divergence, 1)
)
if ma_divergence_value > 1:
long_short_info["多"].append(f"均线形态: {ma_divergence}")
if ma_divergence_value < 1:
long_short_info["空"].append(f"均线形态: {ma_divergence}")
ma_cross = str(row["ma_cross"])
ma_cross_value = METRICS_CONFIG.get("ma_cross", {}).get(ma_cross, 1)
if ma_cross_value > 1:
long_short_info[""].append(f"均线交叉: {ma_cross}")
if ma_cross_value < 1:
long_short_info[""].append(f"均线交叉: {ma_cross}")
macd_signal_value = METRICS_CONFIG.get("macd", {}).get(macd_signal, 1)
if macd_signal_value > 1:
long_short_info[""].append(f"MACD信号: {macd_signal}")
if macd_signal_value < 1:
long_short_info[""].append(f"MACD信号: {macd_signal}")
macd_divergence_value = METRICS_CONFIG.get("macd", {}).get(macd_divergence, 1)
if macd_divergence_value > 1:
long_short_info[""].append(f"MACD背离: {row['macd_divergence']}")
if macd_divergence_value < 1:
long_short_info[""].append(f"MACD背离: {row['macd_divergence']}")
kdj_signal_value = METRICS_CONFIG.get("kdj", {}).get(kdj_signal, 1)
if kdj_signal_value > 1:
long_short_info[""].append(f"KDJ信号: {kdj_signal}")
if kdj_signal_value < 1:
long_short_info[""].append(f"KDJ信号: {kdj_signal}")
kdj_pattern_value = METRICS_CONFIG.get("kdj", {}).get(kdj_pattern, 1)
if kdj_pattern_value > 1:
long_short_info[""].append(f"KDJ形态: {kdj_pattern}")
if kdj_pattern_value < 1:
long_short_info[""].append(f"KDJ形态: {kdj_pattern}")
rsi_signal_value = METRICS_CONFIG.get("rsi", {}).get(rsi_signal, 1)
if rsi_signal_value > 1:
long_short_info[""].append(f"RSI形态: {rsi_signal}")
if rsi_signal_value < 1:
long_short_info[""].append(f"RSI形态: {rsi_signal}")
boll_signal_value = METRICS_CONFIG.get("boll", {}).get(boll_signal, 1)
if boll_signal_value > 1:
long_short_info[""].append(f"BOLL信号: {boll_signal}")
if boll_signal_value < 1:
long_short_info[""].append(f"BOLL信号: {boll_signal}")
boll_pattern_value = METRICS_CONFIG.get("boll", {}).get(boll_pattern, 1)
if boll_pattern_value > 1:
long_short_info[""].append(f"BOLL形态: {boll_pattern}")
if boll_pattern_value < 1:
long_short_info[""].append(f"BOLL形态: {boll_pattern}")
k_up_down = str(row["k_up_down"])
k_shape = str(row["k_shape"])
if is_over_buy:
k_shape_value = (
METRICS_CONFIG.get("k_shape", {})
.get("超买", {})
.get(k_up_down, {})
.get(k_shape, 1)
)
if k_shape_value > 1:
long_short_info[""].append(f"K线形态: {k_shape}")
if k_shape_value < 1:
long_short_info[""].append(f"K线形态: {k_shape}")
if is_over_sell:
k_shape_value = (
METRICS_CONFIG.get("k_shape", {})
.get("超卖", {})
.get(k_up_down, {})
.get(k_shape, 1)
)
if k_shape_value > 1:
long_short_info[""].append(f"K线形态: {k_shape}")
if k_shape_value < 1:
long_short_info[""].append(f"K线形态: {k_shape}")
if k_up_down == "阳线":
if is_long and not is_over_buy:
long_short_info[""].append(f"量价关系: 非超买且放量上涨")
if is_short and is_over_sell:
long_short_info[""].append(f"量价关系: 空头态势且超卖,但出现放量上涨,可能反转")
if k_up_down == "阴线":
if is_long and is_over_buy:
if close_80_high or close_90_high or high_80_high or high_90_high:
long_short_info[""].append(f"量价关系: 多头态势且超买, 目前是价位高点,但出现放量下跌,可能反转")
if is_short and not is_over_sell:
long_short_info[""].append(f"量价关系: 空头态势且非超卖,出现放量下跌")
contents.append(f"### 技术指标信息")
long_info_list = long_short_info[""]
short_info_list = long_short_info[""]
if len(long_info_list) > 0:
contents.append(f"#### 多头指标信号")
contents.append(f"{"\n".join(long_info_list)}")
if len(short_info_list) > 0:
contents.append(f"#### 空头指标信号")
contents.append(f"{"\n".join(short_info_list)}")
mark_down_text = "\n\n".join(contents)
return mark_down_text
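A small sketch (not part of the commit) of the lookup pattern create_metrics_report uses to sort a signal into the 多/空 buckets via METRICS_CONFIG weights; the kdj_signal value is hypothetical.

from metrics_config import METRICS_CONFIG

long_short_info = {"多": [], "空": []}
kdj_signal = "金叉"  # hypothetical value taken from a data row
kdj_signal_value = METRICS_CONFIG.get("kdj", {}).get(kdj_signal, 1)
if kdj_signal_value > 1:
    long_short_info["多"].append(f"KDJ信号: {kdj_signal}")
if kdj_signal_value < 1:
    long_short_info["空"].append(f"KDJ信号: {kdj_signal}")
print(long_short_info)  # {'多': ['KDJ信号: 金叉'], '空': []} since 金叉 maps to 1.5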

View File

@ -0,0 +1,439 @@
import pandas as pd
import logging
from typing import Optional, List, Dict, Any, Union
from core.db.db_manager import DBData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
class DBMarketMonitor:
def __init__(
self,
db_url: str
):
self.db_url = db_url
self.table_name = "crypto_market_monitor"
self.columns = [
"symbol",
"bar",
"window_size",
"timestamp",
"date_time",
"report",
"report_file_path",
"report_file_name",
"report_file_byte_size"
]
self.db_manager = DBData(db_url, self.table_name, self.columns)
def _process_time_parameter(self, time_param: Optional[Union[str, int]]) -> Optional[int]:
"""
处理时间参数统一转换为时间戳
:param time_param: 时间参数字符串或整数
:return: 时间戳或None
"""
if time_param is None:
return None
time_param = transform_date_time_to_timestamp(time_param)
if time_param is None:
return None
return time_param
def _build_query_conditions(
self,
symbol: Optional[str] = None,
bar: Optional[str] = None,
window_size: Optional[int] = None,
start: Optional[Union[str, int]] = None,
end: Optional[Union[str, int]] = None,
additional_conditions: Optional[List[str]] = None
) -> tuple[List[str], Dict[str, Any]]:
"""
构建查询条件
:param symbol: 交易对
:param bar: K线周期
:param window_size: 窗口大小
:param start: 开始时间
:param end: 结束时间
:param additional_conditions: 额外的查询条件
:return: (条件列表, 参数字典)
"""
conditions = additional_conditions or []
condition_dict = {}
if symbol:
conditions.append("symbol = :symbol")
condition_dict["symbol"] = symbol
if bar:
conditions.append("bar = :bar")
condition_dict["bar"] = bar
if window_size:
conditions.append("window_size = :window_size")
condition_dict["window_size"] = window_size
# 处理时间参数
start_timestamp = self._process_time_parameter(start)
end_timestamp = self._process_time_parameter(end)
if start_timestamp is not None:
conditions.append("timestamp >= :start")
condition_dict["start"] = start_timestamp
if end_timestamp is not None:
conditions.append("timestamp <= :end")
condition_dict["end"] = end_timestamp
return conditions, condition_dict
def insert_data_to_mysql(self, df: pd.DataFrame) -> None:
"""
将市场监控数据保存到MySQL的crypto_market_monitor表
速度 最快
内存 中等
适用场景中小数据量<10万条
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql(df)
def insert_data_to_mysql_fast(self, df: pd.DataFrame) -> None:
"""
快速插入市场监控数据方案2使用executemany批量插入
速度 很快
内存
适用场景中等数据量
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000) -> None:
"""
分块插入市场监控数据方案3分批处理大数据量
速度 中等
内存 最低
适用场景大数据量>10万条
:param df: 市场监控数据DataFrame
:param chunk_size: 每块大小
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
def insert_data_to_mysql_simple(self, df: pd.DataFrame) -> None:
"""
简单插入市场监控数据方案4使用pandas to_sql
速度 较慢
内存 较高
适用场景小数据量简单场景
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)
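A hedged sketch of how a caller might choose among the four insert methods based on the speed/memory notes in the docstrings above; the size thresholds and the save_monitor_data helper are illustrative assumptions, not part of this commit.

import pandas as pd

def save_monitor_data(db_monitor: "DBMarketMonitor", df: pd.DataFrame) -> None:
    # dispatch on row count, following the docstring guidance above
    if df is None or df.empty:
        return
    if len(df) > 100_000:
        db_monitor.insert_data_to_mysql_chunk(df, chunk_size=1000)  # lowest memory, large batches
    elif len(df) > 10_000:
        db_monitor.insert_data_to_mysql_fast(df)  # executemany batch insert
    else:
        db_monitor.insert_data_to_mysql(df)  # fastest for small/medium frames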
def query_latest_data(self, symbol: str, bar: str, window_size: int) -> Optional[Dict[str, Any]]:
"""
查询最新的市场监控数据
:param symbol: 交易对
:param bar: K线周期
:param window_size: 窗口大小
:return: 最新数据字典或None
"""
conditions = [
"symbol = :symbol",
"bar = :bar",
"window_size = :window_size"
]
condition_dict = {
"symbol": symbol,
"bar": bar,
"window_size": window_size
}
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
LIMIT 1
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=False)
return result
def query_data_before_timestamp(self, symbol: str, bar: str, window_size: int, timestamp: int, limit: int = 100) -> Optional[List[Dict[str, Any]]]:
"""
查询指定时间戳之前的数据
:param symbol: 交易对
:param bar: K线周期
:param window_size: 窗口大小
:param timestamp: 时间戳
:param limit: 限制条数
:return: 数据列表或None
"""
conditions = [
"symbol = :symbol",
"bar = :bar",
"window_size = :window_size",
"timestamp < :timestamp"
]
condition_dict = {
"symbol": symbol,
"bar": bar,
"window_size": window_size,
"timestamp": timestamp
}
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
LIMIT {limit}
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def query_market_monitor_by_symbol_bar(self, symbol: str, bar: str, window_size: int, start: Optional[Union[str, int]] = None, end: Optional[Union[str, int]] = None) -> Optional[List[Dict[str, Any]]]:
"""
根据交易对和K线周期查询市场监控数据
:param symbol: 交易对
:param bar: K线周期
:param window_size: 窗口大小
:param start: 开始时间
:param end: 结束时间
:return: 数据列表或None
"""
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size,
start=start,
end=end
)
if not conditions:
sql = f"SELECT * FROM {self.table_name} ORDER BY timestamp DESC"
else:
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def query_market_monitor_by_window_size(self, window_size: int, symbol: Optional[str] = None, bar: Optional[str] = None, start: Optional[Union[str, int]] = None, end: Optional[Union[str, int]] = None) -> Optional[List[Dict[str, Any]]]:
"""
根据窗口大小查询市场监控数据
:param window_size: 窗口大小
:param symbol: 交易对可选
:param bar: K线周期可选
:param start: 开始时间可选
:param end: 结束时间可选
:return: 数据列表或None
"""
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size,
start=start,
end=end
)
if not conditions:
sql = f"SELECT * FROM {self.table_name} ORDER BY timestamp DESC"
else:
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def get_market_monitor_statistics(self, symbol: Optional[str] = None, bar: Optional[str] = None, window_size: Optional[int] = None, start: Optional[Union[str, int]] = None, end: Optional[Union[str, int]] = None) -> Optional[Dict[str, Any]]:
"""
获取市场监控数据统计信息
:param symbol: 交易对可选
:param bar: K线周期可选
:param window_size: 窗口大小可选
:param start: 开始时间可选
:param end: 结束时间可选
:return: 统计信息字典或None
"""
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size,
start=start,
end=end
)
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
sql = f"""
SELECT
COUNT(*) as total_count,
COUNT(DISTINCT symbol) as symbol_count,
COUNT(DISTINCT bar) as bar_count,
COUNT(DISTINCT window_size) as window_size_count,
MIN(timestamp) as earliest_timestamp,
MAX(timestamp) as latest_timestamp,
AVG(report_file_byte_size) as avg_file_size,
SUM(report_file_byte_size) as total_file_size
FROM {self.table_name}
{where_clause}
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=False)
return result
def get_recent_market_monitor_data(self, symbol: Optional[str] = None, bar: Optional[str] = None, window_size: Optional[int] = None, limit: int = 100) -> Optional[List[Dict[str, Any]]]:
"""
获取最近的市场监控数据
:param symbol: 交易对可选
:param bar: K线周期可选
:param window_size: 窗口大小可选
:param limit: 限制条数
:return: 数据列表或None
"""
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size
)
if not conditions:
sql = f"SELECT * FROM {self.table_name} ORDER BY timestamp DESC LIMIT {limit}"
else:
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
LIMIT {limit}
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def get_market_monitor_by_file_size_range(self, min_size: int, max_size: int, symbol: Optional[str] = None, bar: Optional[str] = None, window_size: Optional[int] = None, start: Optional[Union[str, int]] = None, end: Optional[Union[str, int]] = None) -> Optional[List[Dict[str, Any]]]:
"""
根据文件大小范围查询市场监控数据
:param min_size: 最小文件大小
:param max_size: 最大文件大小
:param symbol: 交易对可选
:param bar: K线周期可选
:param window_size: 窗口大小可选
:param start: 开始时间可选
:param end: 结束时间可选
:return: 数据列表或None
"""
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size,
start=start,
end=end
)
conditions.append("report_file_byte_size BETWEEN :min_size AND :max_size")
condition_dict["min_size"] = min_size
condition_dict["max_size"] = max_size
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def get_market_monitor_by_symbol_list(self, symbols: List[str], bar: Optional[str] = None, window_size: Optional[int] = None, start: Optional[Union[str, int]] = None, end: Optional[Union[str, int]] = None) -> Optional[List[Dict[str, Any]]]:
"""
根据交易对列表查询市场监控数据
:param symbols: 交易对列表
:param bar: K线周期可选
:param window_size: 窗口大小可选
:param start: 开始时间可选
:param end: 结束时间可选
:return: 数据列表或None
"""
if not symbols:
return None
conditions, condition_dict = self._build_query_conditions(
bar=bar,
window_size=window_size,
start=start,
end=end
)
# 构建IN查询条件
placeholders = [f":symbol_{i}" for i in range(len(symbols))]
conditions.append(f"symbol IN ({', '.join(placeholders)})")
for i, symbol in enumerate(symbols):
condition_dict[f"symbol_{i}"] = symbol
sql = f"""
SELECT * FROM {self.table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
"""
result = self.db_manager.query_data(sql, condition_dict, return_multi=True)
return result
def delete_old_market_monitor_data(self, days: int = 30, symbol: Optional[str] = None, bar: Optional[str] = None, window_size: Optional[int] = None) -> int:
"""
删除旧的市场监控数据
:param days: 保留天数
:param symbol: 交易对可选
:param bar: K线周期可选
:param window_size: 窗口大小可选
:return: 删除的记录数
"""
import datetime
from sqlalchemy import text
# 计算截止时间戳
cutoff_time = datetime.datetime.now() - datetime.timedelta(days=days)
cutoff_timestamp = int(cutoff_time.timestamp() * 1000) # 转换为毫秒时间戳
conditions, condition_dict = self._build_query_conditions(
symbol=symbol,
bar=bar,
window_size=window_size
)
conditions.append("timestamp < :cutoff_timestamp")
condition_dict["cutoff_timestamp"] = cutoff_timestamp
sql = f"""
DELETE FROM {self.table_name}
WHERE {' AND '.join(conditions)}
"""
try:
with self.db_manager.db_engine.connect() as conn:
result = conn.execute(text(sql), condition_dict)
conn.commit()
deleted_count = result.rowcount
logging.info(f"删除了 {deleted_count} 条旧的市场监控数据")
return deleted_count
except Exception as e:
logging.error(f"删除旧数据时发生错误: {e}")
return 0
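For reference, a minimal sketch (not part of the commit) of the parameterized SQL that query_latest_data assembles before handing it to DBData.query_data; the bind values are examples.

table_name = "crypto_market_monitor"
conditions = ["symbol = :symbol", "bar = :bar", "window_size = :window_size"]
condition_dict = {"symbol": "XCH-USDT", "bar": "5m", "window_size": 100}
sql = f"""
SELECT * FROM {table_name}
WHERE {' AND '.join(conditions)}
ORDER BY timestamp DESC
LIMIT 1
"""
print(sql)
print(condition_dict)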

View File

@ -44,7 +44,7 @@ class HugeVolumeMain:
for bar in self.market_data_main.bars:
if start is None:
start = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-01 00:00:00"
"initial_date", "2025-05-15 00:00:00"
)
data = self.detect_volume_spike(
symbol,
@ -467,7 +467,7 @@ def batch_initial_detect_volume_spike(threshold: float = 2.0):
window_sizes = [50, 80, 100, 120]
huge_volume_main = HugeVolumeMain(threshold)
start_date = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-01 00:00:00"
"initial_date", "2025-05-15 00:00:00"
)
for window_size in window_sizes:
huge_volume_main.batch_initial_detect_volume_spike(

View File

@ -2,7 +2,7 @@ import logging
from datetime import datetime
from time import sleep
import pandas as pd
from core.biz.market_data_monitor import MarketDataMonitor
from core.biz.market_data import MarketData
from core.db.db_market_data import DBMarketData
from core.biz.metrics_calculation import MetricsCalculation
from core.utils import (
@ -26,7 +26,7 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(mes
class MarketDataMain:
def __init__(self):
self.market_data_monitor = MarketDataMonitor(
self.market_data = MarketData(
api_key=API_KEY,
secret_key=SECRET_KEY,
passphrase=PASSPHRASE,
@ -113,7 +113,7 @@ class MarketDataMain:
logging.info(
f"获取行情数据: {symbol} {bar}{start_date_time}{end_date_time}"
)
data = self.market_data_monitor.get_historical_kline_data(
data = self.market_data.get_historical_kline_data(
symbol=symbol,
start=current_start_time_ts,
bar=bar,
@ -172,43 +172,7 @@ class MarketDataMain:
"create_time",
]
]
data["pre_close"] = None
data["close_change"] = None
data["pct_chg"] = None
data["ma1"] = None
data["ma2"] = None
data["dif"] = None
data["dea"] = None
data["macd"] = None
data["macd_signal"] = None
data["macd_divergence"] = None
data["kdj_k"] = None
data["kdj_d"] = None
data["kdj_j"] = None
data["kdj_signal"] = None
data["kdj_pattern"] = None
data["ma5"] = None
data["ma10"] = None
data["ma20"] = None
data["ma30"] = None
data["ma_cross"] = None
data["ma5_close_diff"] = None
data["ma10_close_diff"] = None
data["ma20_close_diff"] = None
data["ma30_close_diff"] = None
data["ma_close_avg"] = None
data["ma_long_short"] = None
data["ma_divergence"] = None
data["rsi_14"] = None
data["rsi_signal"] = None
data["boll_upper"] = None
data["boll_middle"] = None
data["boll_lower"] = None
data["boll_signal"] = None
data["boll_pattern"] = None
data["k_length"] = None
data["k_shape"] = None
data["k_up_down"] = None
data = self.add_new_columns(data)
self.db_market_data.insert_data_to_mysql(data)
current_min_start_time_ts = data["timestamp"].min()
if current_min_start_time_ts < min_start_time_ts:
@ -233,7 +197,10 @@ class MarketDataMain:
handle_data = self.db_market_data.query_market_data_by_symbol_bar(
symbol=symbol, bar=bar, start=earliest_timestamp, end=None
)
if handle_data is not None and len(handle_data) > len(before_data):
if handle_data is not None:
if before_data is not None and len(handle_data) <= len(before_data):
logging.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}")
return None
if isinstance(handle_data, list):
handle_data = pd.DataFrame(handle_data)
elif isinstance(handle_data, dict):
@ -252,6 +219,54 @@ class MarketDataMain:
self.db_market_data.insert_data_to_mysql(handle_data)
return data
def add_new_columns(self, data: pd.DataFrame):
"""
添加新列
"""
columns = data.columns.tolist()
if "buy_sz" not in columns:
data["buy_sz"] = -1
if "sell_sz" not in columns:
data["sell_sz"] = -1
data["pre_close"] = None
data["close_change"] = None
data["pct_chg"] = None
data["ma1"] = None
data["ma2"] = None
data["dif"] = None
data["dea"] = None
data["macd"] = None
data["macd_signal"] = None
data["macd_divergence"] = None
data["kdj_k"] = None
data["kdj_d"] = None
data["kdj_j"] = None
data["kdj_signal"] = None
data["kdj_pattern"] = None
data["ma5"] = None
data["ma10"] = None
data["ma20"] = None
data["ma30"] = None
data["ma_cross"] = None
data["ma5_close_diff"] = None
data["ma10_close_diff"] = None
data["ma20_close_diff"] = None
data["ma30_close_diff"] = None
data["ma_close_avg"] = None
data["ma_long_short"] = None
data["ma_divergence"] = None
data["rsi_14"] = None
data["rsi_signal"] = None
data["boll_upper"] = None
data["boll_middle"] = None
data["boll_lower"] = None
data["boll_signal"] = None
data["boll_pattern"] = None
data["k_length"] = None
data["k_shape"] = None
data["k_up_down"] = None
return data
def calculate_metrics(self, data: pd.DataFrame):
"""
计算技术指标

164
market_monitor_main.py Normal file
View File

@ -0,0 +1,164 @@
from market_data_main import MarketDataMain
from huge_volume_main import HugeVolumeMain
from core.biz.market_monitor import create_metrics_report
from core.db.db_market_monitor import DBMarketMonitor
from core.wechat import Wechat
from config import MONITOR_CONFIG, MYSQL_CONFIG
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
import logging
import os
import pandas as pd
from datetime import datetime, timedelta
import json
import re
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
class MarketMonitorMain:
def __init__(self):
self.market_data_main = MarketDataMain()
self.huge_volume_main = HugeVolumeMain()
self.wechat = Wechat()
self.monitor_config = MONITOR_CONFIG
self.window_size = 100
self.start_date = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-01 00:00:00"
)
self.latest_record_file_path = "./output/latest_record.json"
self.latest_record = self.get_latest_record()
self.output_folder = "./output/report/market_monitor/"
os.makedirs(self.output_folder, exist_ok=True)
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_monitor = DBMarketMonitor(self.db_url)
def get_latest_record(self):
"""
获取最新记录
"""
if os.path.exists(self.latest_record_file_path):
with open(self.latest_record_file_path, "r", encoding="utf-8") as f:
return json.load(f)
else:
with open(self.latest_record_file_path, "w", encoding="utf-8") as f:
json.dump({}, f, ensure_ascii=False, indent=4)
return {}
def monitor_realtime_market(
self,
symbol: str,
bar: str,
only_output_huge_volume: bool = False,
only_output_rise: bool = False,
):
"""
监控最新市场数据
考虑到速度暂不与数据库交互直接从api获取数据
"""
real_time_data = self.market_data_main.market_data.get_realtime_kline_data(
symbol=symbol,
bar=bar,
end_time=None,
limit=50,
)
if real_time_data is None or len(real_time_data) == 0:
logging.error(f"获取最新市场数据失败: {symbol}, {bar}")
return
latest_realtime_timestamp = real_time_data["timestamp"].iloc[-1]
latest_record_timestamp = (
self.latest_record.get(symbol, {}).get(bar, {}).get("timestamp", 0)
)
latest_reatime_datetime = timestamp_to_datetime(latest_realtime_timestamp)
latest_record_datetime = timestamp_to_datetime(latest_record_timestamp)
if (
latest_record_timestamp is not None
and latest_realtime_timestamp <= latest_record_timestamp
):
logging.info(
f"最新市场数据时间戳 {latest_reatime_datetime} 小于等于最新记录时间戳 {latest_record_datetime} 不进行监控"
)
return
else:
self.latest_record.setdefault(symbol, {})[bar] = {"timestamp": latest_realtime_timestamp}
with open(self.latest_record_file_path, "w", encoding="utf-8") as f:
json.dump(self.latest_record, f, ensure_ascii=False, indent=4)
logging.info(
f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间 {latest_record_datetime}"
)
real_time_data = self.market_data_main.add_new_columns(real_time_data)
logging.info(f"开始计算技术指标: {symbol} {bar}")
real_time_data = self.market_data_main.calculate_metrics(real_time_data)
logging.info(f"开始计算大成交量: {symbol} {bar} 窗口大小: {self.window_size}")
real_time_data = self.huge_volume_main.huge_volume.detect_huge_volume(
data=real_time_data,
window_size=self.window_size,
threshold=self.huge_volume_main.threshold,
check_price=True,
only_output_huge_volume=only_output_huge_volume,
output_excel=False,
)
if real_time_data is None or len(real_time_data) == 0:
logging.error(
f"计算大成交量失败: {symbol} {bar} 窗口大小: {self.window_size}"
)
return
# create_metrics_report expects a single row (pd.Series); report on the latest bar
report = create_metrics_report(real_time_data.iloc[-1], only_output_rise)
if report is None:
# nothing to report (not a huge-volume bar, or filtered out by only_output_rise)
return
text_length = len(report.encode("utf-8"))
logging.info(f"发送报告到企业微信,字节数: {text_length}")
self.wechat.send_markdown(report)
self.latest_record[symbol][bar]["timestamp"] = latest_realtime_timestamp
with open(self.latest_record_file_path, "w", encoding="utf-8") as f:
json.dump(self.latest_record, f, ensure_ascii=False, indent=4)
# remove punction in latest_reatime_datetime
latest_reatime_datetime = re.sub(r"[\:\-\s]", "", latest_reatime_datetime)
report_file_name = f"{symbol}_{bar}_{self.window_size}_{latest_reatime_datetime}.md"
report_file_path = os.path.join(self.output_folder, report_file_name)
with open(report_file_path, "w", encoding="utf-8") as f:
f.write(report.replace(":", "_"))
report_file_byte_size = os.path.getsize(report_file_path)
report_data = {
"symbol": symbol,
"bar": bar,
"window_size": self.window_size,
"timestamp": latest_realtime_timestamp,
"date_time": latest_reatime_datetime,
"report": report,
"report_file_path": report_file_path,
"report_file_name": report_file_name,
"report_file_byte_size": report_file_byte_size
}
report_data = pd.DataFrame([report_data])
logging.info(f"插入数据到数据库")
self.db_market_monitor.insert_data_to_mysql(report_data)
def batch_monitor_realtime_market(
self,
only_output_huge_volume: bool = True,
only_output_rise: bool = False,
):
for symbol in self.market_data_main.symbols:
for bar in self.market_data_main.bars:
self.monitor_realtime_market(
symbol,
bar,
only_output_huge_volume,
only_output_rise,
)
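The commit does not show an entry point for market_monitor_main.py; a minimal sketch of how it would likely be driven (assumes valid API credentials and MYSQL_CONFIG in config.py):

if __name__ == "__main__":
    monitor = MarketMonitorMain()
    # send reports only for huge-volume bars, for both rising and falling moves
    monitor.batch_monitor_realtime_market(
        only_output_huge_volume=True,
        only_output_rise=False,
    )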

166
metrics_config.py Normal file
View File

@ -0,0 +1,166 @@
METRICS_CONFIG = {
"macd": {
"金叉": 1.5,
"死叉": 0.5,
"底背离": 1.5,
"顶背离": 0.5,
},
"kdj": {
"金叉": 1.5,
"死叉": 0.5,
"超超卖": 1.5,
"超卖": 1.2,
"超超买": 0.5,
"超买": 0.8,
},
"rsi": {
"超卖": 1.2,
"超买": 0.8,
},
"boll": {
"突破下轨": 1.2,
"击穿上轨": 0.8,
"超超卖": 1.5,
"超卖": 1.2,
"超超买": 0.5,
"超买": 0.8,
},
"ma_long_short": {
"": 1.2,
"": 0.8,
},
"ma_divergence": {
"": {
"超买": {
"超发散": 0.8,
"粘合": 0.8,
},
"非超买": {
"发散": 1.2,
"适中": 1.2,
"粘合": 1.5,
},
},
"": {
"超卖": {
"超发散": 1.2,
"粘合": 1.2,
},
"非超卖": {
"发散": 0.8,
"适中": 0.8,
"粘合": 0.8,
},
},
},
"ma5102030": {
"5穿10": 1.1,
"5穿20": 1.2,
"5穿30": 1.3,
"10穿30": 1.3,
"10穿5": 0.8,
"20穿5": 0.7,
"30穿5": 0.6,
"30穿10": 0.5,
},
"k_shape": {
"超买": {
"阳线": {
"一字": 0.8,
"长吊锤线": 0.8,
"吊锤线": 0.9,
"长倒T线": 0.8,
"倒T线": 0.9,
"长十字星": 0.8,
"十字星": 0.9,
"长上影线纺锤体": 0.8,
"长下影线纺锤体": 0.9,
"大实体": 1.1,
"超大实体": 1.2,
"超大实体+光头光脚": 1.3,
},
"阴线": {
"一字": 0.7,
"长吊锤线": 0.7,
"吊锤线": 0.8,
"长倒T线": 0.7,
"倒T线": 0.8,
"长十字星": 0.7,
"十字星": 0.8,
"长上影线纺锤体": 0.7,
"长下影线纺锤体": 0.8,
"大实体": 0.7,
"超大实体": 0.6,
"光头光脚": 0.8,
"超大实体+光头光脚": 0.5,
},
},
"超卖": {
"阳线": {
"一字": 1.5,
"长吊锤线": 1.5,
"吊锤线": 1.2,
"长倒T线": 1.5,
"倒T线": 1.2,
"长十字星": 1.6,
"十字星": 1.3,
"长上影线纺锤体": 1.2,
"长下影线纺锤体": 1.5,
"小实体": 1.2,
"大实体": 1.5,
"超大实体": 1.8,
"光头光脚": 1.5,
"超大实体+光头光脚": 2,
},
"阴线": {
"一字": 1.2,
"长吊锤线": 1.2,
"吊锤线": 1.1,
"长倒T线": 1.2,
"倒T线": 1.1,
"长十字星": 1.3,
"十字星": 1.1,
"长上影线纺锤体": 1.1,
"长下影线纺锤体": 1.2,
"大实体": 0.8,
"超大实体": 0.7,
"光头光脚": 0.9,
"超大实体+光头光脚": 0.6,
},
},
},
"huge_volume": {
"阳线": {
"": {
"非超买": {
"any": 1.2,
},
},
"": {
"超卖": {
"close_20_low": 1.2,
"close_10_low": 1.3,
"low_20_low": 1.3,
"low_10_low": 1.5,
"any": 1.1,
},
},
},
"阴线": {
"": {
"超买": {
"close_80_high": 0.8,
"close_90_high": 0.7,
"high_80_high": 0.7,
"high_90_high": 0.6,
"any": 0.9,
},
},
"": {
"非超卖": {
"any": 0.8,
},
},
},
},
}
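A short sketch (not part of the commit) showing how the nested tables above are read with the chained .get calls used in create_metrics_report; the example keys come from the config itself.

from metrics_config import METRICS_CONFIG

# flat lookup: a single signal maps straight to a weight
print(METRICS_CONFIG.get("macd", {}).get("金叉", 1))  # 1.5 -> bullish

# nested lookup: ma_divergence is keyed by 多/空, then 超买/非超买 (or 超卖/非超卖), then the pattern
weight = (
    METRICS_CONFIG.get("ma_divergence", {})
    .get("多", {})
    .get("非超买", {})
    .get("粘合", 1)
)
print(weight)  # 1.5 -> treated as a bullish signal when the weight is above 1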

View File

@ -1,6 +1,6 @@
import logging
from core.quant_trader import QuantTrader
from core.strategy import QuantStrategy
from core.biz.quant_trader import QuantTrader
from core.biz.strategy import QuantStrategy
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')

View File

@ -4,7 +4,7 @@ order by timestamp ;
select * from crypto_market_data
WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-08-04 15:00:00'
order by timestamp desc;
order by timestamp asc;
delete FROM crypto_market_data where symbol != 'XCH-USDT';

View File

@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS crypto_market_monitor (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
symbol VARCHAR(50) NOT NULL COMMENT '交易对',
bar VARCHAR(20) NOT NULL COMMENT 'K线周期',
window_size INT NOT NULL COMMENT '窗口大小, 50, 80, 100, 120',
timestamp BIGINT NOT NULL COMMENT '时间戳',
date_time VARCHAR(50) NOT NULL COMMENT '日期时间',
report TEXT NOT NULL COMMENT '报告',
report_file_path VARCHAR(255) NOT NULL COMMENT '报告文件路径',
report_file_name VARCHAR(255) NOT NULL COMMENT '报告文件名',
report_file_byte_size INT NOT NULL COMMENT '报告文件大小',
UNIQUE KEY idx_symbol_bar_window_size_timestamp (symbol, bar, window_size, timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='市场行情监控';