import time from datetime import datetime, timedelta import logging from typing import Optional import pandas as pd import okx.MarketData as Market logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') class DataMonitor: def __init__(self, api_key: str, secret_key: str, passphrase: str, sandbox: bool = True): flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境 self.market_api = Market.MarketAPI( api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, flag=flag ) def get_historical_kline_data(self, symbol: str = None, start: str = None, bar: str = '1m', limit: int = 100, end_time: int = None) -> Optional[pd.DataFrame]: """ 获取历史K线数据,支持start为北京时间字符串(%Y-%m-%d %H:%M:%S)或UTC毫秒级时间戳 :param symbol: 交易对 :param start: 起始时间(北京时间字符串或UTC毫秒级时间戳) :param bar: K线周期 :param limit: 每次请求数量 :param end_time: 结束时间(毫秒级timestamp),默认当前时间 :return: pd.DataFrame """ from core.utils import datetime_to_timestamp if symbol is None: symbol = "XCH-USDT" if end_time is None: end_time = int(time.time() * 1000) # 当前时间(毫秒) # 处理start参数 if start is None: # 默认两个月前 two_months_ago = datetime.now() - timedelta(days=60) start_time = int(two_months_ago.timestamp() * 1000) else: try: # 判断是否就是timestamp整型数据 if isinstance(start, int): start_time = start # 判断是否为纯数字(UTC毫秒级timestamp) elif start.isdigit(): start_time = int(start) else: # 按北京时间字符串处理,转换为毫秒级timestamp start_time = datetime_to_timestamp(start) except Exception as e: logging.error(f"start参数解析失败: {e}") return None columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"] all_data = [] latest_timestamp = -1 while start_time < end_time: try: # after,真实逻辑是获得指定时间之前的数据 !!! response = self.market_api.get_history_candlesticks( instId=symbol, after=end_time, # 获取指定时间之前的数据, bar=bar, limit=str(limit) ) if response["code"] != "0" or not response["data"]: logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}") break candles = response["data"] from_time = int(candles[-1][0]) to_time = int(candles[0][0]) if latest_timestamp == -1: latest_timestamp = from_time else: if latest_timestamp > from_time: latest_timestamp = from_time else: logging.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time}, 停止获取数据") break from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') logging.info(f"已获取 {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") if from_time < start_time: start_time_str = pd.to_datetime(start_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') logging.warning(f"本轮获取数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") # candels中仅保留start_time之后的数据 candles = [candle for candle in candles if int(candle[0]) >= start_time] all_data.extend(candles) break else: # 更新 end_time 为本次请求中最早的时间戳 end_time = from_time - 1 # 减 1 毫秒以避免重复 all_data.extend(candles) time.sleep(0.2) except Exception as e: logging.error(f"请求出错: {e}") break if all_data: df = pd.DataFrame(all_data, columns=columns) df = df[df['confirm'] == '1'] for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']: df[col] = pd.to_numeric(df[col], errors='coerce') dt_series = pd.to_datetime(df['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('Asia/Shanghai') df['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S') # 将timestamp转换为整型 df['timestamp'] = df['timestamp'].astype(int) # 添加虚拟货币名称列,内容为symbol df['symbol'] = symbol # 添加bar列,内容为bar df['bar'] = bar df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']] df.sort_values('timestamp', inplace=True) df.reset_index(drop=True, inplace=True) logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") return df else: logging.warning("未获取到数据") return None