153 lines
7.2 KiB
Python
153 lines
7.2 KiB
Python
import time
|
||
from datetime import datetime, timedelta
|
||
import logging
|
||
from typing import Optional
|
||
import pandas as pd
|
||
import okx.MarketData as Market
|
||
from core.utils import datetime_to_timestamp
|
||
# Configure the root logger at import time: INFO level, "<timestamp> <LEVEL>: <message>" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
|
||
|
||
class DataMonitor:
|
||
def __init__(self,
             api_key: str,
             secret_key: str,
             passphrase: str,
             sandbox: bool = True):
    """
    Create the market-data API client used by this monitor.

    :param api_key: API key passed to ``Market.MarketAPI``
    :param secret_key: API secret passed as ``api_secret_key``
    :param passphrase: API passphrase
    :param sandbox: when True the client is created with flag "1"
        (sandbox environment), otherwise with flag "0" (live trading)
    """
    # Environment flag expected by the client: "0" = live, "1" = sandbox.
    if sandbox:
        env_flag = "1"
    else:
        env_flag = "0"

    self.market_api = Market.MarketAPI(
        api_key=api_key,
        api_secret_key=secret_key,
        passphrase=passphrase,
        flag=env_flag,
    )
|
||
|
||
def get_historical_kline_data(self, symbol: str = None, start: str = None, bar: str = '1m', limit: int = 100, end_time: int = None) -> Optional[pd.DataFrame]:
    """
    Fetch historical candlestick (K-line) data, paging backwards from
    ``end_time`` until ``start`` is reached.

    :param symbol: trading pair; defaults to "XCH-USDT" when None
    :param start: start of the range. Accepts an int millisecond timestamp,
        a digit-only string holding a millisecond timestamp, or a datetime
        string (documented by the original author as Beijing time,
        ``%Y-%m-%d %H:%M:%S``) that is converted by ``datetime_to_timestamp``.
        Defaults to 60 days before now when None.
    :param bar: candlestick period; falls back to "5m" when explicitly None
    :param limit: number of candles requested per API call
    :param end_time: end of the range as a millisecond timestamp;
        defaults to the current time
    :return: DataFrame of confirmed candles (``confirm == '1'``) sorted by
        ascending ``timestamp`` with columns symbol, bar, timestamp,
        date_time, open, high, low, close, volume, volCcy, volCCyQuote;
        ``None`` when ``start`` cannot be parsed or no candles were fetched
    """
    def to_beijing(ts_ms):
        # Millisecond epoch -> tz-aware timestamp in Asia/Shanghai (used for logging).
        return pd.to_datetime(ts_ms, unit='ms', utc=True).tz_convert('Asia/Shanghai')

    if symbol is None:
        symbol = "XCH-USDT"
    if bar is None:
        bar = "5m"
    if end_time is None:
        end_time = int(time.time() * 1000)  # current time in milliseconds

    # Resolve the start of the range to a millisecond timestamp.
    if start is None:
        # Default: two months (60 days) ago.
        two_months_ago = datetime.now() - timedelta(days=60)
        start_time = int(two_months_ago.timestamp() * 1000)
    else:
        try:
            if isinstance(start, int):
                # Already an integer timestamp.
                start_time = start
            elif start.isdigit():
                # Digit-only string: treat as a millisecond timestamp.
                start_time = int(start)
            else:
                # Datetime string: convert to a millisecond timestamp.
                start_time = datetime_to_timestamp(start)
        except Exception as e:
            logging.error(f"start参数解析失败: {e}")
            return None

    columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
    all_data = []
    # Earliest timestamp seen in any previous page; -1 means "no page yet".
    latest_timestamp = -1

    while start_time < end_time:
        try:
            # NOTE: the API's `after` parameter returns records EARLIER than
            # the given timestamp, so each request pages further back.
            response = self.get_data_from_api(symbol, end_time, bar, limit)
            if response is None:
                logging.warning("请求失败,请稍后再试")
                break
            if response["code"] != "0" or not response["data"]:
                logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}")
                break

            candles = response["data"]
            # The code treats the last row as the oldest candle and the
            # first row as the newest one in the page.
            from_time = int(candles[-1][0])
            to_time = int(candles[0][0])

            # Stop if this page did not move further back in time than the
            # previous one (guards against looping on the same data).
            if latest_timestamp != -1 and latest_timestamp <= from_time:
                logging.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time}, 停止获取数据")
                break
            latest_timestamp = from_time

            from_time_str = to_beijing(from_time)
            to_time_str = to_beijing(to_time)
            logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")

            if from_time < start_time:
                start_time_str = to_beijing(start_time)
                logging.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据")
                # Keep only the candles at or after the requested start.
                candles = [candle for candle in candles if int(candle[0]) >= start_time]
                all_data.extend(candles)
                break

            # Continue from just before the oldest candle of this page
            # (minus 1 ms to avoid fetching it again).
            end_time = from_time - 1
            all_data.extend(candles)
            time.sleep(0.5)
        except Exception as e:
            logging.error(f"请求出错: {e}")
            break

    if not all_data:
        logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
        return None

    df = pd.DataFrame(all_data, columns=columns)
    # Explicit copy: the columns assigned below must not be written onto a
    # filtered view of the raw frame (avoids chained-assignment warnings).
    df = df[df['confirm'] == '1'].copy()

    for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Explicit int64: plain `int` can map to a 32-bit integer on some
    # platforms, which cannot hold millisecond epoch values.
    df['timestamp'] = df['timestamp'].astype('int64')
    dt_series = pd.to_datetime(df['timestamp'], unit='ms', utc=True, errors='coerce').dt.tz_convert('Asia/Shanghai')
    df['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S')

    # Tag every row with the trading pair and the candlestick period.
    df['symbol'] = symbol
    df['bar'] = bar

    ordered_cols = ['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']
    df = df[ordered_cols].sort_values('timestamp').reset_index(drop=True)

    logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)")
    # Earliest and latest candle time in the result, for the log.
    earliest_time = df['date_time'].min()
    latest_time = df['date_time'].max()
    logging.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time}, 最新时间: {latest_time}")
    return df
