diff --git a/core/data_monitor.py b/core/data_monitor.py index f7e9496..1e43a68 100644 --- a/core/data_monitor.py +++ b/core/data_monitor.py @@ -111,7 +111,8 @@ class DataMonitor: df['symbol'] = symbol # 添加bar列,内容为bar df['bar'] = bar - df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']] + df['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote', 'create_time']] df.sort_values('timestamp', inplace=True) df.reset_index(drop=True, inplace=True) logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") diff --git a/core/db_manager.py b/core/db_manager.py index 689bdb0..505ab9e 100644 --- a/core/db_manager.py +++ b/core/db_manager.py @@ -2,167 +2,241 @@ import pandas as pd from sqlalchemy import create_engine, exc, text import re, datetime import logging -from core.utils import transform_data_type, datetime_to_timestamp, check_date_time_format +from core.utils import ( + transform_data_type, + datetime_to_timestamp, + check_date_time_format, +) -def insert_market_data_to_mysql(df: pd.DataFrame, db_url: str): - """ - 将K线行情数据保存到MySQL的crypto_market_data表 - :param df: K线数据DataFrame - :param symbol: 交易对 - :param bar: K线周期 - :param db_url: 数据库连接URL - """ - if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") - return +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") - # 按表字段顺序排列 - columns = [ - 'symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', - 'volume', 'volCcy', 'volCCyQuote' - ] - df = df[columns] - # 建立数据库连接 - try: - engine = create_engine(db_url) +class DBData: + def __init__( + self, + db_url: str, + table_name: str = "crypto_market_data", + columns: list = None + ): + self.db_url = db_url + self.table_name = table_name + self.temp_table_name = f"temp_{table_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" + self.columns = columns + if self.columns is None: + raise ValueError("columns不能为空") + if len(self.columns) != len(set(self.columns)): + raise ValueError("columns不能有重复") + + def create_insert_sql_text_by_temp_table(self, temp_table_name: str): + """ + 创建插入SQL语句(使用临时表) + 示例: + INSERT INTO crypto_market_data + (symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote, create_time) + SELECT symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote, create_time + FROM {temp_table_name} + ON DUPLICATE KEY UPDATE + open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), + volume=VALUES(volume), volCcy=VALUES(volCcy), volCCyQuote=VALUES(volCCyQuote), + date_time=VALUES(date_time), create_time=VALUES(create_time) + """ + sql = f""" + INSERT INTO {self.table_name} + ({",".join(self.columns)}) + SELECT {",".join(self.columns)} + FROM {temp_table_name} + ON DUPLICATE KEY UPDATE + {", ".join([f"{col}=VALUES({col})" for col in self.columns])} + """ + return sql + + def create_insert_sql_text(self): + """ + 创建插入SQL语句(不使用临时表) + 示例: + INSERT INTO crypto_market_data + (symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote, create_time) + VALUES (:symbol, :bar, :timestamp, :date_time, :open, :high, :low, :close, :volume, :volCcy, :volCCyQuote, :create_time) + ON DUPLICATE KEY UPDATE + open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), + volume=VALUES(volume), volCcy=VALUES(volCcy), volCCyQuote=VALUES(volCCyQuote), + date_time=VALUES(date_time), create_time=VALUES(create_time) + """ + sql = f""" + INSERT INTO {self.table_name} + ({",".join(self.columns)}) + VALUES ({",".join([f":{col}" for col in self.columns])}) + ON DUPLICATE KEY UPDATE + {", ".join([f"{col}=VALUES({col})" for col in self.columns])} + """ + return sql + + def insert_data_to_mysql(self, df: pd.DataFrame): + """ + 将K线行情数据保存到MySQL的crypto_market_data表 + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 适用场景:中小数据量(<10万条) + :param df: K线数据DataFrame + :param symbol: 交易对 + :param bar: K线周期 + :param db_url: 数据库连接URL + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + df = df[self.columns] + # 建立数据库连接 try: - df.to_sql( - name='crypto_market_data', - con=engine, - if_exists='append', - index=False, - method='multi' - ) + engine = create_engine(self.db_url) + + # 方案1:使用临时表 + 批量更新(推荐,速度最快) + with engine.begin() as conn: + # 将数据写入临时表 + df.to_sql( + name=self.temp_table_name, + con=engine, + if_exists="replace", + index=False, + method="multi", + ) + + # 使用INSERT ... ON DUPLICATE KEY UPDATE批量处理 + sql = text(self.create_insert_sql_text_by_temp_table(self.temp_table_name)) + conn.execute(sql) + + # 删除临时表 + conn.execute(text(f"DROP TABLE IF EXISTS {self.temp_table_name}")) + logging.info("数据已成功写入数据库。") except Exception as e: - logging.error(f'插入数据出错: {e}') - with engine.begin() as conn: - for _, row in df.iterrows(): - try: - sql = text(""" - INSERT INTO crypto_market_data - (symbol, bar, timestamp, date_time, open, high, low, close, volume, volCcy, volCCyQuote) - VALUES (:symbol, :bar, :timestamp, :date_time, :open, :high, :low, :close, :volume, :volCcy, :volCCyQuote) - ON DUPLICATE KEY UPDATE - open=VALUES(open), high=VALUES(high), low=VALUES(low), close=VALUES(close), - volume=VALUES(volume), volCcy=VALUES(volCcy), volCCyQuote=VALUES(volCCyQuote), date_time=VALUES(date_time) - """) - conn.execute(sql, row.to_dict()) - except exc.IntegrityError as e: - logging.error(f'唯一索引冲突: {e}') - except Exception as e: - logging.error(f'插入数据出错: {e}') - logging.info("数据已成功写入数据库。") - except Exception as e: - logging.error(f'数据库连接或写入失败: {e}') - - -def query_latest_data(symbol: str, bar: str, db_url: str): - """ - 查询最新数据 - :param symbol: 交易对 - :param bar: K线周期 - :param db_url: 数据库连接URL - """ - sql = """ - SELECT * FROM crypto_market_data - WHERE symbol = :symbol AND bar = :bar - ORDER BY timestamp DESC - LIMIT 1 + logging.error(f"数据库连接或写入失败: {e}") + + def insert_data_to_mysql_fast(self, df: pd.DataFrame): """ - condition_dict = {"symbol": symbol, "bar": bar} - return query(sql, condition_dict, db_url, return_multi=False) + 快速插入K线行情数据(方案2:使用executemany批量插入) + 速度:⭐⭐⭐⭐ 很快 + 内存:⭐⭐⭐⭐⭐ 低 + 适用场景:中等数据量 + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + df = df[self.columns] + try: + engine = create_engine(self.db_url) + with engine.begin() as conn: + # 使用executemany批量插入 + sql = text(self.create_insert_sql_text()) + # 将DataFrame转换为字典列表 + data_dicts = [row.to_dict() for _, row in df.iterrows()] + conn.execute(sql, data_dicts) -def query_data_by_symbol_bar(symbol: str, bar: str, start: str, end: str, db_url: str): - """ - 根据交易对和K线周期查询数据 - :param symbol: 交易对 - :param bar: K线周期 - :param db_url: 数据库连接URL - """ - if start is None or end is None: - sql = """ - SELECT * FROM crypto_market_data - WHERE symbol = :symbol AND bar = :bar - ORDER BY timestamp ASC - """ - condition_dict = {"symbol": symbol, "bar": bar} - else: - if start is not None: - if isinstance(start, str): - if start.isdigit(): - start = int(start) - else: - start = check_date_time_format(start) - # 判断是否是日期时间格式 - if start is None: - logging.warning(f"日期时间格式错误: {start}") - return None - start = datetime_to_timestamp(start) - if end is not None: - if isinstance(end, str): - if end.isdigit(): - end = int(end) - else: - end = check_date_time_format(end) - if end is None: - logging.warning(f"日期时间格式错误: {end}") - return None - end = datetime_to_timestamp(end) - if start is not None and end is not None: - if start > end: - start, end = end, start - sql = """ - SELECT * FROM crypto_market_data - WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end - ORDER BY timestamp ASC - """ - condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} - elif start is not None: - sql = """ - SELECT * FROM crypto_market_data - WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start - ORDER BY timestamp ASC - """ - condition_dict = {"symbol": symbol, "bar": bar, "start": start} - elif end is not None: - sql = """ - SELECT * FROM crypto_market_data - WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end - ORDER BY timestamp ASC - """ - condition_dict = {"symbol": symbol, "bar": bar, "end": end} - return query(sql, condition_dict, db_url, return_multi=True) + logging.info("数据已成功写入数据库。") + except Exception as e: + logging.error(f"数据库连接或写入失败: {e}") -def query(sql: str, condition_dict: dict, db_url: str, return_multi: bool = True): - """ - 查询数据 - :param sql: 查询SQL - :param db_url: 数据库连接URL - """ - try: - engine = create_engine(db_url) - with engine.connect() as conn: - result = conn.execute(text(sql), condition_dict) - if return_multi: - result = result.fetchall() - if result: - result_list = [transform_data_type(dict(row._mapping)) for row in result] - return result_list - else: - return None - else: - result = result.fetchone() - if result: - result_dict = transform_data_type(dict(result._mapping)) - return result_dict - else: - return None - except Exception as e: - logging.error(f'查询数据出错: {e}') - return None + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): + """ + 分块插入K线行情数据(方案3:适合大数据量) + 速度:⭐⭐⭐ 中等 + 内存:⭐⭐⭐⭐⭐ 最低 + 适用场景:大数据量(>10万条) + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + df = df[self.columns] + + try: + engine = create_engine(self.db_url) + with engine.begin() as conn: + # 分块处理 + total_rows = len(df) + for i in range(0, total_rows, chunk_size): + chunk_df = df.iloc[i : i + chunk_size] + with engine.begin() as conn: + # 创建临时表 + temp_table_name = f"{self.temp_table_name}_{i}" + + # 将数据写入临时表 + chunk_df.to_sql( + name=temp_table_name, + con=engine, + if_exists="replace", + index=False, + method="multi", + ) + + # 使用INSERT ... ON DUPLICATE KEY UPDATE批量处理 + sql = text(self.create_insert_sql_text_by_temp_table(temp_table_name)) + conn.execute(sql) + + # 删除临时表 + conn.execute(text(f"DROP TABLE IF EXISTS {temp_table_name}")) + + logging.info(f"已处理 {min(i+chunk_size, total_rows)}/{total_rows} 条记录") + logging.info("数据已成功写入数据库。") + except Exception as e: + logging.error(f"数据库连接或写入失败: {e}") + def insert_data_to_mysql_simple(self, df: pd.DataFrame): + """ + 简单插入K线行情数据(方案4:直接使用to_sql,忽略重复) + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 注意:会抛出重复键错误,需要额外处理 + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + df = df[self.columns] + try: + engine = create_engine(self.db_url) + with engine.begin() as conn: + df.to_sql( + name=self.table_name, + con=engine, + if_exists="append", + index=False, + method="multi", + ) + logging.info("数据已成功写入数据库。") + except Exception as e: + logging.error(f"数据库连接或写入失败: {e}") - + def query_data(self, sql: str, condition_dict: dict, return_multi: bool = True): + """ + 查询数据 + :param sql: 查询SQL + :param db_url: 数据库连接URL + """ + try: + engine = create_engine(self.db_url) + with engine.connect() as conn: + result = conn.execute(text(sql), condition_dict) + if return_multi: + result = result.fetchall() + if result: + result_list = [ + transform_data_type(dict(row._mapping)) for row in result + ] + return result_list + else: + return None + else: + result = result.fetchone() + if result: + result_dict = transform_data_type(dict(result._mapping)) + return result_dict + else: + return None + except Exception as e: + logging.error(f"查询数据出错: {e}") + return None + diff --git a/core/db_market_data.py b/core/db_market_data.py new file mode 100644 index 0000000..9ebeb84 --- /dev/null +++ b/core/db_market_data.py @@ -0,0 +1,163 @@ +import pandas as pd +import logging +from core.db_manager import DBData +from core.utils import check_date_time_format, datetime_to_timestamp + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") + + +class DBMarketData: + def __init__( + self, + db_url: str + ): + self.db_url = db_url + self.table_name = "crypto_market_data" + self.columns = [ + "symbol", + "bar", + "timestamp", + "date_time", + "open", + "high", + "low", + "close", + "volume", + "volCcy", + "volCCyQuote", + "create_time", + ] + self.db_manager = DBData(db_url, self.table_name, self.columns) + + def insert_data_to_mysql(self, df: pd.DataFrame): + """ + 将K线行情数据保存到MySQL的crypto_market_data表 + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 适用场景:中小数据量(<10万条) + :param df: K线数据DataFrame + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql(df) + + def insert_data_to_mysql_fast(self, df: pd.DataFrame): + """ + 快速插入K线行情数据(方案2:使用executemany批量插入) + 速度:⭐⭐⭐⭐ 很快 + 内存:⭐⭐⭐⭐⭐ 低 + 适用场景:中等数据量 + :param df: K线数据DataFrame + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_fast(df) + + def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): + """ + 分块插入K线行情数据(方案3:适合大数据量) + 速度:⭐⭐⭐ 中等 + 内存:⭐⭐⭐⭐⭐ 最低 + 适用场景:大数据量(>10万条) + :param df: K线数据DataFrame + :param chunk_size: 分块大小 + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) + + def insert_data_to_mysql_simple(self, df: pd.DataFrame): + """ + 简单插入K线行情数据(方案4:直接使用to_sql,忽略重复) + 速度:⭐⭐⭐⭐⭐ 最快 + 内存:⭐⭐⭐⭐ 中等 + 注意:会抛出重复键错误,需要额外处理 + """ + if df is None or df.empty: + logging.warning("DataFrame为空,无需写入数据库。") + return + + self.db_manager.insert_data_to_mysql_simple(df) + + def query_latest_data(self, symbol: str, bar: str): + """ + 查询最新数据 + :param symbol: 交易对 + :param bar: K线周期 + """ + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp DESC + LIMIT 1 + """ + condition_dict = {"symbol": symbol, "bar": bar} + return self.db_manager.query_data(sql, condition_dict, return_multi=False) + + def query_market_data_by_symbol_bar(self, symbol: str, bar: str, start: str, end: str): + """ + 根据交易对和K线周期查询数据 + :param symbol: 交易对 + :param bar: K线周期 + :param start: 开始时间 + :param end: 结束时间 + """ + if start is None or end is None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar} + else: + if start is not None: + if isinstance(start, str): + if start.isdigit(): + start = int(start) + else: + start = check_date_time_format(start) + # 判断是否是日期时间格式 + if start is None: + logging.warning(f"日期时间格式错误: {start}") + return None + start = datetime_to_timestamp(start) + if end is not None: + if isinstance(end, str): + if end.isdigit(): + end = int(end) + else: + end = check_date_time_format(end) + if end is None: + logging.warning(f"日期时间格式错误: {end}") + return None + end = datetime_to_timestamp(end) + if start is not None and end is not None: + if start > end: + start, end = end, start + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end} + elif start is not None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "start": start} + elif end is not None: + sql = """ + SELECT * FROM crypto_market_data + WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end + ORDER BY timestamp ASC + """ + condition_dict = {"symbol": symbol, "bar": bar, "end": end} + return self.db_manager.query_data(sql, condition_dict, return_multi=True) \ No newline at end of file diff --git a/core/db_statistics_data.py b/core/db_statistics_data.py new file mode 100644 index 0000000..e69de29 diff --git a/core/statistics.py b/core/statistics.py index 14e18ea..657480c 100644 --- a/core/statistics.py +++ b/core/statistics.py @@ -1,4 +1,4 @@ -from core.db_manager import query_data_by_symbol_bar +from core.db_manager import query_market_data_by_symbol_bar from pandas import DataFrame import logging import os @@ -30,7 +30,7 @@ class Statistics: 2. 每一个window的最新的volume是否高于该window的volume的均值+2倍标准差,如果满足条件,则增加一列:huge_volume,值为1 3. 如果check_price为True,则检查: a. 每一个window的close是否处于该window的80%分位数及以上 - b. 每一个window的close是否处于该window的20%分位数及以上 + b. 每一个window的close是否处于该window的20%分位数及以下 Args: data: 包含成交量数据的DataFrame diff --git a/monitor_main.py b/monitor_main.py index 82f16e0..55fb782 100644 --- a/monitor_main.py +++ b/monitor_main.py @@ -1,11 +1,18 @@ import logging from time import sleep from core.data_monitor import DataMonitor -from core.db_manager import insert_market_data_to_mysql, query_latest_data -from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, \ - MONITOR_CONFIG, MYSQL_CONFIG +from core.db_market_data import DBMarketData +from config import ( + API_KEY, + SECRET_KEY, + PASSPHRASE, + SANDBOX, + MONITOR_CONFIG, + MYSQL_CONFIG, +) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') class MonitorMain: def __init__(self): @@ -14,10 +21,16 @@ class MonitorMain: secret_key=SECRET_KEY, passphrase=PASSPHRASE, sandbox=SANDBOX, - ) - self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get("symbols", ["XCH-USDT"]) - self.intervals = MONITOR_CONFIG.get("volume_monitor", {}).get("intervals", ["5m", "15m", "1H", "4H", "1D"]) - self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get("initial_date", "2025-07-01 00:00:00") + ) + self.symbols = MONITOR_CONFIG.get("volume_monitor", {}).get( + "symbols", ["XCH-USDT"] + ) + self.intervals = MONITOR_CONFIG.get("volume_monitor", {}).get( + "intervals", ["5m", "15m", "1H", "4H", "1D"] + ) + self.initial_date = MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2025-07-01 00:00:00" + ) mysql_user = MYSQL_CONFIG.get("user", "xch") mysql_password = MYSQL_CONFIG.get("password", "") if not mysql_password: @@ -27,29 +40,32 @@ class MonitorMain: mysql_database = MYSQL_CONFIG.get("database", "okx") self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" - + self.db_market_data = DBMarketData(self.db_url) + def initial_data(self): """ 初始化数据 """ for symbol in self.symbols: for interval in self.intervals: - latest_data = query_latest_data(symbol, interval, self.db_url) + latest_data = self.db_market_data.query_latest_data(symbol, interval) if latest_data: - logging.info(f"已初始化{symbol}, {interval} 最新数据,请使用update_data()更新数据") + logging.info( + f"已初始化{symbol}, {interval} 最新数据,请使用update_data()更新数据" + ) continue self.fetch_save_data(symbol, interval, self.initial_date) - + def fetch_save_data(self, symbol: str, interval: str, start: str): """ 获取保存数据 """ - data = self.data_monitor.get_historical_kline_data(symbol=symbol, - start=start, - bar=interval) + data = self.data_monitor.get_historical_kline_data( + symbol=symbol, start=start, bar=interval + ) if data is not None and len(data) > 0: - insert_market_data_to_mysql(data, self.db_url) - + self.db_market_data.insert_data_to_mysql(data) + def update_data(self): """ 更新数据 @@ -60,7 +76,7 @@ class MonitorMain: """ for symbol in self.symbols: for interval in self.intervals: - latest_data = query_latest_data(symbol, interval, self.db_url) + latest_data = self.db_market_data.query_latest_data(symbol, interval) if not latest_data: self.fetch_save_data(symbol, interval, self.initial_date) continue @@ -72,11 +88,9 @@ class MonitorMain: logging.warning(f"获取{symbol}, {interval} 最新数据失败") continue self.fetch_save_data(symbol, interval, latest_timestamp + 1) - + if __name__ == "__main__": monitor_main = MonitorMain() monitor_main.update_data() # monitor_main.initial_data() - - diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql new file mode 100644 index 0000000..ba6f3e8 --- /dev/null +++ b/sql/query/sql_playground.sql @@ -0,0 +1,3 @@ +select * from crypto_market_data +WHERE symbol='XCH-USDT-SWAP' and bar='5m' and date_time > '2025-07-26' +order by timestamp desc; diff --git a/sql/table/crypto_market_data.sql b/sql/table/crypto_market_data.sql index 9f492da..cc22dc3 100644 --- a/sql/table/crypto_market_data.sql +++ b/sql/table/crypto_market_data.sql @@ -1,3 +1,6 @@ +-- 临时禁用安全模式 +SET SQL_SAFE_UPDATES = 0; + CREATE TABLE IF NOT EXISTS crypto_market_data ( id BIGINT AUTO_INCREMENT PRIMARY KEY, symbol VARCHAR(50) NOT NULL, @@ -12,4 +15,10 @@ CREATE TABLE IF NOT EXISTS crypto_market_data ( volCcy DECIMAL(30,8) NOT NULL, volCCyQuote DECIMAL(30,8) NOT NULL, UNIQUE KEY uniq_symbol_bar_timestamp (symbol, bar, timestamp) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +--添加一列create_time,格式为字符串 +ALTER TABLE crypto_market_data ADD COLUMN create_time VARCHAR(50); + +--更新create_time列的值为指定时间 +UPDATE crypto_market_data SET create_time = '2025-07-28 11:00:00' WHERE id > 0; diff --git a/statistics_main.py b/statistics_main.py index cc769a3..4f24c3a 100644 --- a/statistics_main.py +++ b/statistics_main.py @@ -1,5 +1,6 @@ from core.statistics import Statistics -from core.db_manager import query_data_by_symbol_bar +from core.db_market_data import DBMarketData +from monitor_main import MonitorMain import logging from config import MONITOR_CONFIG, MYSQL_CONFIG from datetime import datetime @@ -22,6 +23,8 @@ class StatisticsMain: self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}" self.statistics = Statistics() + self.db_market_data = DBMarketData(self.db_url) + self.monitor_main = MonitorMain() def batch_detect_volume_spike(self, start: str, end: str): pass @@ -40,7 +43,7 @@ class StatisticsMain: ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - data = query_data_by_symbol_bar(symbol, bar, start, end, self.db_url) + data = self.db_market_data.query_market_data_by_symbol_bar(symbol, bar, start, end) if data is None: logging.warning(f"获取数据失败: {symbol} {bar} {start} {end}") return None @@ -59,6 +62,9 @@ class StatisticsMain: only_output_huge_volume=only_output_huge_volume, output_excel=True, ) + + def update_volume_spike(self): + self.monitor_main.update_data() if __name__ == "__main__": diff --git a/worklog.md b/worklog.md new file mode 100644 index 0000000..bff43e5 --- /dev/null +++ b/worklog.md @@ -0,0 +1,17 @@ +# 2025-07-26 +Get huge volume based on historical data +Window: 50 +Threshold: Mean + 2*std + +# 2025-07-28 +1. Save huge volume to mysql +2. Support upgrade huge volume based on the latest market data +a. upgrade the latest market data +b. Get the one window period data (e.g. 50) according the earliest updated market data +c. Calculate huge volume according to one window period data + updated data +3. Retrospective trading analysis: +huge volume: +a. high price: The ratio of declines in the next three trading periods +b. low price: The ratio of increases in the next three trading periods + +