crypto_quant/trade_data_main.py

93 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import core.logger as logging
import time
from datetime import datetime, timedelta
import pandas as pd
from core.utils import datetime_to_timestamp, timestamp_to_datetime, transform_date_time_to_timestamp
from core.biz.trade_data import TradeData
from config import (
API_KEY,
SECRET_KEY,
PASSPHRASE,
SANDBOX,
OKX_MONITOR_CONFIG,
COIN_MYSQL_CONFIG,
)
logger = logging.logger
class TradeDataMain:
def __init__(self):
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.trade_data = TradeData(
api_key=API_KEY,
secret_key=SECRET_KEY,
passphrase=PASSPHRASE,
sandbox=SANDBOX,
db_url=self.db_url,
)
def get_trade_data(self, symbol: str = None, start_time: int = None, end_time: int = None, limit: int = 100):
"""
获取交易数据
"""
if symbol is None:
symbol = "XCH-USDT"
if end_time is None:
end_time = int(time.time() * 1000)
else:
end_time = transform_date_time_to_timestamp(end_time)
# 处理start参数
if start_time is None:
# 默认两个月前
start_time_str = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2025-05-01 00:00:00")
start_time = transform_date_time_to_timestamp(start_time_str)
else:
start_time = transform_date_time_to_timestamp(start_time)
# 从数据库获取最早数据
earliest_data = self.trade_data.db_trade_data.query_earliest_data(symbol)
db_earliest_time = None
if earliest_data is not None:
db_earliest_time = earliest_data["ts"]
db_latest_time = None
latest_data = self.trade_data.db_trade_data.query_latest_data(symbol)
if latest_data is not None:
db_latest_time = latest_data["ts"]
start_date_time = timestamp_to_datetime(start_time)
end_date_time = timestamp_to_datetime(end_time)
# 如果db_earliest_time和db_latest_time存在则需要调整start_time和end_time
if db_earliest_time is None or db_latest_time is None:
logger.info(f"数据库无数据从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}")
self.trade_data.get_history_trades(symbol, start_time, end_time, limit)
else:
if db_earliest_time > start_time:
db_earliest_date_time = timestamp_to_datetime(db_earliest_time)
logger.info(f"从API补充最早数据{symbol}, {start_date_time} {db_earliest_date_time}")
self.trade_data.get_history_trades(symbol, start_time, db_earliest_time + 1, limit)
if db_latest_time < end_time:
db_latest_date_time = timestamp_to_datetime(db_latest_time)
logger.info(f"从API补充最新数据{symbol}, {db_latest_date_time}, {end_date_time}")
self.trade_data.get_history_trades(symbol, db_latest_time + 1, end_time, limit)
final_data = self.trade_data.db_trade_data.query_trade_data_by_symbol(symbol=symbol, start=start_time, end=end_time)
if final_data is not None and len(final_data) > 0:
logger.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}")
final_data = pd.DataFrame(final_data)
final_data.sort_values(by="ts", inplace=True)
final_data.reset_index(drop=True, inplace=True)
return final_data
else:
logger.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}")
return None
if __name__ == "__main__":
trade_data_main = TradeDataMain()
trade_data_main.get_trade_data()