import pandas as pd
from core.db.db_market_data import DBMarketData
from core.biz.metrics_calculation import MetricsCalculation
from config import MYSQL_CONFIG, US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG
import core.logger as logging

logger = logging.logger


class UpdateDataMain:
    """Re-compute derived columns for stored market data and write them back.

    For every configured (symbol, bar) pair the stored rows are loaded,
    sorted by ``timestamp``, given a US-Eastern ``date_time_us`` column,
    passed through ``MetricsCalculation.sar`` and saved again through
    ``DBMarketData.insert_data_to_mysql``.
    """

    def __init__(self):
        """Build the MySQL connection URL from ``MYSQL_CONFIG`` and create helpers.

        Raises:
            ValueError: If no MySQL password is configured.
        """
        mysql_user = MYSQL_CONFIG.get("user", "xch")
        mysql_password = MYSQL_CONFIG.get("password", "")
        if not mysql_password:
            raise ValueError("MySQL password is not set")
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        mysql_database = MYSQL_CONFIG.get("database", "okx")

        # NOTE(review): user/password are interpolated verbatim. If the URL is
        # parsed by SQLAlchemy, characters such as '@', ':' or '/' in the
        # password must already be URL-encoded in the config -- confirm.
        self.db_url = (
            f"mysql+pymysql://{mysql_user}:{mysql_password}"
            f"@{mysql_host}:{mysql_port}/{mysql_database}"
        )
        self.db_market_data = DBMarketData(self.db_url)
        self.metrics_calculation = MetricsCalculation()

    def batch_update_data(self, is_us_stock: bool = False) -> None:
        """Run :meth:`update_data` for every configured symbol/bar combination.

        Args:
            is_us_stock: When true, symbols and bars are read from
                ``US_STOCK_MONITOR_CONFIG``; otherwise from
                ``OKX_MONITOR_CONFIG``. Both are looked up under the
                ``"volume_monitor"`` key.
        """
        monitor_config = US_STOCK_MONITOR_CONFIG if is_us_stock else OKX_MONITOR_CONFIG
        volume_monitor = monitor_config.get("volume_monitor", {})
        symbols = volume_monitor.get("symbols", [])
        bars = volume_monitor.get("bars", ["5m", "15m", "1H", "1D"])

        for symbol in symbols:
            for bar in bars:
                self.update_data(symbol, bar)

    def update_data(self, symbol: str, bar: str) -> None:
        """Reload, enrich and re-save the stored rows of one symbol/bar pair.

        Args:
            symbol: Instrument identifier passed to the database query.
            bar: Bar size passed to the database query (e.g. ``"5m"``).
        """
        logger.info(f"开始更新数据: {symbol} {bar}")
        data = self.db_market_data.query_market_data_by_symbol_bar(symbol, bar)
        row_count = 0 if data is None else len(data)
        logger.info(f"查询数据完成: {symbol} {bar},共有{row_count}条数据")

        # An empty result would become a DataFrame without a "timestamp"
        # column and make the sort below raise KeyError, aborting the whole
        # batch. There is nothing to update, so skip this pair instead.
        if row_count == 0:
            logger.info(f"没有查询到数据,跳过更新: {symbol} {bar}")
            return

        data = pd.DataFrame(data)
        data = data.sort_values(by="timestamp")
        data = self.update_date_time_us(data)

        logger.info("更新SAR指标")
        data = self.metrics_calculation.sar(data)
        logger.info("更新SAR指标完成")

        logger.info(f"开始保存数据: {symbol} {bar}")
        self.db_market_data.insert_data_to_mysql(data)
        logger.info(f"保存数据完成: {symbol} {bar}")

    def update_date_time_us(self, data: pd.DataFrame) -> pd.DataFrame:
        """Add a ``date_time_us`` column holding the US-Eastern wall-clock time.

        ``timestamp`` is interpreted as epoch milliseconds in UTC, converted
        to the ``America/New_York`` time zone and formatted as
        ``YYYY-MM-DD HH:MM:SS``.

        Args:
            data: Frame with a ``timestamp`` column. It is modified in place.

        Returns:
            The same frame, with ``date_time_us`` added or overwritten.
        """
        logger.info(f"开始更新美东日期时间: {data.shape[0]}条数据")
        # Explicit 64-bit cast: plain ``int`` can be 32-bit on some platforms,
        # where epoch-millisecond values would overflow.
        timestamp_ms = data["timestamp"].astype("int64")
        dt_us_series = pd.to_datetime(
            timestamp_ms, unit="ms", utc=True, errors="coerce"
        ).dt.tz_convert("America/New_York")
        data["date_time_us"] = dt_us_series.dt.strftime("%Y-%m-%d %H:%M:%S")
        logger.info(f"更新美东日期时间完成: {data.shape[0]}条数据")
        return data


if __name__ == "__main__":
    update_data_main = UpdateDataMain()
    update_data_main.batch_update_data(is_us_stock=True)
    update_data_main.batch_update_data(is_us_stock=False)