diff --git a/config.py b/config.py index 3774036..94c5205 100644 --- a/config.py +++ b/config.py @@ -50,7 +50,8 @@ TIME_CONFIG = { MONITOR_CONFIG = { "volume_monitor":{ - "symbols": ["XCH-USDT"], + "symbols": ["XCH-USDT", "BTC-USDT", "ETH-USDT", "SOL-USDT", "DOGE-USDT", + "XCH-USDT-SWAP", "BTC-USDT-SWAP", "ETH-USDT-SWAP", "SOL-USDT-SWAP", "DOGE-USDT-SWAP"], "intervals": ["5m", "15m", "1H", "4H", "1D"], "initial_date": "2025-07-01 00:00:00" }, @@ -62,4 +63,12 @@ MONITOR_CONFIG = { {"interval": "1H", "threshold": 0.1} ] } +} + +MYSQL_CONFIG = { + "host": "localhost", + "port": 3306, + "user": "xch", + "password": "xch_okx_2025", + "database": "okx" } \ No newline at end of file diff --git a/core/data_monitor.py b/core/data_monitor.py index fba9eea..bcd7786 100644 --- a/core/data_monitor.py +++ b/core/data_monitor.py @@ -41,8 +41,11 @@ class DataMonitor: start_time = int(two_months_ago.timestamp() * 1000) else: try: + # 判断是否就是timestamp整型数据 + if isinstance(start, int): + start_time = start # 判断是否为纯数字(UTC毫秒级timestamp) - if start.isdigit(): + elif start.isdigit(): start_time = int(start) else: # 按北京时间字符串处理,转换为毫秒级timestamp @@ -52,6 +55,7 @@ class DataMonitor: return None columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"] all_data = [] + latest_timestamp = -1 while start_time < end_time: try: # after,真实逻辑是获得指定时间之前的数据 !!! @@ -65,23 +69,50 @@ class DataMonitor: logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}") break candles = response["data"] - all_data.extend(candles) - # 更新 end_time 为本次请求中最早的时间戳 - end_time = int(candles[-1][0]) - logging.info(f"已获取 {len(candles)} 条数据,最早时间: {pd.to_datetime(end_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')}") - end_time -= 1 # 减 1 毫秒以避免重复 - time.sleep(0.2) + + from_time = int(candles[-1][0]) + to_time = int(candles[0][0]) + if latest_timestamp == -1: + latest_timestamp = from_time + else: + if latest_timestamp > from_time: + latest_timestamp = from_time + else: + logging.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time}, 停止获取数据") + break + from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') + to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') + logging.info(f"已获取 {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") + if from_time < start_time: + start_time_str = pd.to_datetime(start_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') + logging.warning(f"本轮获取数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") + # candels中仅保留start_time之后的数据 + candles = [candle for candle in candles if int(candle[0]) >= start_time] + all_data.extend(candles) + break + else: + # 更新 end_time 为本次请求中最早的时间戳 + end_time = from_time - 1 # 减 1 毫秒以避免重复 + all_data.extend(candles) + time.sleep(0.2) except Exception as e: logging.error(f"请求出错: {e}") break + if all_data: df = pd.DataFrame(all_data, columns=columns) df = df[df['confirm'] == '1'] - for col in ['open', 'high', 'low', 'close', 'volume']: + for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']: df[col] = pd.to_numeric(df[col], errors='coerce') dt_series = pd.to_datetime(df['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('Asia/Shanghai') df['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S') - df = df[['timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume']] + # 将timestamp转换为整型 + df['timestamp'] = df['timestamp'].astype(int) + # 添加虚拟货币名称列,内容为symbol + df['symbol'] = symbol + # 添加bar列,内容为bar + df['bar'] = bar + df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']] df.sort_values('timestamp', inplace=True) df.reset_index(drop=True, inplace=True) logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") diff --git a/core/db_manager.py b/core/db_manager.py new file mode 100644 index 0000000..9b30a49 --- /dev/null +++ b/core/db_manager.py @@ -0,0 +1,56 @@ +import pandas as pd +from sqlalchemy import create_engine, exc, text +import logging + +def save_market_data_to_mysql(df: pd.DataFrame, db_url: str): + """ + 将K线行情数据保存到MySQL的crypto_market_data表 + :param df: K线数据DataFrame + :param symbol: 交易对 + :param bar: K线周期 + :param db_url: 数据库连接URL + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + # 按表字段顺序排列 + columns = [ + 'symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', + 'volume', 'volCcy', 'volCCyQuote' + ] + df = df[columns] + + # 建立数据库连接 + try: + engine = create_engine(db_url) + try: + df.to_sql( + name='crypto_market_data', + con=engine, + if_exists='append', + index=False, + method='multi' + ) + logging.info("数据已成功写入数据库。") + except Exception as e: + logging.error(f'插入数据出错: {e}') + with engine.begin() as conn: + for _, row in df.iterrows(): + try: + sql = text(""" + INSERT INTO crypto_market_data + (symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote) + VALUES (:symbol, :bar, :timestamp, :date_time, :open, :high, :low, :close, :volume, :volCcy, :volCCyQuote) + ON DUPLICATE KEY UPDATE + open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), + volume=VALUES(volume), volCcy=VALUES(volCcy), volCCyQuote=VALUES(volCCyQuote), date_time=VALUES(date_time) + """) + conn.execute(sql, row.to_dict()) + except exc.IntegrityError as e: + logging.error(f'唯一索引冲突: {e}') + except Exception as e: + logging.error(f'插入数据出错: {e}') + logging.info("数据已成功写入数据库。") + except Exception as e: + logging.error(f'数据库连接或写入失败: {e}') \ No newline at end of file diff --git a/monitor_main.py b/monitor_main.py index b565288..8e91ad6 100644 --- a/monitor_main.py +++ b/monitor_main.py @@ -1,11 +1,12 @@ import logging from time import sleep from core.data_monitor import DataMonitor -from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, MONITOR_CONFIG +from core.db_manager import save_market_data_to_mysql +from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, \ + MONITOR_CONFIG, MYSQL_CONFIG logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') - class MonitorMain: def __init__(self): self.data_monitor = DataMonitor( @@ -17,6 +18,15 @@ class MonitorMain: self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get("symbols", ["XCH-USDT"]) self.intervals = MONITOR_CONFIG.get("volume_monitor", {}).get("intervals", ["5m", "15m", "1H", "4H", "1D"]) self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2025-07-01 00:00:00") + mysql_user = MYSQL_CONFIG.get("user", "xch") + mysql_password = MYSQL_CONFIG.get("password", "") + if not mysql_password: + raise ValueError("MySQL password is not set") + mysql_host = MYSQL_CONFIG.get("host", "localhost") + mysql_port = MYSQL_CONFIG.get("port", 3306) + mysql_database = MYSQL_CONFIG.get("database", "okx") + + self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" def initial_data(self): for symbol in self.symbols: @@ -24,6 +34,7 @@ class MonitorMain: data = self.data_monitor.get_historical_kline_data(symbol=symbol, start=self.initial_date, bar=interval) + save_market_data_to_mysql(data, self.db_url) if __name__ == "__main__": diff --git a/sql/table/crypto_market_data.sql b/sql/table/crypto_market_data.sql new file mode 100644 index 0000000..9f492da --- /dev/null +++ b/sql/table/crypto_market_data.sql @@ -0,0 +1,15 @@ +CREATE TABLE IF NOT EXISTS crypto_market_data ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + symbol VARCHAR(50) NOT NULL, + bar VARCHAR(20) NOT NULL, + timestamp BIGINT NOT NULL, + date_time VARCHAR(50) NOT NULL, + open DECIMAL(20,5) NOT NULL, + high DECIMAL(20,5) NOT NULL, + low DECIMAL(20,5) NOT NULL, + close DECIMAL(20,5) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + volCcy DECIMAL(30,8) NOT NULL, + volCCyQuote DECIMAL(30,8) NOT NULL, + UNIQUE KEY uniq_symbol_bar_timestamp (symbol, bar, timestamp) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file