crypto_quant/core/data_monitor.py

91 lines
4.2 KiB
Python
Raw Normal View History

2025-07-24 10:23:00 +00:00
import logging
import time
from datetime import datetime, timedelta
from typing import Optional, Union

import okx.MarketData as Market
import pandas as pd
# Module-level logging setup: root logger at INFO, messages prefixed with time and level.
# NOTE(review): configuring the root logger on import affects every program that imports
# this module; consider moving this call to the application's entry point.
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)
class DataMonitor:
    """Fetch historical K-line (candlestick) data through the ``okx.MarketData.MarketAPI`` client."""

    # Names for the positional fields of each raw candle row, in the order the rows are
    # received (presumably OKX's documented history-candles field order — confirm).
    _CANDLE_COLUMNS = ["timestamp", "open", "high", "low", "close", "volume",
                       "volCcy", "volCCyQuote", "confirm"]
    # Fields converted from strings to numbers in the output.
    _NUMERIC_COLUMNS = ["open", "high", "low", "close", "volume"]
    # Columns, in order, of the DataFrame returned to callers.
    _OUTPUT_COLUMNS = ["timestamp", "date_time", "open", "high", "low", "close", "volume"]
    # Pause between paginated requests, in seconds, to throttle the request rate.
    _REQUEST_INTERVAL = 0.2

    def __init__(self,
                 api_key: str,
                 secret_key: str,
                 passphrase: str,
                 sandbox: bool = True):
        """Create the underlying OKX market-data client.

        :param api_key: OKX API key.
        :param secret_key: OKX API secret key.
        :param passphrase: OKX API passphrase.
        :param sandbox: True selects the sandbox environment, False the live environment.
        """
        flag = "1" if sandbox else "0"  # "0": live trading environment, "1": sandbox environment
        self.market_api = Market.MarketAPI(
            api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
            flag=flag,
        )

    def get_historical_kline_data(self, symbol: str = None, start: Optional[Union[str, int]] = None,
                                  bar: str = '1m', limit: int = 100,
                                  end_time: int = None) -> Optional[pd.DataFrame]:
        """Fetch historical K-line data, paging backwards from ``end_time`` to ``start``.

        :param symbol: instrument ID (trading pair); defaults to "XCH-USDT".
        :param start: start time — a Beijing-time string ``%Y-%m-%d %H:%M:%S``, or a UTC
            millisecond timestamp given as a digit string or an ``int``. Defaults to 60 days ago.
        :param bar: K-line interval, e.g. '1m'.
        :param limit: number of candles requested per API call.
        :param end_time: end time as a millisecond timestamp; defaults to the current time.
        :return: DataFrame with columns timestamp (string, as received), date_time
            (Asia/Shanghai, ``%Y-%m-%d %H:%M:%S``), open, high, low, close, volume —
            only rows with confirm == '1' and timestamp >= start, sorted ascending by
            timestamp. ``None`` when ``start`` cannot be parsed or nothing was fetched.
        """
        if symbol is None:
            symbol = "XCH-USDT"
        if end_time is None:
            end_time = int(time.time() * 1000)  # current time in milliseconds

        start_time = self._parse_start(start)
        if start_time is None:
            return None

        candles = self._fetch_candles(symbol, bar, limit, start_time, end_time)
        if not candles:
            logging.warning("未获取到数据")
            return None

        df = self._candles_to_dataframe(candles, start_time)
        logging.info(f"总计获取 {len(df)} 条 K 线数据仅confirm=1")
        return df

    @staticmethod
    def _parse_start(start: Optional[Union[str, int]]) -> Optional[int]:
        """Convert ``start`` to a millisecond timestamp; log and return None if it cannot be parsed."""
        if start is None:
            # Default: two months (60 days) before now.
            two_months_ago = datetime.now() - timedelta(days=60)
            return int(two_months_ago.timestamp() * 1000)
        try:
            # bool is excluded explicitly because it is a subclass of int.
            if isinstance(start, int) and not isinstance(start, bool):
                return start
            # A string of digits is taken as a UTC millisecond timestamp.
            if start.isdigit():
                return int(start)
            # Otherwise treat it as a Beijing-time string. The project helper is imported
            # lazily so it is only required when a date string is actually passed.
            from core.utils import datetime_to_timestamp
            return int(datetime_to_timestamp(start))
        except Exception as e:
            logging.error(f"start参数解析失败: {e}")
            return None

    def _fetch_candles(self, symbol: str, bar: str, limit: int,
                       start_time: int, end_time: int) -> list:
        """Page backwards from ``end_time`` until ``start_time`` is passed; return raw candle rows.

        Best-effort: stops and keeps what was already fetched on a non-"0" response code,
        an empty page, or any exception raised by the request.
        """
        all_data = []
        while start_time < end_time:
            try:
                # `after` actually returns records *earlier than* the given timestamp.
                response = self.market_api.get_history_candlesticks(
                    instId=symbol,
                    after=end_time,  # fetch data before this timestamp
                    bar=bar,
                    limit=str(limit),
                )
                if response["code"] != "0" or not response["data"]:
                    logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}")
                    break
                candles = response["data"]
                all_data.extend(candles)
                # Move the window back to the earliest timestamp of this page; min() does not
                # depend on the order in which the page's rows are returned.
                end_time = min(int(candle[0]) for candle in candles)
                logging.info(f"已获取 {len(candles)} 条数据,最早时间: {pd.to_datetime(end_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')}")
                end_time -= 1  # step back 1 ms so the next page does not repeat this candle
                time.sleep(self._REQUEST_INTERVAL)
            except Exception as e:
                logging.error(f"请求出错: {e}")
                break
        return all_data

    @classmethod
    def _candles_to_dataframe(cls, candles: list, start_time: int) -> pd.DataFrame:
        """Build the output DataFrame from raw candle rows.

        Keeps rows with confirm == '1' and timestamp >= ``start_time`` (the last page may
        reach past ``start_time``), converts price/volume fields to numbers, adds an
        Asia/Shanghai ``date_time`` string column and sorts ascending by timestamp.
        """
        df = pd.DataFrame(candles, columns=cls._CANDLE_COLUMNS)
        # Explicit int64: a plain `int` can be 32-bit on some platforms, which
        # millisecond timestamps overflow.
        ts = df["timestamp"].astype("int64")
        keep = (df["confirm"] == "1") & (ts >= start_time)
        # .copy() so the column assignments below act on an independent frame
        # rather than a filtered slice.
        df = df.loc[keep].copy()
        ts = ts.loc[keep]
        for col in cls._NUMERIC_COLUMNS:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        dt_series = pd.to_datetime(ts, unit="ms", utc=True, errors="coerce").dt.tz_convert("Asia/Shanghai")
        df["date_time"] = dt_series.dt.strftime("%Y-%m-%d %H:%M:%S")
        df = df[cls._OUTPUT_COLUMNS]
        # Timestamps remain strings (as received); order them by integer value.
        df = df.sort_values("timestamp", key=lambda s: s.astype("int64"))
        return df.reset_index(drop=True)