diff --git a/config.py b/config.py index 94c5205..93b267b 100644 --- a/config.py +++ b/config.py @@ -53,7 +53,7 @@ MONITOR_CONFIG = { "symbols": ["XCH-USDT", "BTC-USDT", "ETH-USDT", "SOL-USDT", "DOGE-USDT", "XCH-USDT-SWAP", "BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP", "DOGE-USDT-SWAP"], "intervals": ["5m", "15m", "1H", "4H", "1D"], - "initial_date": "2025-07-01 00:00:00" + "initial_date": "2025-05-01 00:00:00" }, "price_monitor":{ "symbols": ["XCH-USDT"], diff --git a/core/data_monitor.py b/core/data_monitor.py index bcd7786..f7e9496 100644 --- a/core/data_monitor.py +++ b/core/data_monitor.py @@ -4,7 +4,7 @@ import logging from typing import Optional import pandas as pd import okx.MarketData as Market - +from core.utils import datetime_to_timestamp logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') class DataMonitor: @@ -29,9 +29,10 @@ class DataMonitor: :param end_time: 结束时间(毫秒级timestamp),默认当前时间 :return: pd.DataFrame """ - from core.utils import datetime_to_timestamp if symbol is None: symbol = "XCH-USDT" + if bar is None: + bar = "5m" if end_time is None: end_time = int(time.time() * 1000) # 当前时间(毫秒) # 处理start参数 @@ -59,12 +60,10 @@ class DataMonitor: while start_time < end_time: try: # after,真实逻辑是获得指定时间之前的数据 !!! - response = self.market_api.get_history_candlesticks( - instId=symbol, - after=end_time, # 获取指定时间之前的数据, - bar=bar, - limit=str(limit) - ) + response = self.get_data_from_api(symbol, end_time, bar, limit) + if response is None: + logging.warning(f"请求失败,请稍后再试") + break if response["code"] != "0" or not response["data"]: logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}") break @@ -82,10 +81,10 @@ class DataMonitor: break from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') - logging.info(f"已获取 {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") + logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") if from_time < start_time: start_time_str = pd.to_datetime(start_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') - logging.warning(f"本轮获取数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") + logging.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") # candels中仅保留start_time之后的数据 candles = [candle for candle in candles if int(candle[0]) >= start_time] all_data.extend(candles) @@ -94,7 +93,7 @@ class DataMonitor: # 更新 end_time 为本次请求中最早的时间戳 end_time = from_time - 1 # 减 1 毫秒以避免重复 all_data.extend(candles) - time.sleep(0.2) + time.sleep(0.5) except Exception as e: logging.error(f"请求出错: {e}") break @@ -116,7 +115,39 @@ class DataMonitor: df.sort_values('timestamp', inplace=True) df.reset_index(drop=True, inplace=True) logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") + # 获取df中date_time的最早时间与最新时间 + earliest_time = df['date_time'].min() + latest_time = df['date_time'].max() + logging.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time}, 最新时间: {latest_time}") return df else: - logging.warning("未获取到数据") - return None \ No newline at end of file + logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试") + return None + + def get_data_from_api(self, symbol, end_time, bar, limit): + response = None + count = 0 + while True: + try: + response = self.market_api.get_history_candlesticks( + instId=symbol, + after=end_time, # 获取指定时间之前的数据, + bar=bar, + limit=str(limit) + ) + if response: + break + except Exception as e: + logging.error(f"请求出错: {e}") + count += 1 + if count > 3: + break + time.sleep(10) + return response + + def get_data_from_db(self, symbol, bar, db_url): + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp DESC + LIMIT 1""" \ No newline at end of file diff --git a/core/db_manager.py b/core/db_manager.py index 9b30a49..d9744e2 100644 --- a/core/db_manager.py +++ b/core/db_manager.py @@ -1,8 +1,9 @@ import pandas as pd from sqlalchemy import create_engine, exc, text import logging +from core.utils import transform_data_type -def save_market_data_to_mysql(df: pd.DataFrame, db_url: str): +def insert_market_data_to_mysql(df: pd.DataFrame, db_url: str): """ 将K线行情数据保存到MySQL的crypto_market_data表 :param df: K线数据DataFrame @@ -53,4 +54,52 @@ def save_market_data_to_mysql(df: pd.DataFrame, db_url: str): logging.error(f'插入数据出错: {e}') logging.info("数据已成功写入数据库。") except Exception as e: - logging.error(f'数据库连接或写入失败: {e}') \ No newline at end of file + logging.error(f'数据库连接或写入失败: {e}') + + +def query_latest_data(symbol: str, bar: str, db_url: str): + """ + 查询最新数据 + :param symbol: 交易对 + :param bar: K线周期 + :param db_url: 数据库连接URL + """ + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"symbol": symbol, "bar": bar} + return query(sql, condition_dict, db_url, return_multi=False) + +def query(sql: str, condition_dict: dict, db_url: str, return_multi: bool = True): + """ + 查询数据 + :param sql: 查询SQL + :param db_url: 数据库连接URL + """ + try: + engine = create_engine(db_url) + with engine.connect() as conn: + result = conn.execute(text(sql), condition_dict) + if return_multi: + result = result.fetchall() + if result: + result_list = [transform_data_type(dict(row._mapping)) for row in result] + return result_list + else: + return None + else: + result = result.fetchone() + if result: + result_dict = transform_data_type(dict(result._mapping)) + return result_dict + else: + return None + except Exception as e: + logging.error(f'查询数据出错: {e}') + return None + + + diff --git a/core/utils.py b/core/utils.py index d1c9ecc..5fa9056 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone, timedelta +from decimal import Decimal def datetime_to_timestamp(date_str: str) -> int: """ @@ -17,4 +18,13 @@ def timestamp_to_datetime(timestamp: int) -> str: :return: 形如 '2023-01-01 12:00:00' 的日期时间字符串 """ dt = datetime.fromtimestamp(timestamp / 1000, timezone(timedelta(hours=8))) - return dt.strftime('%Y-%m-%d %H:%M:%S') \ No newline at end of file + return dt.strftime('%Y-%m-%d %H:%M:%S') + +def transform_data_type(data: dict): + """ + 遍历字典,将所有Decimal类型的值转换为float类型 + """ + for key, value in data.items(): + if isinstance(value, Decimal): + data[key] = float(value) + return data \ No newline at end of file diff --git a/monitor_main.py b/monitor_main.py index 8e91ad6..2dd5229 100644 --- a/monitor_main.py +++ b/monitor_main.py @@ -1,7 +1,7 @@ import logging from time import sleep from core.data_monitor import DataMonitor -from core.db_manager import save_market_data_to_mysql +from core.db_manager import insert_market_data_to_mysql, query_latest_data from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, \ MONITOR_CONFIG, MYSQL_CONFIG @@ -29,16 +29,54 @@ class MonitorMain: self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" def initial_data(self): + """ + 初始化数据 + """ for symbol in self.symbols: for interval in self.intervals: - data = self.data_monitor.get_historical_kline_data(symbol=symbol, - start=self.initial_date, - bar=interval) - save_market_data_to_mysql(data, self.db_url) + latest_data = query_latest_data(symbol, interval, self.db_url) + if latest_data: + logging.info(f"已初始化{symbol}, {interval} 最新数据,请使用update_data()更新数据") + continue + self.fetch_save_data(symbol, interval, self.initial_date) + def fetch_save_data(self, symbol: str, interval: str, start: str): + """ + 获取保存数据 + """ + data = self.data_monitor.get_historical_kline_data(symbol=symbol, + start=start, + bar=interval) + if data is not None and len(data) > 0: + insert_market_data_to_mysql(data, self.db_url) + + def update_data(self): + """ + 更新数据 + 1. 获取最新数据 + 2. 获取最新数据的时间戳 + 3. 根据最新数据的时间戳,获取最新数据 + 4. 将最新数据保存到数据库 + """ + for symbol in self.symbols: + for interval in self.intervals: + latest_data = query_latest_data(symbol, interval, self.db_url) + if not latest_data: + self.fetch_save_data(symbol, interval, self.initial_date) + continue + else: + latest_timestamp = latest_data.get("timestamp") + if latest_timestamp: + latest_timestamp = int(latest_timestamp) + else: + logging.warning(f"获取{symbol}, {interval} 最新数据失败") + continue + self.fetch_save_data(symbol, interval, latest_timestamp + 1) + if __name__ == "__main__": monitor_main = MonitorMain() + # monitor_main.update_data() monitor_main.initial_data()