import logging

import pandas as pd
from sqlalchemy import create_engine, exc, text

from core.utils import transform_data_type

# Columns written to crypto_market_data, in the order used by the INSERT below.
_MARKET_DATA_COLUMNS = [
    'symbol', 'bar', 'timestamp', 'date_time',
    'open', 'high', 'low', 'close',
    'volume', 'volCcy', 'volCCyQuote',
]

# Row-level upsert used when the bulk insert fails. Built once at import time
# instead of once per row.
_UPSERT_SQL = text("""
    INSERT INTO crypto_market_data
    (symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote)
    VALUES (:symbol, :bar, :timestamp, :date_time, :open, :high, :low, :close, :volume, :volCcy, :volCCyQuote)
    ON DUPLICATE KEY UPDATE
        open=VALUES(open),
        high=VALUES(high),
        low=VALUES(low),
        close=VALUES(close),
        volume=VALUES(volume),
        volCcy=VALUES(volCcy),
        volCCyQuote=VALUES(volCCyQuote),
        date_time=VALUES(date_time)
""")


def _upsert_rows(engine, df: pd.DataFrame) -> int:
    """Upsert every row of *df* one at a time and return how many rows failed.

    All rows are executed inside a single ``engine.begin()`` transaction.
    A failure on one row is logged and does not stop the remaining rows.
    """
    failed = 0
    with engine.begin() as conn:
        # to_dict(orient="records") yields one {column: value} dict per row,
        # which is exactly the bind-parameter mapping the statement expects.
        for params in df.to_dict(orient="records"):
            try:
                conn.execute(_UPSERT_SQL, params)
            except exc.IntegrityError as e:
                failed += 1
                logging.error(f'唯一索引冲突: {e}')
            except Exception as e:
                failed += 1
                logging.error(f'插入数据出错: {e}')
    return failed


def insert_market_data_to_mysql(df: pd.DataFrame, db_url: str):
    """Save candlestick (K-line) market data into the ``crypto_market_data`` table.

    A bulk ``DataFrame.to_sql`` append is attempted first. If it raises, the
    rows are written one by one with ``INSERT ... ON DUPLICATE KEY UPDATE``
    so that existing rows are updated instead of rejected (this relies on the
    table having a unique key; which columns form it is not visible here).

    Database errors are logged and swallowed; nothing is returned.

    :param df: market data; must contain every column in
        ``_MARKET_DATA_COLUMNS``. ``None`` or an empty frame is a no-op.
    :param db_url: SQLAlchemy database connection URL
    :raises KeyError: if *df* lacks one of the required columns
    """
    if df is None or df.empty:
        logging.warning("DataFrame为空,无需写入数据库。")
        return

    # Select and order the columns to match the table definition.
    df = df[_MARKET_DATA_COLUMNS]

    engine = None
    try:
        engine = create_engine(db_url)
        try:
            df.to_sql(
                name='crypto_market_data',
                con=engine,
                if_exists='append',
                index=False,
                method='multi',
            )
            logging.info("数据已成功写入数据库。")
        except Exception as e:
            logging.error(f'插入数据出错: {e}')
            # Bulk insert failed (e.g. duplicate keys): fall back to upserts.
            failed = _upsert_rows(engine, df)
            if failed:
                # Do not report success when some rows were not written.
                logging.warning(f'逐行写入完成,但有 {failed}/{len(df)} 行写入失败。')
            else:
                logging.info("数据已成功写入数据库。")
    except Exception as e:
        logging.error(f'数据库连接或写入失败: {e}')
    finally:
        # A new engine (and connection pool) is created per call, so release
        # its connections explicitly rather than waiting for garbage collection.
        if engine is not None:
            engine.dispose()


def query_latest_data(symbol: str, bar: str, db_url: str):
    """Fetch the most recent row for a trading pair and bar size.

    :param symbol: trading pair
    :param bar: candlestick period
    :param db_url: SQLAlchemy database connection URL
    :return: the row with the largest ``timestamp`` as a dict, or ``None``
        when no row matches or the query fails
    """
    sql = """
        SELECT * FROM crypto_market_data
        WHERE symbol = :symbol AND bar = :bar
        ORDER BY timestamp DESC
        LIMIT 1
    """
    condition_dict = {"symbol": symbol, "bar": bar}
    return query(sql, condition_dict, db_url, return_multi=False)


def query(sql: str, condition_dict: dict, db_url: str, return_multi: bool = True):
    """Run a parameterized SELECT and return the result as plain dicts.

    Each row is converted to a dict and passed through
    ``transform_data_type`` (project helper; its exact conversions are not
    visible here).

    :param sql: SQL text using ``:name`` bind parameters
    :param condition_dict: values for the bind parameters in *sql*
    :param db_url: SQLAlchemy database connection URL
    :param return_multi: ``True`` to return all rows as a list of dicts,
        ``False`` to return only the first row as a single dict
    :return: list of dicts / single dict as selected by *return_multi*, or
        ``None`` when there are no rows or an error occurred (errors are logged)
    """
    engine = None
    try:
        engine = create_engine(db_url)
        with engine.connect() as conn:
            result = conn.execute(text(sql), condition_dict)
            if return_multi:
                rows = result.fetchall()
                if not rows:
                    return None
                return [transform_data_type(dict(row._mapping)) for row in rows]

            row = result.fetchone()
            if not row:
                return None
            return transform_data_type(dict(row._mapping))
    except Exception as e:
        logging.error(f'查询数据出错: {e}')
        return None
    finally:
        # Release the per-call connection pool.
        if engine is not None:
            engine.dispose()