crypto_quant/core/db_manager.py

169 lines
6.3 KiB
Python
Raw Normal View History

import pandas as pd
from sqlalchemy import create_engine, exc, text
import re, datetime
import logging
from core.utils import transform_data_type, datetime_to_timestamp, check_date_time_format
2025-07-25 08:12:52 +00:00
def insert_market_data_to_mysql(df: pd.DataFrame, db_url: str):
"""
将K线行情数据保存到MySQL的crypto_market_data表
:param df: K线数据DataFrame
:param symbol: 交易对
:param bar: K线周期
:param db_url: 数据库连接URL
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
return
# 按表字段顺序排列
columns = [
'symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close',
'volume', 'volCcy', 'volCCyQuote'
]
df = df[columns]
# 建立数据库连接
try:
engine = create_engine(db_url)
try:
df.to_sql(
name='crypto_market_data',
con=engine,
if_exists='append',
index=False,
method='multi'
)
logging.info("数据已成功写入数据库。")
except Exception as e:
logging.error(f'插入数据出错: {e}')
with engine.begin() as conn:
for _, row in df.iterrows():
try:
sql = text("""
INSERT INTO crypto_market_data
(symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote)
VALUES (:symbol, :bar, :timestamp, :date_time, :open, :high, :low, :close, :volume, :volCcy, :volCCyQuote)
ON DUPLICATE KEY UPDATE
open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close),
volume=VALUES(volume), volCcy=VALUES(volCcy), volCCyQuote=VALUES(volCCyQuote), date_time=VALUES(date_time)
""")
conn.execute(sql, row.to_dict())
except exc.IntegrityError as e:
logging.error(f'唯一索引冲突: {e}')
except Exception as e:
logging.error(f'插入数据出错: {e}')
logging.info("数据已成功写入数据库。")
except Exception as e:
2025-07-25 08:12:52 +00:00
logging.error(f'数据库连接或写入失败: {e}')
def query_latest_data(symbol: str, bar: str, db_url: str):
"""
查询最新数据
:param symbol: 交易对
:param bar: K线周期
:param db_url: 数据库连接URL
"""
sql = """
SELECT * FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp DESC
LIMIT 1
"""
condition_dict = {"symbol": symbol, "bar": bar}
return query(sql, condition_dict, db_url, return_multi=False)
def query_data_by_symbol_bar(symbol: str, bar: str, start: str, end: str, db_url: str):
"""
根据交易对和K线周期查询数据
:param symbol: 交易对
:param bar: K线周期
:param db_url: 数据库连接URL
"""
if start is None or end is None:
sql = """
SELECT * FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar}
else:
if start is not None:
if isinstance(start, str):
if start.isdigit():
start = int(start)
else:
start = check_date_time_format(start)
# 判断是否是日期时间格式
if start is None:
logging.warning(f"日期时间格式错误: {start}")
return None
start = datetime_to_timestamp(start)
if end is not None:
if isinstance(end, str):
if end.isdigit():
end = int(end)
else:
end = check_date_time_format(end)
if end is None:
logging.warning(f"日期时间格式错误: {end}")
return None
end = datetime_to_timestamp(end)
if start is not None and end is not None:
if start > end:
start, end = end, start
sql = """
SELECT * FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end}
elif start is not None:
sql = """
SELECT * FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start}
elif end is not None:
sql = """
SELECT * FROM crypto_market_data
WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "end": end}
return query(sql, condition_dict, db_url, return_multi=True)
2025-07-25 08:12:52 +00:00
def query(sql: str, condition_dict: dict, db_url: str, return_multi: bool = True):
"""
查询数据
:param sql: 查询SQL
:param db_url: 数据库连接URL
"""
try:
engine = create_engine(db_url)
with engine.connect() as conn:
result = conn.execute(text(sql), condition_dict)
if return_multi:
result = result.fetchall()
if result:
result_list = [transform_data_type(dict(row._mapping)) for row in result]
return result_list
else:
return None
else:
result = result.fetchone()
if result:
result_dict = transform_data_type(dict(result._mapping))
return result_dict
else:
return None
except Exception as e:
logging.error(f'查询数据出错: {e}')
return None