import logging from datetime import datetime from time import sleep from core.market_data_monitor import MarketDataMonitor from core.db_market_data import DBMarketData from core.utils import datetime_to_timestamp, timestamp_to_datetime from trade_data_main import TradeDataMain from config import ( API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, MONITOR_CONFIG, MYSQL_CONFIG, BAR_THRESHOLD, ) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") class MarketDataMain: def __init__(self): self.market_data_monitor = MarketDataMonitor( api_key=API_KEY, secret_key=SECRET_KEY, passphrase=PASSPHRASE, sandbox=SANDBOX, ) self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get( "symbols", ["XCH-USDT"] ) self.bars = MONITOR_CONFIG.get("volume_monitor", {}).get( "bars", ["5m", "15m", "1H", "1D"] ) self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-07-01 00:00:00" ) mysql_user = MYSQL_CONFIG.get("user", "xch") mysql_password = MYSQL_CONFIG.get("password", "") if not mysql_password: raise ValueError("MySQL password is not set") mysql_host = MYSQL_CONFIG.get("host", "localhost") mysql_port = MYSQL_CONFIG.get("port", 3306) mysql_database = MYSQL_CONFIG.get("database", "okx") self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" self.db_market_data = DBMarketData(self.db_url) self.trade_data_main = TradeDataMain() def initial_data(self): """ 初始化数据 """ for symbol in self.symbols: for bar in self.bars: logging.info(f"开始初始化行情数据: {symbol} {bar}") latest_data = self.db_market_data.query_latest_data(symbol, bar) if latest_data: start = latest_data.get("timestamp") start_date_time = timestamp_to_datetime(start) start = start + 1 else: start = datetime_to_timestamp(self.initial_date) start_date_time = self.initial_date logging.info( f"开始初始化{symbol}, {bar} 行情数据,从 {start_date_time} 开始" ) self.fetch_save_data(symbol, bar, start) def fetch_save_data(self, symbol: str, bar: str, start: str): """ 获取保存数据 """ end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") end_time_ts = datetime_to_timestamp(end_time) if isinstance(start, str): if start.isdigit(): start_time_ts = int(start) else: start_time_ts = datetime_to_timestamp(start) elif isinstance(start, int): start_time_ts = start else: raise ValueError(f"开始时间格式错误: {start}") # 如果bar为5m, 15m: # end_time_ts与start_time_ts相差超过1天,则按照1天为单位 # 如果bar为1H, 4H, # end_time_ts与start_time_ts相差超过5天,则按照5天为单位 # 如果bar为1D, 则end_time_ts与start_time_ts相差超过10天,则按照10天为单位 # 获取数据,直到end_time_ts threshold = None if bar in ["5m", "15m"]: threshold = 86400000 - 1 elif bar in ["1H", "4H"]: threshold = 432000000 - 1 elif bar == "1D": threshold = 864000000 - 1 while start_time_ts < end_time_ts: current_end_time_ts = start_time_ts + threshold if current_end_time_ts >= end_time_ts: current_end_time_ts = end_time_ts start_date_time = timestamp_to_datetime(start_time_ts) end_date_time = timestamp_to_datetime(current_end_time_ts) logging.info( f"获取行情数据: {symbol} {bar} 从 {start_date_time} 到 {end_date_time}" ) # 首先判断是否存在 > current_end_time_ts + 1 的数据 # 如果存在,则跳过此次循环 data = self.db_market_data.query_market_data_by_symbol_bar( symbol=symbol, bar=bar, start=start_time_ts, end=current_end_time_ts, ) if data is not None and len(data) > 0: logging.info(f"已存在{symbol}, {bar} 从 {start_date_time} 到 {end_date_time} 的数据,跳过此次循环") start_time_ts = current_end_time_ts continue # current_end_time_ts + 1, 目的是为了避免缺少最后一条数据 data = self.market_data_monitor.get_historical_kline_data( symbol=symbol, start=start_time_ts, bar=bar, end_time=current_end_time_ts + 1, ) if data is not None and len(data) > 0: data["buy_sz"] = -1 data["sell_sz"] = -1 # 根据交易数据,设置buy_sz和sell_sz # 比特币的数据获取过慢,暂时不获取交易数据 # if not symbol.endswith("-SWAP"): # # trade_data的end_time需要比market_data的end_time大一个周期 # trade_end_time_ts = current_end_time_ts + BAR_THRESHOLD[bar] + 1 # trade_data = self.trade_data_main.get_trade_data( # symbol=symbol, start_time=start_time_ts, end_time=trade_end_time_ts # ) # for index, row in data.iterrows(): # try: # current_from_time = int(row["timestamp"]) # if index == len(data) - 1: # current_to_time = current_from_time + BAR_THRESHOLD[bar] # else: # current_to_time = int(data.iloc[index + 1]["timestamp"]) # current_trade_data = trade_data[ # (trade_data["ts"] >= current_from_time) # & (trade_data["ts"] <= current_to_time) # ] # if current_trade_data is not None and len(current_trade_data) > 0: # current_buy_sz = current_trade_data[ # current_trade_data["side"] == "buy" # ]["sz"].sum() # current_sell_sz = current_trade_data[ # current_trade_data["side"] == "sell" # ]["sz"].sum() # data.loc[index, "buy_sz"] = current_buy_sz # data.loc[index, "sell_sz"] = current_sell_sz # except Exception as e: # logging.error(f"设置buy_sz和sell_sz失败: {e}") # continue if data is not None and len(data) > 0: data = data[ [ "symbol", "bar", "timestamp", "date_time", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "buy_sz", "sell_sz", "create_time", ] ] self.db_market_data.insert_data_to_mysql(data) start_time_ts = current_end_time_ts return data def batch_update_data(self): """ 更新数据 1. 获取最新数据 2. 获取最新数据的时间戳 3. 根据最新数据的时间戳,获取最新数据 4. 将最新数据保存到数据库 """ for symbol in self.symbols: for bar in self.bars: self.update_data(symbol, bar) def update_data(self, symbol: str, bar: str): """ 更新数据 """ logging.info(f"开始更新行情数据: {symbol} {bar}") latest_data = self.db_market_data.query_latest_data(symbol, bar) if not latest_data: data = self.fetch_save_data(symbol, bar, self.initial_date) else: latest_timestamp = latest_data.get("timestamp") if latest_timestamp: latest_timestamp = int(latest_timestamp) else: logging.warning(f"获取{symbol}, {bar} 最新数据失败") return data = self.fetch_save_data(symbol, bar, latest_timestamp + 1) return data if __name__ == "__main__": market_data_main = MarketDataMain() # market_data_main.batch_update_data() market_data_main.initial_data()