crypto_quant/market_data_main.py

211 lines
8.6 KiB
Python
Raw Normal View History

2025-07-24 10:23:00 +00:00
import logging
from datetime import datetime
2025-07-24 10:23:00 +00:00
from time import sleep
from core.biz.market_data_monitor import MarketDataMonitor
from core.db.db_market_data import DBMarketData
from core.utils import datetime_to_timestamp, timestamp_to_datetime, transform_date_time_to_timestamp
from trade_data_main import TradeDataMain
2025-07-28 04:29:31 +00:00
from config import (
API_KEY,
SECRET_KEY,
PASSPHRASE,
SANDBOX,
MONITOR_CONFIG,
MYSQL_CONFIG,
BAR_THRESHOLD,
2025-07-28 04:29:31 +00:00
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
2025-07-24 10:23:00 +00:00
class MarketDataMain:
2025-07-24 10:23:00 +00:00
def __init__(self):
self.market_data_monitor = MarketDataMonitor(
2025-07-24 10:23:00 +00:00
api_key=API_KEY,
secret_key=SECRET_KEY,
passphrase=PASSPHRASE,
sandbox=SANDBOX,
2025-07-28 04:29:31 +00:00
)
self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["XCH-USDT"]
)
self.bars = MONITOR_CONFIG.get("volume_monitor", {}).get(
"bars", ["5m", "15m", "1H", "1D"]
2025-07-28 04:29:31 +00:00
)
self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-07-01 00:00:00"
)
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
2025-07-28 04:29:31 +00:00
self.db_market_data = DBMarketData(self.db_url)
self.trade_data_main = TradeDataMain()
2025-07-28 04:29:31 +00:00
2025-07-24 10:23:00 +00:00
def initial_data(self):
2025-07-25 08:12:52 +00:00
"""
初始化数据
"""
2025-07-24 10:23:00 +00:00
for symbol in self.symbols:
for bar in self.bars:
logging.info(f"开始初始化行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
2025-07-25 08:12:52 +00:00
if latest_data:
start = latest_data.get("timestamp")
start_date_time = timestamp_to_datetime(start)
start = start + 1
else:
start = datetime_to_timestamp(self.initial_date)
start_date_time = self.initial_date
logging.info(
f"开始初始化{symbol}, {bar} 行情数据,从 {start_date_time} 开始"
)
self.fetch_save_data(symbol, bar, start)
2025-07-28 04:29:31 +00:00
def fetch_save_data(self, symbol: str, bar: str, start: str):
2025-07-25 08:12:52 +00:00
"""
获取保存数据
"""
end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end_time_ts = transform_date_time_to_timestamp(end_time)
if end_time_ts is None:
logging.error(f"结束时间格式错误: {end_time}")
return None
start_time_ts = transform_date_time_to_timestamp(start)
if start_time_ts is None:
logging.error(f"开始时间格式错误: {start}")
return None
# 如果bar为5m, 15m:
# end_time_ts与start_time_ts相差超过1天则按照1天为单位
# 如果bar为1H, 4H,
# end_time_ts与start_time_ts相差超过5天则按照5天为单位
# 如果bar为1D, 则end_time_ts与start_time_ts相差超过10天则按照10天为单位
# 获取数据直到end_time_ts
threshold = None
if bar in ["5m", "15m"]:
threshold = 86400000
elif bar in ["1H", "4H"]:
threshold = 432000000
elif bar == "1D":
threshold = 864000000
while start_time_ts < end_time_ts:
current_start_time_ts = end_time_ts - threshold
if current_start_time_ts < start_time_ts:
current_start_time_ts = start_time_ts
start_date_time = timestamp_to_datetime(current_start_time_ts)
end_date_time = timestamp_to_datetime(end_time_ts)
logging.info(
f"获取行情数据: {symbol} {bar}{start_date_time}{end_date_time}"
)
data = self.market_data_monitor.get_historical_kline_data(
symbol=symbol,
start=current_start_time_ts,
bar=bar,
end_time=end_time_ts,
)
if data is not None and len(data) > 0:
data["buy_sz"] = -1
data["sell_sz"] = -1
# 根据交易数据设置buy_sz和sell_sz
# 比特币的数据获取过慢,暂时不获取交易数据
# if not symbol.endswith("-SWAP"):
# # trade_data的end_time需要比market_data的end_time大一个周期
# trade_data = self.trade_data_main.get_trade_data(
# symbol=symbol, start_time=current_start_time_ts, end_time=end_time_ts
# )
# for index, row in data.iterrows():
# try:
# current_from_time = int(row["timestamp"])
# if index == len(data) - 1:
# current_to_time = current_from_time + BAR_THRESHOLD[bar]
# else:
# current_to_time = int(data.iloc[index + 1]["timestamp"])
# current_trade_data = trade_data[
# (trade_data["ts"] >= current_from_time)
# & (trade_data["ts"] <= current_to_time)
# ]
# if current_trade_data is not None and len(current_trade_data) > 0:
# current_buy_sz = current_trade_data[
# current_trade_data["side"] == "buy"
# ]["sz"].sum()
# current_sell_sz = current_trade_data[
# current_trade_data["side"] == "sell"
# ]["sz"].sum()
# data.loc[index, "buy_sz"] = current_buy_sz
# data.loc[index, "sell_sz"] = current_sell_sz
# except Exception as e:
# logging.error(f"设置buy_sz和sell_sz失败: {e}")
# continue
if data is not None and len(data) > 0:
data = data[
[
"symbol",
"bar",
"timestamp",
"date_time",
"open",
"high",
"low",
"close",
"volume",
"volCcy",
"volCCyQuote",
"buy_sz",
"sell_sz",
"create_time",
]
]
self.db_market_data.insert_data_to_mysql(data)
if current_start_time_ts == start_time_ts:
break
end_time_ts = current_start_time_ts
return data
2025-07-28 04:29:31 +00:00
def batch_update_data(self):
2025-07-25 08:12:52 +00:00
"""
更新数据
1. 获取最新数据
2. 获取最新数据的时间戳
3. 根据最新数据的时间戳获取最新数据
4. 将最新数据保存到数据库
"""
for symbol in self.symbols:
for bar in self.bars:
self.update_data(symbol, bar)
def update_data(self, symbol: str, bar: str):
"""
更新数据
"""
logging.info(f"开始更新行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
if not latest_data:
logging.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据")
data = self.fetch_save_data(symbol, bar, self.initial_date)
else:
latest_timestamp = latest_data.get("timestamp")
if latest_timestamp:
latest_timestamp = int(latest_timestamp)
latest_date_time = timestamp_to_datetime(latest_timestamp)
logging.info(f"{symbol}, {bar} 上次获取的最新数据时间: {latest_date_time}")
else:
logging.warning(f"获取{symbol}, {bar} 最新数据失败")
return
data = self.fetch_save_data(symbol, bar, latest_timestamp + 1)
return data
2025-07-28 04:29:31 +00:00
2025-07-24 10:23:00 +00:00
if __name__ == "__main__":
market_data_main = MarketDataMain()
market_data_main.batch_update_data()
# market_data_main.initial_data()