|
||
|
||
def get_data_from_api(self, symbol, end_time, bar, limit, max_retries: int = 3, retry_interval: float = 10):
    """
    Request one page of historical candlesticks, retrying on failure.

    An attempt counts as failed when the client raises or returns a falsy
    response. With the defaults the call is attempted up to 4 times
    (1 initial try + 3 retries) with a 10-second pause between attempts.

    :param symbol: instrument id, passed as ``instId``
    :param end_time: millisecond timestamp passed as ``after``; the API
        returns records earlier than this timestamp
    :param bar: candlestick period
    :param limit: number of candles to request (sent as a string)
    :param max_retries: number of retries after the first failed attempt
    :param retry_interval: seconds to sleep between attempts
    :return: the first truthy response, otherwise the last falsy value
        the client returned (``None`` if every attempt raised)
    """
    response = None
    # Always make at least one attempt, even for a non-positive max_retries.
    attempts = max(1, max_retries + 1)
    for attempt in range(attempts):
        try:
            response = self.market_api.get_history_candlesticks(
                instId=symbol,
                after=end_time,  # returns data earlier than this timestamp
                bar=bar,
                limit=str(limit)
            )
            if response:
                break
        except Exception as e:
            logging.error(f"请求出错: {e}")
        # No pause after the final attempt.
        if attempt < attempts - 1:
            time.sleep(retry_interval)
    return response
|
||
|
||
def get_data_from_db(self, symbol, bar, db_url):
|
||
sql = """
|
||
SELECT * FROM crypto_market_data
|
||
WHERE symbol = :symbol AND bar = :bar
|
||
ORDER BY timestamp DESC
|
||
LIMIT 1""" |