From 019b894c9a6c967987a5b257033d96297a52d62e Mon Sep 17 00:00:00 2001
From: blade <8019068@qq.com>
Date: Sat, 6 Sep 2025 14:10:07 +0800
Subject: [PATCH] 1. Support importing Binance data. 2. Optimize the huge
 volume data update algorithm

---
 config.py           |  20 ++--
 huge_volume_main.py | 200 ++++++++++++++++++++++++++++++--------------
 market_data_main.py |  37 +++++++-
 3 files changed, 178 insertions(+), 79 deletions(-)

diff --git a/config.py b/config.py
index bfa2942..5e253ac 100644
--- a/config.py
+++ b/config.py
@@ -84,16 +84,16 @@ BINANCE_MONITOR_CONFIG = {
 US_STOCK_MONITOR_CONFIG = {
     "volume_monitor": {
         "symbols": [
-            "QQQ",
-            "TQQQ",
-            "MSFT",
-            "AAPL",
-            "GOOG",
-            "NVDA",
-            "META",
-            "AMZN",
-            "AVGO",
-            "TSLA",
+            # "QQQ",
+            # "TQQQ",
+            # "MSFT",
+            # "AAPL",
+            # "GOOG",
+            # "NVDA",
+            # "META",
+            # "AMZN",
+            # "AVGO",
+            # "TSLA",
             "PLTR",
             "COIN",
             "MSTR",
diff --git a/huge_volume_main.py b/huge_volume_main.py
index 8ad15b7..790fb0d 100644
--- a/huge_volume_main.py
+++ b/huge_volume_main.py
@@ -139,8 +139,8 @@ class HugeVolumeMain:
             only_output_huge_volume=only_output_huge_volume,
             output_excel=False,
         )
-        if data is not None:
-            if is_update:
+        if data is not None and len(data) > 0:
+            if is_update and len(data) > window_size:
                 min_timestamp = int(data["timestamp"].min())
                 max_timestamp = int(data["timestamp"].max())
                 exist_data = self.db_huge_volume_data.query_huge_volume_data_by_symbol_bar_window_size(
@@ -197,11 +197,26 @@ class HugeVolumeMain:
         logger.info(f"Found {len(folders)} folders in total")
         symbols = self.market_data_main.symbols
         bars = self.market_data_main.bars
+
+        output_folder = r"./data/binance/"
+        os.makedirs(output_folder, exist_ok=True)
+        success_folder_file = os.path.join(output_folder, "success_folder.txt")
+
+        success_folder_list = []
+        if os.path.exists(success_folder_file):
+            with open(success_folder_file, "r", encoding="utf-8") as f:
+                for line in f:
+                    if line.strip():
+                        success_folder_list.append(line.strip())
         for folder in folders:
             if not os.path.isdir(os.path.join(root_path, folder)):
                 continue
             logger.info(f"Processing folder: {folder}")
+            if folder in success_folder_list:
+                logger.info(f"Folder already processed, skipping: {folder}")
+                continue
             files = os.listdir(os.path.join(root_path, folder))
+            all_success = True
             for file in files:
                 if not os.path.isfile(os.path.join(root_path, folder, file)):
                     continue
@@ -215,10 +230,18 @@ class HugeVolumeMain:
                 bar = "1H"
                 if symbol not in symbols or bar not in bars:
                     continue
-                logger.info(f"Processing file: {file} {symbol} {bar}")
-                self.import_binance_data_by_csv(
+
+                success = self.import_binance_data_by_csv(
                     full_file_path, symbol, bar, window_sizes
                 )
+                if not success:
+                    all_success = False
+            if all_success:
+                try:
+                    with open(success_folder_file, "a", encoding="utf-8") as f:
+                        f.write(folder + "\n")
+                except Exception as e:
+                    logger.error(f"Failed to record processed folder: {folder} {e}")
 
     def import_binance_data_by_csv(
         self, full_file_path: str, symbol: str, bar: str, window_sizes: list
@@ -229,72 +252,104 @@ class HugeVolumeMain:
         :param symbol: cryptocurrency symbol
         :param bar: bar interval
         """
-        if full_file_path is None or symbol is None or bar is None:
-            logger.error("Missing required arguments")
-            return
-        if not os.path.exists(full_file_path):
-            logger.error(f"File not found: {full_file_path}")
-            return
-        df = pd.read_csv(full_file_path, encoding="GBK")
-        if df is None or len(df) == 0:
-            logger.error(f"Empty file: {full_file_path}")
-            return
-        columns = list(df)
-        if "邢不行" in columns[0] or "Unnamed" in columns[1]:
-            # Use the first row as the header
-            df.columns = df.iloc[0]
-            df = df.iloc[1:]
-            df.reset_index(drop=True, inplace=True)
-
-        df = self.market_data_main.adjust_binance_csv_data(symbol, bar, df)
-        df = self.market_data_main.post_save_data(df)
-        min_start_time_ts = int(df["timestamp"].min())
-        max_start_time_ts = int(df["timestamp"].max())
-        df = self.market_data_main.post_calculate_metrics(
-            symbol, bar, min_start_time_ts, max_start_time_ts
-        )
-        df = df.sort_values(by="timestamp", ascending=True)
-        df = df.reset_index(drop=True)
-        for window_size in window_sizes:
-            self.update_volume_spike(symbol, bar, window_size)
-
-    def update_volume_spike(self, symbol: str, bar: str, window_size: int = 50):
         try:
-            latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
-                symbol, bar, window_size
-            )
-            if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
-                self.detect_volume_spike(
-                    symbol=symbol,
-                    bar=bar,
-                    window_size=window_size,
-                    only_output_huge_volume=False,
-                )
+            logger.info(f"Processing file: {full_file_path} {symbol} {bar}")
+            if full_file_path is None or symbol is None or bar is None:
+                logger.error("Missing required arguments")
                 return
+            if not os.path.exists(full_file_path):
+                logger.error(f"File not found: {full_file_path}")
+                return False
+            df = pd.read_csv(full_file_path, encoding="GBK")
+            if df is None or len(df) == 0:
+                raise Exception(f"Empty file: {full_file_path}")
+            columns = list(df)
+            if len(columns) == 0:
+                raise Exception(f"Empty file: {full_file_path}")
+            elif len(columns) == 1 and "邢不行" in columns[0]:
+                df.reset_index(inplace=True)
+                df.columns = df.iloc[0]
+                df = df.iloc[1:]
+            elif "邢不行" in columns[0] or "Unnamed" in columns[1]:
+                # Use the first row as the header
+                df.columns = df.iloc[0]
+                df = df.iloc[1:]
             else:
-                earliest_date_time = latest_huge_volume_data["date_time"]
-                earliest_timestamp = latest_huge_volume_data["timestamp"]
-                seconds = self.get_seconds_by_bar(bar)
-                earliest_timestamp = earliest_timestamp - (
-                    (window_size - 1) * seconds * 1000
-                )
-                earliest_date_time = timestamp_to_datetime(earliest_timestamp)
+                pass
+            df.reset_index(drop=True, inplace=True)
 
-                data = self.detect_volume_spike(
-                    symbol=symbol,
-                    bar=bar,
-                    window_size=window_size,
-                    start=earliest_date_time,
-                    only_output_huge_volume=False,
-                    is_update=True,
-                )
-                logger.info(
-                    f"Updated huge volume data: {symbol} {bar} window size: {window_size} from {earliest_date_time} to {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
-                )
-                if data is not None and len(data) > 0:
-                    logger.info(f"Updated {len(data)} huge volume records this time")
+            df = self.market_data_main.adjust_binance_csv_data(symbol, bar, df)
+            df = self.market_data_main.post_save_data(df)
+            min_start_time_ts = int(df["timestamp"].min())
+            max_start_time_ts = int(df["timestamp"].max())
+            df = self.market_data_main.post_calculate_metrics(
+                symbol, bar, min_start_time_ts, max_start_time_ts
+            )
+            df = df.sort_values(by="timestamp", ascending=True)
+            df = df.reset_index(drop=True)
+            earliest_date_time = str(df.iloc[0]["date_time"])
+            earliest_timestamp = int(df.iloc[0]["timestamp"])
+            end_date_time = str(df.iloc[-1]["date_time"])
+            for window_size in window_sizes:
+                self.update_volume_spike(symbol, bar, window_size, earliest_date_time, earliest_timestamp, end_date_time)
+            return True
+        except Exception as e:
+            logger.error(f"Failed to import Binance data: {e}")
+            output_folder = r"./data/binance/"
+            os.makedirs(output_folder, exist_ok=True)
+            error_record_file = os.path.join(output_folder, "error_record.txt")
+            with open(error_record_file, "a", encoding="utf-8") as f:
+                f.write(full_file_path + "\n")
+            return False
+
+    def update_volume_spike(
+        self,
+        symbol: str,
+        bar: str,
+        window_size: int = 50,
+        earliest_date_time: str = None,
+        earliest_timestamp: int = None,
+        end_date_time: str = None,
+    ):
+        try:
+            if earliest_date_time is None or earliest_timestamp is None:
+                latest_huge_volume_data = self.db_huge_volume_data.query_latest_data(
+                    symbol, bar, window_size
+                )
+                if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0:
+                    self.detect_volume_spike(
+                        symbol=symbol,
+                        bar=bar,
+                        window_size=window_size,
+                        only_output_huge_volume=False,
+                    )
+                    return
                 else:
-                    logger.info(f"No huge volume records were updated this time")
+                    earliest_date_time = latest_huge_volume_data["date_time"]
+                    earliest_timestamp = latest_huge_volume_data["timestamp"]
+
+            seconds = self.get_seconds_by_bar(bar)
+            earliest_timestamp = earliest_timestamp - (
+                (window_size - 1) * seconds * 1000
+            )
+            earliest_date_time = timestamp_to_datetime(earliest_timestamp)
+
+            data = self.detect_volume_spike(
+                symbol=symbol,
+                bar=bar,
+                window_size=window_size,
+                start=earliest_date_time,
+                end=end_date_time,
+                only_output_huge_volume=False,
+                is_update=True,
+            )
+            logger.info(
+                f"Updated huge volume data: {symbol} {bar} window size: {window_size} from {earliest_date_time} to {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+            )
+            if data is not None and len(data) > 0:
+                logger.info(f"Updated {len(data)} huge volume records this time")
+            else:
+                logger.info("No huge volume records were updated this time")
         except Exception as e:
             logger.error(
                 f"Failed to update huge volume data: {symbol} {bar} window size: {window_size} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}"
@@ -610,7 +665,7 @@ def batch_update_volume_spike(threshold: float = 2.0, is_us_stock: bool = False)
         or len(window_sizes) == 0
     ):
         window_sizes = [50, 80, 100, 120]
-    huge_volume_main = HugeVolumeMain(threshold, is_us_stock)
+    huge_volume_main = HugeVolumeMain(threshold, is_us_stock, is_binance=False)
     for window_size in window_sizes:
         huge_volume_main.batch_update_volume_spike(window_size=window_size)
 
@@ -621,6 +676,14 @@ def batch_import_binance_data_by_csv():
     huge_volume_main.batch_import_binance_data_by_csv(root_path)
 
 
+def test_import_binance_data_by_csv():
+    huge_volume_main = HugeVolumeMain(threshold=2.0, is_us_stock=False, is_binance=True)
+    file_path = "./data/binance/spot/2020-08-11/SOL-USDT_1h.csv"
+    huge_volume_main.import_binance_data_by_csv(
+        file_path, "SOL-USDT", "1H", [50, 80, 100, 120]
+    )
+
+
 def test_send_huge_volume_data_to_wechat():
     huge_volume_main = HugeVolumeMain(threshold=2.0)
     # Get yesterday's date
@@ -633,8 +696,9 @@ def test_send_huge_volume_data_to_wechat():
 
 
 if __name__ == "__main__":
-    batch_import_binance_data_by_csv()
-    # batch_update_volume_spike(threshold=2.0, is_us_stock=False)
+    test_import_binance_data_by_csv()
+    # batch_import_binance_data_by_csv()
+    # batch_update_volume_spike(threshold=2.0, is_us_stock=True)
     # test_send_huge_volume_data_to_wechat()
     # batch_initial_detect_volume_spike(threshold=2.0)
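Note on the update algorithm above: update_volume_spike rewinds its query start by
(window_size - 1) * seconds * 1000 milliseconds because a rolling statistic over
window_size bars is undefined for the first window_size - 1 rows of any slice, so
re-scoring freshly imported rows needs that many earlier bars as context. A minimal,
self-contained sketch of the idea; the volume column and the detect_spikes helper are
illustrative stand-ins, not the project's actual detect_volume_spike:

import numpy as np
import pandas as pd

def detect_spikes(df: pd.DataFrame, window_size: int = 50, threshold: float = 2.0) -> pd.DataFrame:
    # Flag bars whose volume exceeds threshold x the rolling mean volume; the
    # first window_size - 1 rows have no complete window and stay unflagged.
    out = df.copy()
    rolling_mean = out["volume"].rolling(window_size).mean()
    out["huge_volume"] = out["volume"] > threshold * rolling_mean
    return out

bars = pd.DataFrame({"volume": np.random.default_rng(0).integers(100, 1000, 300)})
window_size = 50
# To re-score only the last 20 bars, include window_size - 1 earlier bars as
# context, mirroring the timestamp rewind in update_volume_spike.
update_slice = bars.iloc[-(20 + window_size - 1):]
flagged = detect_spikes(update_slice, window_size)
print(flagged["huge_volume"].tail(20).sum())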
diff --git a/market_data_main.py b/market_data_main.py
index 00c8543..a90888a 100644
--- a/market_data_main.py
+++ b/market_data_main.py
@@ -1,5 +1,5 @@
 import core.logger as logging
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from time import sleep
 import pandas as pd
 from core.biz.market_data import MarketData
@@ -212,6 +212,8 @@ class MarketDataMain:
         data["volCCyQuote"] = None
         data["create_time"] = None
 
+        data = self.check_date_time(data, bar)
+
         for index, row in data.iterrows():
             candle_begin_time = row["candle_begin_time"]
             timestamp = datetime_to_timestamp(candle_begin_time, is_utc=True)
data["candle_begin_time"].iloc[0] + is_ok = True + try: + timestamp = datetime_to_timestamp(sample_date_time, is_utc=True) + except Exception as e: + is_ok = False + if not is_ok: + date_part = sample_date_time.split(" ")[0] + first_date_time = f"{date_part} 00:00:00" + first_date_time_utc = datetime.strptime(first_date_time, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) + if bar == "1H": + # 将candle_begin_time以first_date_time为起点,每条记录增加一小时 + for index, row in data.iterrows(): + if index == 0: + candle_begin_time = first_date_time_utc + else: + candle_begin_time = first_date_time_utc + timedelta(hours=1 * index) + data.loc[index, "candle_begin_time"] = candle_begin_time.strftime("%Y-%m-%d %H:%M:%S") + elif bar == "5m": + # 将candle_begin_time以first_date_time为起点,每条记录增加五分钟 + for index, row in data.iterrows(): + if index == 0: + candle_begin_time = first_date_time_utc + else: + candle_begin_time = first_date_time_utc + timedelta(minutes=5 * index) + data.loc[index, "candle_begin_time"] = candle_begin_time.strftime("%Y-%m-%d %H:%M:%S") + elif bar == "30m": + # 将candle_begin_time以first_date_time为起点,每条记录增加三十分钟 + for index, row in data.iterrows(): + if index == 0: + candle_begin_time = first_date_time_utc + else: + candle_begin_time = first_date_time_utc + timedelta(minutes=30 * index) + data.loc[index, "candle_begin_time"] = candle_begin_time.strftime("%Y-%m-%d %H:%M:%S") + else: + pass + return data def post_save_data(self, data: pd.DataFrame): if data is not None and len(data) > 0: