# crypto_quant/trade_data_main.py
#
# 93 lines
# 4.1 KiB
# Python
# Raw Normal View History

import core.logger as logging
import time
from datetime import datetime, timedelta
import pandas as pd
from core.utils import datetime_to_timestamp, timestamp_to_datetime, transform_date_time_to_timestamp
from core.biz.trade_data import TradeData
from config import (
API_KEY,
SECRET_KEY,
PASSPHRASE,
SANDBOX,
# 2025-08-31 03:20:59 +00:00  (stray revision timestamp, presumably from the web view; not part of the import list)
OKX_MONITOR_CONFIG,
MYSQL_CONFIG,
)
# Module-wide logger: the `logger` attribute of core.logger, which this file
# imports under the alias `logging` (it is not the stdlib logging module).
logger = logging.logger
class TradeDataMain:
    """Fetch trade history for a symbol, backfilling the database from the API.

    The instance owns a ``TradeData`` client configured with the exchange
    credentials and a MySQL connection URL derived from ``MYSQL_CONFIG``.
    """

    def __init__(self):
        """Build the database URL and create the ``TradeData`` client.

        Raises:
            ValueError: If ``MYSQL_CONFIG`` has no (or an empty) ``password``.
        """
        self.db_url = self._build_db_url(MYSQL_CONFIG)
        self.trade_data = TradeData(
            api_key=API_KEY,
            secret_key=SECRET_KEY,
            passphrase=PASSPHRASE,
            sandbox=SANDBOX,
            db_url=self.db_url,
        )

    @staticmethod
    def _build_db_url(mysql_config):
        """Return the ``mysql+pymysql://`` URL described by ``mysql_config``.

        The user name and password are percent-encoded so that characters
        which are URL delimiters (``@``, ``/``, ``:``, spaces, ``+`` ...) do
        not corrupt the URL. Config values are therefore expected to be the
        raw credentials, not pre-encoded ones.

        Args:
            mysql_config: Mapping with optional ``user``, ``host``, ``port``
                and ``database`` keys and a required, non-empty ``password``.

        Raises:
            ValueError: If the password is missing or empty.
        """
        # Local import keeps this block self-contained (no new file-level import).
        from urllib.parse import quote

        mysql_password = mysql_config.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")

        # safe="" also encodes "/", and quote() (unlike quote_plus) never emits
        # "+", so the value decodes unambiguously.
        mysql_user = quote(str(mysql_config.get("user", "xch")), safe="")
        mysql_password = quote(str(mysql_password), safe="")
        mysql_host = mysql_config.get("host", "localhost")
        mysql_port = mysql_config.get("port", 3306)
        mysql_database = mysql_config.get("database", "okx")
        return f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"

    def get_trade_data(self, symbol: str = None, start_time: int = None, end_time: int = None, limit: int = 100):
        """Return the stored trades of ``symbol`` within a time window.

        Before reading, the database is topped up from the API: when it holds
        no rows for the symbol the whole window is requested; otherwise only
        the part before the earliest stored row and/or after the latest
        stored row is requested.

        Args:
            symbol: Trading pair; defaults to ``"XCH-USDT"``.
            start_time: Window start, converted with
                ``transform_date_time_to_timestamp``. When ``None`` the
                ``initial_date`` of ``OKX_MONITOR_CONFIG["volume_monitor"]``
                is used (fallback ``"2025-05-01 00:00:00"``).
            end_time: Window end, converted with
                ``transform_date_time_to_timestamp``. When ``None`` the
                current time in milliseconds is used.
            limit: Forwarded unchanged to ``get_history_trades``.

        Returns:
            A ``pandas.DataFrame`` sorted ascending by ``ts`` with a fresh
            index, or ``None`` when the final query yields nothing.
        """
        if symbol is None:
            symbol = "XCH-USDT"

        if end_time is None:
            end_time = int(time.time() * 1000)
        else:
            end_time = transform_date_time_to_timestamp(end_time)

        # Resolve the start of the window.
        if start_time is None:
            # Default comes from configuration, not from a relative offset.
            start_time_str = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2025-05-01 00:00:00")
            start_time = transform_date_time_to_timestamp(start_time_str)
        else:
            start_time = transform_date_time_to_timestamp(start_time)

        db_trade_data = self.trade_data.db_trade_data

        # Earliest and latest rows already stored for this symbol.
        earliest_data = db_trade_data.query_earliest_data(symbol)
        db_earliest_time = earliest_data["ts"] if earliest_data is not None else None
        latest_data = db_trade_data.query_latest_data(symbol)
        db_latest_time = latest_data["ts"] if latest_data is not None else None

        start_date_time = timestamp_to_datetime(start_time)
        end_date_time = timestamp_to_datetime(end_time)

        if db_earliest_time is None or db_latest_time is None:
            # Nothing stored yet: request the whole window.
            logger.info(f"数据库无数据从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}")
            self.trade_data.get_history_trades(symbol, start_time, end_time, limit)
        else:
            # Only request the gaps on either side of what is already stored.
            if db_earliest_time > start_time:
                db_earliest_date_time = timestamp_to_datetime(db_earliest_time)
                logger.info(f"从API补充最早数据{symbol}, {start_date_time} {db_earliest_date_time}")
                self.trade_data.get_history_trades(symbol, start_time, db_earliest_time + 1, limit)
            if db_latest_time < end_time:
                db_latest_date_time = timestamp_to_datetime(db_latest_time)
                logger.info(f"从API补充最新数据{symbol}, {db_latest_date_time}, {end_date_time}")
                self.trade_data.get_history_trades(symbol, db_latest_time + 1, end_time, limit)

        final_data = db_trade_data.query_trade_data_by_symbol(symbol=symbol, start=start_time, end=end_time)
        if final_data is None or len(final_data) == 0:
            logger.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}")
            return None

        logger.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}")
        final_data = pd.DataFrame(final_data)
        final_data.sort_values(by="ts", inplace=True)
        final_data.reset_index(drop=True, inplace=True)
        return final_data
if __name__ == "__main__":
    # Script entry point: run one sync-and-fetch pass with all default arguments.
    TradeDataMain().get_trade_data()