diff --git a/auto_schedule.py b/auto_schedule.py new file mode 100644 index 0000000..225f26c --- /dev/null +++ b/auto_schedule.py @@ -0,0 +1,28 @@ +import schedule +import time +import datetime +import core.logger as logging +import subprocess +import os + +logger = logging.logger +# 定义要执行的任务 +def run_script(): + start_time = time.time() + logger.info(f"Executing script at: {datetime.datetime.now()}") + output_file = r'./output/auto_schedule.txt' + with open(output_file, 'a') as f: + f.write(f"Task ran at {datetime.datetime.now()}\n") + python_path = r"D:\miniconda3\envs\okx\python.exe" + script_path = r"D:\python_projects\crypto_quant\monitor_schedule.py" + subprocess.run([python_path, script_path]) + end_time = time.time() + logger.info(f"Script execution time: {end_time - start_time} seconds") +# 设置每20秒运行一次 +schedule.every(20).seconds.do(run_script) + +# 保持程序运行并检查调度 +logger.info("Scheduler started. Press Ctrl+C to stop.") +while True: + schedule.run_pending() + time.sleep(1) \ No newline at end of file diff --git a/core/biz/__pycache__/huge_volume.cpython-312.pyc b/core/biz/__pycache__/huge_volume.cpython-312.pyc index 95af9d0..1d43573 100644 Binary files a/core/biz/__pycache__/huge_volume.cpython-312.pyc and b/core/biz/__pycache__/huge_volume.cpython-312.pyc differ diff --git a/core/biz/__pycache__/huge_volume_chart.cpython-312.pyc b/core/biz/__pycache__/huge_volume_chart.cpython-312.pyc index 038ea1f..567511e 100644 Binary files a/core/biz/__pycache__/huge_volume_chart.cpython-312.pyc and b/core/biz/__pycache__/huge_volume_chart.cpython-312.pyc differ diff --git a/core/biz/__pycache__/market_data.cpython-312.pyc b/core/biz/__pycache__/market_data.cpython-312.pyc index b6b8bfc..ebfe61c 100644 Binary files a/core/biz/__pycache__/market_data.cpython-312.pyc and b/core/biz/__pycache__/market_data.cpython-312.pyc differ diff --git a/core/biz/__pycache__/market_monitor.cpython-312.pyc b/core/biz/__pycache__/market_monitor.cpython-312.pyc index 00e9a93..82415c4 100644 Binary files a/core/biz/__pycache__/market_monitor.cpython-312.pyc and b/core/biz/__pycache__/market_monitor.cpython-312.pyc differ diff --git a/core/biz/__pycache__/trade_data.cpython-312.pyc b/core/biz/__pycache__/trade_data.cpython-312.pyc index 4d0e21c..d62b15f 100644 Binary files a/core/biz/__pycache__/trade_data.cpython-312.pyc and b/core/biz/__pycache__/trade_data.cpython-312.pyc differ diff --git a/core/biz/huge_volume.py b/core/biz/huge_volume.py index 5a27ef3..14763a9 100644 --- a/core/biz/huge_volume.py +++ b/core/biz/huge_volume.py @@ -1,5 +1,5 @@ from pandas import DataFrame -import logging +import core.logger as logging import os import re import pandas as pd @@ -7,9 +7,7 @@ from datetime import datetime from copy import deepcopy from typing import Optional, List, Dict, Any, Tuple -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logger = logging.logger class HugeVolume: @@ -81,11 +79,11 @@ class HugeVolume: DataFrame: 包含异常检测结果的DataFrame """ if data is None or len(data) == 0: - logging.warning("数据为空,无法进行成交量异常检测") + logger.warning("数据为空,无法进行成交量异常检测") return None if "volume" not in data.columns: - logging.error("数据中缺少volume列") + logger.error("数据中缺少volume列") return None # 按时间戳排序 @@ -115,13 +113,13 @@ class HugeVolume: # 如果check_price为True,检查价格分位数 if check_price: if "close" not in data.columns: - logging.error("数据中缺少close列,无法进行价格检查") + logger.error("数据中缺少close列,无法进行价格检查") return data if "high" not in data.columns: - logging.error("数据中缺少high列,无法进行价格检查") + logger.error("数据中缺少high列,无法进行价格检查") return data if "low" not in data.columns: - logging.error("数据中缺少low列,无法进行价格检查") + logger.error("数据中缺少low列,无法进行价格检查") return data for price_column in ["close", "high", "low"]: @@ -137,7 +135,7 @@ class HugeVolume: if output_excel: # 检查数据是否为空 if len(data) == 0: - logging.warning("数据为空,无法导出Excel文件") + logger.warning("数据为空,无法导出Excel文件") return data start_date = data["date_time"].iloc[0] @@ -154,7 +152,7 @@ class HugeVolume: ) as writer: data.to_excel(writer, sheet_name="volume_spike", index=False) except Exception as e: - logging.error(f"导出Excel文件失败: {e}") + logger.error(f"导出Excel文件失败: {e}") return data diff --git a/core/biz/huge_volume_chart.py b/core/biz/huge_volume_chart.py index fcab219..8c80e2b 100644 --- a/core/biz/huge_volume_chart.py +++ b/core/biz/huge_volume_chart.py @@ -6,7 +6,7 @@ import seaborn as sns from openpyxl import Workbook from openpyxl.drawing.image import Image from PIL import Image as PILImage -import logging +import core.logger as logging from datetime import datetime import pandas as pd import os @@ -14,9 +14,7 @@ import re import openpyxl from openpyxl.styles import Font -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logger = logging.logger sns.set_theme(style="whitegrid") # 设置中文 @@ -140,7 +138,7 @@ class HugeVolumeChart: for symbol in self.symbol_list: charts_dict[f"{symbol}_{ratio_column}_heatmap"] = {} for price_type in self.price_type_list: - logging.info(f"绘制{symbol} {price_type} {ratio_column}热力图") + logger.info(f"绘制{symbol} {price_type} {ratio_column}热力图") df = self.data[(self.data["symbol"] == symbol) & (self.data["price_type"] == price_type)] pivot_table = df.pivot_table(values=ratio_column, index='window_size', columns='bar', aggfunc='mean') plt.figure(figsize=(10, 6)) @@ -323,7 +321,7 @@ class HugeVolumeChart: """ 绘制价格上涨下跌图 """ - logging.info(f"绘制价格上涨下跌图: {prefix}") + logger.info(f"绘制价格上涨下跌图: {prefix}") # 根据price_type_list,得到各个price_type的平均rise_ratio,平均fall_ratio,平均draw_ratio, 平均average_return price_type_data_dict = {} for price_type in self.price_type_list: @@ -404,7 +402,7 @@ class HugeVolumeChart: } } """ - logging.info(f"输出Excel文件,包含所有{chart_type}图表") + logger.info(f"输出Excel文件,包含所有{chart_type}图表") file_name = f"huge_volume_{chart_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx" file_path = os.path.join(self.output_folder, file_name) @@ -444,7 +442,7 @@ class HugeVolumeChart: # Update row offset (chart height + padding) row_offset += chart_rows + 5 # Add 5 rows for padding between charts except Exception as e: - logging.error(f"输出Excel Sheet {sheet_name} 失败: {e}") + logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}") continue # Save Excel file @@ -456,4 +454,4 @@ class HugeVolumeChart: try: os.remove(chart_path) except Exception as e: - logging.error(f"删除临时文件失败: {e}") + logger.error(f"删除临时文件失败: {e}") diff --git a/core/biz/market_data.py b/core/biz/market_data.py index dd1c6de..4607c5f 100644 --- a/core/biz/market_data.py +++ b/core/biz/market_data.py @@ -1,12 +1,13 @@ import time from datetime import datetime, timedelta -import logging from typing import Optional import pandas as pd import okx.MarketData as Market import okx.TradingData as TradingData from core.utils import transform_date_time_to_timestamp -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') +import core.logger as logging + +logger = logging.logger class MarketData: def __init__(self, @@ -14,15 +15,18 @@ class MarketData: secret_key: str, passphrase: str, sandbox: bool = True): - flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境 + self.flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境 + self.api_key = api_key + self.secret_key = secret_key + self.passphrase = passphrase self.market_api = Market.MarketAPI( api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag - ) - self.trade_api = TradingData.TradingDataAPI( - api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag + flag=self.flag ) + # self.trade_api = TradingData.TradingDataAPI( + # api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, + # flag=flag + # ) def get_realtime_kline_data(self, symbol: str = None, bar: str = '5m', end_time: int = None, limit: int = 50) -> Optional[pd.DataFrame]: """ @@ -38,7 +42,7 @@ class MarketData: else: end_time = transform_date_time_to_timestamp(end_time) if end_time is None: - logging.error(f"end_time参数解析失败: {end_time}") + logger.error(f"end_time参数解析失败: {end_time}") return None response = self.get_realtime_candlesticks_from_api(symbol, bar, end_time, limit) if response: @@ -47,7 +51,7 @@ class MarketData: to_time = int(candles[0][0]) from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') - logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") + logger.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"] candles_pd = pd.DataFrame(candles, columns=columns) for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']: @@ -67,7 +71,7 @@ class MarketData: return candles_pd else: - logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试") + logger.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试") return None @@ -95,7 +99,7 @@ class MarketData: else: start_time = transform_date_time_to_timestamp(start) if start_time is None: - logging.error(f"start参数解析失败: {start}") + logger.error(f"start参数解析失败: {start}") return None columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"] all_data = [] @@ -105,10 +109,10 @@ class MarketData: # after,真实逻辑是获得指定时间之前的数据 !!! response = self.get_historical_candlesticks_from_api(symbol, bar, end_time, limit) if response is None: - logging.warning(f"请求失败,请稍后再试") + logger.warning(f"请求失败,请稍后再试") break if response["code"] != "0" or not response["data"]: - logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}") + logger.warning(f"请求失败或无数据: {response.get('msg', 'No message')}") break candles = response["data"] @@ -120,14 +124,14 @@ class MarketData: if latest_timestamp > from_time: latest_timestamp = from_time else: - logging.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time}, 停止获取数据") + logger.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time}, 停止获取数据") break from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') - logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") + logger.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}") if from_time < start_time: start_time_str = pd.to_datetime(start_time, unit='ms', utc=True).tz_convert('Asia/Shanghai') - logging.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") + logger.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str}, 停止获取数据") # candels中仅保留start_time之后的数据 candles = [candle for candle in candles if int(candle[0]) >= start_time] if len(candles) > 0: @@ -147,7 +151,7 @@ class MarketData: end_time = from_time - 1 time.sleep(0.5) except Exception as e: - logging.error(f"请求出错: {e}") + logger.error(f"请求出错: {e}") break if all_data: @@ -169,14 +173,14 @@ class MarketData: df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote', 'create_time']] df.sort_values('timestamp', inplace=True) df.reset_index(drop=True, inplace=True) - logging.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") + logger.info(f"总计获取 {len(df)} 条 K 线数据(仅confirm=1)") # 获取df中date_time的最早时间与最新时间 earliest_time = df['date_time'].min() latest_time = df['date_time'].max() - logging.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time}, 最新时间: {latest_time}") + logger.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time}, 最新时间: {latest_time}") return df else: - logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试") + logger.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试") return None def set_buy_and_sell_sz(self, symbol: str, candles: list, columns: list): @@ -201,6 +205,7 @@ class MarketData: def get_historical_candlesticks_from_api(self, symbol, bar, end_time, limit): response = None count = 0 + while True: try: response = self.market_api.get_history_candlesticks( @@ -212,8 +217,13 @@ class MarketData: if response: break except Exception as e: - logging.error(f"请求出错: {e}") + logger.error(f"请求出错: {e}") count += 1 + # 重新初始化,目的是为了避免长时间运行,导致被平台段封禁 + self.market_api = Market.MarketAPI( + api_key=self.api_key, api_secret_key=self.secret_key, passphrase=self.passphrase, + flag=self.flag + ) if count > 3: break time.sleep(10) @@ -233,8 +243,13 @@ class MarketData: if response: break except Exception as e: - logging.error(f"请求出错: {e}") + logger.error(f"请求出错: {e}") count += 1 + # 重新初始化,目的是为了避免长时间运行,导致被平台段封禁 + self.market_api = Market.MarketAPI( + api_key=self.api_key, api_secret_key=self.secret_key, passphrase=self.passphrase, + flag=self.flag + ) if count > 3: break time.sleep(5) diff --git a/core/biz/market_monitor.py b/core/biz/market_monitor.py index 5ad69e3..bde56d3 100644 --- a/core/biz/market_monitor.py +++ b/core/biz/market_monitor.py @@ -3,10 +3,9 @@ import numpy as np from metrics_config import METRICS_CONFIG from config import BAR_THRESHOLD from time import time +import core.logger as logging -import logging - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger def create_metrics_report( @@ -29,16 +28,16 @@ def create_metrics_report( date_time = row["date_time"] if only_output_huge_volume: if huge_volume == 1: - logging.info( + logger.info( f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 巨量" ) else: - logging.info( + logger.info( f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 非巨量,此次不发送相关数据" ) return else: - logging.info( + logger.info( f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time}" ) @@ -51,7 +50,7 @@ def create_metrics_report( low = round(float(row["low"]), 10) pct_chg = round(float(row["pct_chg"]), 4) if only_output_rise and pct_chg < 0: - logging.info( + logger.info( f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 下跌,不发送相关数据" ) return @@ -63,9 +62,9 @@ def create_metrics_report( else: contents.append(f"## {brief} 交易量报告") if now_datetime_str is not None: - contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 最新数据时间: {now_datetime_str}") + contents.append(f"## 滑动窗口: {window_size} 最新数据时间: {now_datetime_str}") else: - contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 交易周期时间: {date_time}") + contents.append(f"## 滑动窗口: {window_size} 交易周期时间: {date_time}") k_shape = str(row["k_shape"]) contents.append(f"### 价格信息") contents.append(f"当前价格: {close}, 开盘价: {open}, 最高价: {high}, 最低价: {low}") @@ -160,6 +159,7 @@ def create_metrics_report( if ma_divergence_value < 1: long_short_info["空"].append(f"均线形态: {ma_divergence}") if is_short: + check_long_short = "空" if is_over_sell: check_over_sell = "超卖" else: @@ -354,7 +354,7 @@ def get_last_huge_volume_record(all_data: pd.DataFrame, bar: str, timestamp: int results.append(f"最近十个周期内,出现巨量的次数: {huge_volume_in_ten_period_count}") return results except Exception as e: - logging.error(f"获取最近一次巨量记录信息失败: {e}") + logger.error(f"获取最近一次巨量记录信息失败: {e}") results.append(f"获取最近一次巨量记录信息失败: {e}") return results diff --git a/core/biz/metrics_calculation.py b/core/biz/metrics_calculation.py index 2710927..225c10d 100644 --- a/core/biz/metrics_calculation.py +++ b/core/biz/metrics_calculation.py @@ -45,15 +45,13 @@ data = metrics.set_ma_long_short_advanced(data, method="hybrid") - "震荡":震荡市场,建议观望或区间交易 """ -import logging +import core.logger as logging import pandas as pd import numpy as np import talib as tb from talib import MA_Type -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logger = logging.logger class MetricsCalculation: @@ -72,7 +70,7 @@ class MetricsCalculation: return df def macd(self, df: pd.DataFrame): - logging.info("计算MACD指标") + logger.info("计算MACD指标") data = np.array(df.close) ndata = len(data) m, n, T = 12, 26, 9 @@ -112,7 +110,7 @@ class MetricsCalculation: return df def kdj(self, df: pd.DataFrame): - logging.info("计算KDJ指标") + logger.info("计算KDJ指标") low_list = df["low"].rolling(window=9).min() low_list.fillna(value=df["low"].expanding().min(), inplace=True) high_list = df["high"].rolling(window=9).max() @@ -149,7 +147,7 @@ class MetricsCalculation: KDJ_K < 30, KDJ_D < 30, KDJ_J < 20: 超卖 否则为"徘徊" """ - logging.info("设置KDJ形态") + logger.info("设置KDJ形态") # 初始化kdj_pattern列 df["kdj_pattern"] = "徘徊" @@ -204,7 +202,7 @@ class MetricsCalculation: 使用20个周期的滚动窗口计算相对统计特征,避免绝对阈值过于严格的问题 """ - logging.info("设置均线多空和发散") + logger.info("设置均线多空和发散") # 通过趋势强度计算多空 # 震荡:不满足多空条件的其他情况 @@ -265,7 +263,7 @@ class MetricsCalculation: required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: - print(f"缺少必要的列: {missing_columns}") + logger.info(f"缺少必要的列: {missing_columns}") return df # 按时间戳排序(升序) @@ -362,7 +360,7 @@ class MetricsCalculation: required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: - print(f"缺少必要的列: {missing_columns}") + logger.info(f"缺少必要的列: {missing_columns}") return df # 按时间戳排序(升序) @@ -416,7 +414,7 @@ class MetricsCalculation: 支持所有均线交叉类型:5上穿10/20/30,10上穿20/30,20上穿30 以及对应的下穿信号:30下穿20/10/5, 20下穿10/5,10下穿5 """ - logging.info("计算均线指标") + logger.info("计算均线指标") df["ma5"] = df["close"].rolling(window=5).mean().dropna() df["ma10"] = df["close"].rolling(window=10).mean().dropna() df["ma20"] = df["close"].rolling(window=20).mean().dropna() @@ -492,7 +490,7 @@ class MetricsCalculation: return df def rsi(self, df: pd.DataFrame): - logging.info("计算RSI指标") + logger.info("计算RSI指标") df["rsi_14"] = tb.RSI(df["close"].values, timeperiod=14) df["rsi_signal"] = "" rsi_high = df["rsi_14"] > 70 @@ -507,7 +505,7 @@ class MetricsCalculation: return df def boll(self, df: pd.DataFrame): - logging.info("计算BOLL指标") + logger.info("计算BOLL指标") df["boll_upper"], df["boll_middle"], df["boll_lower"] = tb.BBANDS( df["close"].values, timeperiod=20, matype=MA_Type.SMA ) @@ -524,7 +522,7 @@ class MetricsCalculation: 超卖:价格接近下轨,且KDJ超卖 震荡:其他情况 """ - logging.info("设置BOLL形态") + logger.info("设置BOLL形态") # 初始化boll_pattern列 df["boll_pattern"] = "震荡" @@ -532,7 +530,7 @@ class MetricsCalculation: required_columns = ["close", "boll_upper", "boll_lower", "kdj_j"] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: - print(f"缺少必要的列: {missing_columns}") + logger.info(f"缺少必要的列: {missing_columns}") return df # 计算价格与布林带的距离百分比 @@ -600,7 +598,7 @@ class MetricsCalculation: - 长:K线实体或影线较长 - 超长:K线实体和影线都很长 """ - logging.info("设置K线长度") + logger.info("设置K线长度") # 检查必要的列是否存在 required_columns = ["close", "open", "high", "low"] missing_columns = [col for col in required_columns if col not in df.columns] @@ -707,12 +705,12 @@ class MetricsCalculation: - 超大实体:实体占比70%-90% - 光头光脚:实体占比>90%(非一字情况) """ - logging.info("设置K线形状") + logger.info("设置K线形状") # 检查必要的列是否存在 required_columns = ["close", "open", "high", "low"] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: - print(f"缺少必要的列: {missing_columns}") + logger.info(f"缺少必要的列: {missing_columns}") return df # 计算K线的基本特征 @@ -923,7 +921,7 @@ class MetricsCalculation: - "statistical": 统计分布方法 - "hybrid": 混合方法 """ - logging.info(f"使用{method}方法设置均线多空") + logger.info(f"使用{method}方法设置均线多空") if method == "weighted_voting": return self._weighted_voting_method(data) @@ -936,7 +934,7 @@ class MetricsCalculation: elif method == "hybrid": return self._hybrid_method(data) else: - logging.warning(f"未知的方法: {method},使用默认加权投票方法") + logger.warning(f"未知的方法: {method},使用默认加权投票方法") return self._weighted_voting_method(data) def _weighted_voting_method(self, data: pd.DataFrame): diff --git a/core/biz/quant_trader.py b/core/biz/quant_trader.py index 933de49..7b96e84 100644 --- a/core/biz/quant_trader.py +++ b/core/biz/quant_trader.py @@ -3,10 +3,10 @@ import okx.Trade as Trade import okx.MarketData as Market import okx.PublicData as Public import pandas as pd -import logging +import core.logger as logging from typing import Optional -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') +logger = logging.logger class QuantTrader: def __init__(self, @@ -60,20 +60,20 @@ class QuantTrader: details = balance.get('details', []) for detail in details: if detail.get('ccy') == 'USDT': - logging.info(f"USDT余额: {detail.get('availBal')}") + logger.info(f"USDT余额: {detail.get('availBal')}") result['USDT'] = float(detail.get('availBal', 0)) if detail.get('ccy') == self.symbol_prefix: - logging.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}") + logger.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}") result[self.symbol_prefix] = float(detail.get('availBal', 0)) if detail.get('ccy') == self.symbol_swap: - logging.info(f"{self.symbol_swap}余额: {detail.get('availBal')}") + logger.info(f"{self.symbol_swap}余额: {detail.get('availBal')}") result[self.symbol_swap] = float(detail.get('availBal', 0)) return result else: - logging.error(f"获取余额失败: {search_result}") + logger.error(f"获取余额失败: {search_result}") return {} except Exception as e: - logging.error(f"获取余额异常: {e}") + logger.error(f"获取余额异常: {e}") return {} def get_current_price(self, symbol: str = None) -> Optional[float]: @@ -89,16 +89,16 @@ class QuantTrader: data = result.get('data', []) if data and 'last' in data[0]: price = float(data[0]['last']) - logging.info(f"当前{symbol_prefix}价格: ${price:,.2f}") + logger.info(f"当前{symbol_prefix}价格: ${price:,.2f}") return price else: - logging.error(f"ticker数据格式异常: {data}") + logger.error(f"ticker数据格式异常: {data}") return None else: - logging.error(f"获取价格失败: {result}") + logger.error(f"获取价格失败: {result}") return None except Exception as e: - logging.error(f"获取价格异常: {e}") + logger.error(f"获取价格异常: {e}") return None def get_kline_data(self, symbol: str = None, after: str = None, before: str = None, bar: str = '1m', limit: int = 100) -> Optional[pd.DataFrame]: @@ -129,7 +129,7 @@ class QuantTrader: if result.get('code') == '0': data = result.get('data', []) if not data: - logging.warning("K线数据为空") + logger.warning("K线数据为空") return None df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', @@ -140,10 +140,10 @@ class QuantTrader: df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms', errors='coerce') return df else: - logging.error(f"获取K线数据失败: {result}") + logger.error(f"获取K线数据失败: {result}") return None except Exception as e: - logging.error(f"获取K线数据异常: {e}") + logger.error(f"获取K线数据异常: {e}") return None def place_market_order(self, side: str, size: float) -> Optional[str]: @@ -154,7 +154,7 @@ class QuantTrader: if side == 'sell': try: if symbol_balance < size: - logging.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}") + logger.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}") return None result = self.trade_api.place_order( instId=self.symbol, @@ -164,20 +164,20 @@ class QuantTrader: sz=str(size) ) if result.get('code') == '0': - logging.info(f"下单成功: {side} {size} {self.symbol_prefix}") + logger.info(f"下单成功: {side} {size} {self.symbol_prefix}") return result['data'][0]['ordId'] else: - logging.error(f"下单失败: {result}") + logger.error(f"下单失败: {result}") return None except Exception as e: - logging.error(f"下单异常: {e}") + logger.error(f"下单异常: {e}") return None elif side == 'buy': try: instrument_result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol) instrument_data = instrument_result.get("data", []) if not instrument_data: - logging.error(f"未获取到合约信息: {instrument_result}") + logger.error(f"未获取到合约信息: {instrument_result}") return None min_sz = float(instrument_data[0].get("minSz", 0)) if size < min_sz: @@ -186,7 +186,7 @@ class QuantTrader: last_price = float(ticker["data"][0]["last"]) usdt_amount = float(last_price * size) if usdt_balance < usdt_amount: - logging.error(f"USDT余额不足,目前余额: {usdt_balance}") + logger.error(f"USDT余额不足,目前余额: {usdt_balance}") return None result = self.trade_api.place_order( instId=self.symbol, @@ -196,16 +196,16 @@ class QuantTrader: sz=str(usdt_amount) ) if result.get('code') == '0': - logging.info(f"下单成功: {side} {usdt_amount} USDT") + logger.info(f"下单成功: {side} {usdt_amount} USDT") return result['data'][0]['ordId'] else: - logging.error(f"下单失败: {result}") + logger.error(f"下单失败: {result}") return None except Exception as e: - logging.error(f"下单异常: {e}") + logger.error(f"下单异常: {e}") return None else: - logging.error(f"不支持的下单方向: {side}") + logger.error(f"不支持的下单方向: {side}") return None # 设置杠杆倍数 @@ -218,9 +218,9 @@ class QuantTrader: posSide=posSide ) if result["code"] == "0": - logging.info(f"设置杠杆倍数 {leverage}x 成功") + logger.info(f"设置杠杆倍数 {leverage}x 成功") else: - logging.error(f"设置杠杆失败: {result['msg']}") + logger.error(f"设置杠杆失败: {result['msg']}") return result["code"] == "0" # 计算保证金需求 @@ -231,10 +231,10 @@ class QuantTrader: contract_value = quantity * slot * price # 每张 0.01 BTC initial_margin = contract_value / leverage recommended_margin = initial_margin * (1 + buffer_ratio) - logging.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT") - logging.info(f"合约总价值: {contract_value:.2f} USDT") - logging.info(f"初始保证金: {initial_margin:.2f} USDT") - logging.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT") + logger.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT") + logger.info(f"合约总价值: {contract_value:.2f} USDT") + logger.info(f"初始保证金: {initial_margin:.2f} USDT") + logger.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT") return recommended_margin, price # 开空头仓位(卖出空单) @@ -243,7 +243,7 @@ class QuantTrader: # 计算所需保证金和开仓价格 margin_data = self.calculate_margin(quantity, leverage, slot, buffer_ratio) if not margin_data: - logging.error("无法计算保证金,终止下单") + logger.error("无法计算保证金,终止下单") return None, None required_margin, entry_price = margin_data @@ -251,7 +251,7 @@ class QuantTrader: balance = self.get_account_balance() avail_bal = balance.get('USDT') if avail_bal is None or avail_bal < required_margin: - logging.error(f"保证金不足,需至少 {required_margin:.2f} USDT,当前余额: {avail_bal}") + logger.error(f"保证金不足,需至少 {required_margin:.2f} USDT,当前余额: {avail_bal}") return None, None # 设置杠杆 @@ -270,10 +270,10 @@ class QuantTrader: } result = self.trade_api.place_order(**order_data) if result.get("code") == "0": - logging.info(f"开空单成功,订单ID: {result['data'][0]['ordId']}") + logger.info(f"开空单成功,订单ID: {result['data'][0]['ordId']}") return result["data"][0]["ordId"], entry_price else: - logging.error(f"开空单失败: {result.get('msg', result)}") + logger.error(f"开空单失败: {result.get('msg', result)}") return None, None # 平空单(买入平仓) @@ -290,10 +290,10 @@ class QuantTrader: } result = self.trade_api.place_order(**order_data) if result.get("code") == "0": - logging.info(f"平空单成功,订单ID: {result['data'][0]['ordId']}") + logger.info(f"平空单成功,订单ID: {result['data'][0]['ordId']}") return True else: - logging.error(f"平空单失败: {result.get('msg', result)}") + logger.error(f"平空单失败: {result.get('msg', result)}") return False def get_minimun_order_size(self) -> None: @@ -304,10 +304,10 @@ class QuantTrader: instrument = result.get("data", [{}])[0] min_sz = float(instrument.get("minSz", 0)) lot_sz = float(instrument.get("lotSz", 0)) - logging.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}") - logging.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}") + logger.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}") + logger.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}") else: - logging.error(f"错误: {result.get('msg', result)}") + logger.error(f"错误: {result.get('msg', result)}") except Exception as e: - logging.error(f"异常: {str(e)}") + logger.error(f"异常: {str(e)}") diff --git a/core/biz/strategy.py b/core/biz/strategy.py index fc4305f..cd17f7a 100644 --- a/core/biz/strategy.py +++ b/core/biz/strategy.py @@ -1,11 +1,11 @@ import time from datetime import datetime -import logging +import core.logger as logging from typing import Optional import pandas as pd from core.biz.quant_trader import QuantTrader -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') +logger = logging.logger class QuantStrategy: def __init__(self, @@ -31,10 +31,10 @@ class QuantStrategy: 计算简单移动平均线 """ if 'close' not in df: - logging.error("DataFrame缺少'close'列,无法计算SMA") + logger.error("DataFrame缺少'close'列,无法计算SMA") return pd.Series([float('nan')] * len(df)) if len(df) < period: - logging.warning(f"数据长度不足{period},SMA结果将包含NaN") + logger.warning(f"数据长度不足{period},SMA结果将包含NaN") return df['close'].rolling(window=period).mean() def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series: @@ -42,10 +42,10 @@ class QuantStrategy: 计算RSI指标 """ if 'close' not in df: - logging.error("DataFrame缺少'close'列,无法计算RSI") + logger.error("DataFrame缺少'close'列,无法计算RSI") return pd.Series([float('nan')] * len(df)) if len(df) < period: - logging.warning(f"数据长度不足{period},RSI结果将包含NaN") + logger.warning(f"数据长度不足{period},RSI结果将包含NaN") delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() @@ -58,111 +58,111 @@ class QuantStrategy: """ 简单移动平均线策略 """ - logging.info("=== 执行移动平均线策略 ===") + logger.info("=== 执行移动平均线策略 ===") try: df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, sma_long_period+2)) except Exception as e: - logging.error(f"获取K线数据失败: {e}") + logger.error(f"获取K线数据失败: {e}") return if df is None or len(df) < max(sma_short_period, sma_long_period, 2): - logging.warning("数据不足,无法执行策略") + logger.warning("数据不足,无法执行策略") return df['sma_short'] = self.calculate_sma(df, sma_short_period) df['sma_long'] = self.calculate_sma(df, sma_long_period) if len(df) < 2: - logging.warning("数据不足2条,无法判断金叉死叉") + logger.warning("数据不足2条,无法判断金叉死叉") return latest = df.iloc[-1] prev = df.iloc[-2] if pd.isna(latest['sma_short']) or pd.isna(latest['sma_long']) or pd.isna(prev['sma_short']) or pd.isna(prev['sma_long']): - logging.warning("均线数据存在NaN,跳过本次信号判断") + logger.warning("均线数据存在NaN,跳过本次信号判断") return - logging.info(f"短期均线: {latest['sma_short']:.2f}") - logging.info(f"长期均线: {latest['sma_long']:.2f}") - logging.info(f"当前价格: {latest['close']:.2f}") + logger.info(f"短期均线: {latest['sma_short']:.2f}") + logger.info(f"长期均线: {latest['sma_long']:.2f}") + logger.info(f"当前价格: {latest['close']:.2f}") if (latest['sma_short'] > latest['sma_long'] and prev['sma_short'] <= prev['sma_long']): - logging.info("信号: 买入") + logger.info("信号: 买入") self.quant_trader.place_market_order('buy', self.quant_trader.position_size) elif (latest['sma_short'] < latest['sma_long'] and prev['sma_short'] >= prev['sma_long']): - logging.info("信号: 卖出") + logger.info("信号: 卖出") self.quant_trader.place_market_order('sell', self.quant_trader.position_size) else: - logging.info("信号: 持仓观望") + logger.info("信号: 持仓观望") def rsi_strategy(self, period: int = 14, oversold: int = 30, overbought: int = 70) -> None: """ RSI策略 """ - logging.info("=== 执行RSI策略 ===") + logger.info("=== 执行RSI策略 ===") try: df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, period+2)) except Exception as e: - logging.error(f"获取K线数据失败: {e}") + logger.error(f"获取K线数据失败: {e}") return if df is None or len(df) < period: - logging.warning("数据不足,无法执行策略") + logger.warning("数据不足,无法执行策略") return df['rsi'] = self.calculate_rsi(df, period) latest_rsi = df['rsi'].iloc[-1] if pd.isna(latest_rsi): - logging.warning("最新RSI为NaN,跳过本次信号判断") + logger.warning("最新RSI为NaN,跳过本次信号判断") return - logging.info(f"当前RSI: {latest_rsi:.2f}") + logger.info(f"当前RSI: {latest_rsi:.2f}") if latest_rsi < oversold: - logging.info("信号: RSI超卖,买入") + logger.info("信号: RSI超卖,买入") self.quant_trader.place_market_order('buy', self.quant_trader.position_size) elif latest_rsi > overbought: - logging.info("信号: RSI超买,卖出") + logger.info("信号: RSI超买,卖出") self.quant_trader.place_market_order('sell', self.quant_trader.position_size) else: - logging.info("信号: RSI正常区间,持仓观望") + logger.info("信号: RSI正常区间,持仓观望") def grid_trading_strategy(self, grid_levels: int = 5, grid_range: float = 0.02) -> None: """ 网格交易策略 """ if grid_levels <= 0: - logging.error("网格数必须大于0") + logger.error("网格数必须大于0") return if grid_range <= 0: - logging.error("网格范围必须大于0") + logger.error("网格范围必须大于0") return - logging.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===") + logger.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===") try: current_price = self.quant_trader.get_current_price() except Exception as e: - logging.error(f"获取当前价格失败: {e}") + logger.error(f"获取当前价格失败: {e}") return if current_price is None: - logging.warning("当前价格获取失败") + logger.warning("当前价格获取失败") return grid_prices = [] for i in range(grid_levels): price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels) grid_prices.append(price) - logging.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}") + logger.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}") try: df = self.quant_trader.get_kline_data(bar='1m', limit=10) except Exception as e: - logging.error(f"获取K线数据失败: {e}") + logger.error(f"获取K线数据失败: {e}") return if df is None or len(df) == 0 or 'close' not in df: - logging.warning("K线数据无效,无法执行网格策略") + logger.warning("K线数据无效,无法执行网格策略") return latest_price = df['close'].iloc[-1] if pd.isna(latest_price): - logging.warning("最新价格为NaN,跳过本次信号判断") + logger.warning("最新价格为NaN,跳过本次信号判断") return closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price)) - logging.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}") + logger.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}") if latest_price < closest_grid * 0.995: - logging.info("信号: 价格下跌,网格买入") + logger.info("信号: 价格下跌,网格买入") self.quant_trader.place_market_order('buy', self.quant_trader.position_size) elif latest_price > closest_grid * 1.005: - logging.info("信号: 价格上涨,网格卖出") + logger.info("信号: 价格上涨,网格卖出") self.quant_trader.place_market_order('sell', self.quant_trader.position_size) else: - logging.info("信号: 价格在网格内,持仓观望") + logger.info("信号: 价格在网格内,持仓观望") def run_strategy_loop(self, strategy: str = 'sma', @@ -172,16 +172,16 @@ class QuantStrategy: 运行策略循环 """ if interval <= 0: - logging.error("循环间隔必须大于0秒") + logger.error("循环间隔必须大于0秒") return - logging.info(f"开始运行{strategy}策略,间隔{interval}秒") + logger.info(f"开始运行{strategy}策略,间隔{interval}秒") while True: try: - logging.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + logger.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) try: self.quant_trader.get_account_balance() except Exception as e: - logging.error(f"获取账户余额失败: {e}") + logger.error(f"获取账户余额失败: {e}") if strategy == 'sma': sma_short_period = trading_config.get("sma_short_period", 5) sma_long_period = trading_config.get("sma_long_period", 20) @@ -196,14 +196,14 @@ class QuantStrategy: grid_range = trading_config.get("grid_range", 0.02) self.grid_trading_strategy(grid_levels, grid_range) else: - logging.error("未知策略") + logger.error("未知策略") break - logging.info(f"等待{interval}秒后继续...") + logger.info(f"等待{interval}秒后继续...") time.sleep(interval) except KeyboardInterrupt: - logging.info("策略运行被用户中断") + logger.info("策略运行被用户中断") break except Exception as e: - logging.error(f"策略运行异常: {e}") + logger.error(f"策略运行异常: {e}") time.sleep(interval) \ No newline at end of file diff --git a/core/biz/trade_data.py b/core/biz/trade_data.py index 7a4fcad..97cc230 100644 --- a/core/biz/trade_data.py +++ b/core/biz/trade_data.py @@ -1,14 +1,13 @@ import time from datetime import datetime, timedelta -import logging from typing import Optional import pandas as pd import okx.MarketData as Market from core.utils import timestamp_to_datetime from core.db.db_trade_data import DBTradeData +import core.logger as logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') - +logger = logging.logger class TradeData: def __init__(self, @@ -57,7 +56,7 @@ class TradeData: if result: break except Exception as e: - logging.error(f"请求出错: {e}") + logger.error(f"请求出错: {e}") count += 1 if count > 3: break @@ -75,7 +74,7 @@ class TradeData: from_date_time = timestamp_to_datetime(from_time) to_date_time = timestamp_to_datetime(to_time) - logging.info(f"获得交易数据,最早时间: {from_date_time}, 最近时间: {to_date_time}") + logger.info(f"获得交易数据,最早时间: {from_date_time}, 最近时间: {to_date_time}") df = pd.DataFrame(trades) # 过滤时间范围 @@ -112,7 +111,7 @@ class TradeData: else: return None except Exception as e: - logging.error(f"获取历史交易数据失败: {e}") + logger.error(f"获取历史交易数据失败: {e}") return None diff --git a/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc index a76e2d6..b90e83a 100644 Binary files a/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc and b/core/db/__pycache__/db_huge_volume_data.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_manager.cpython-312.pyc b/core/db/__pycache__/db_manager.cpython-312.pyc index 8339bb8..1dabfb7 100644 Binary files a/core/db/__pycache__/db_manager.cpython-312.pyc and b/core/db/__pycache__/db_manager.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_market_data.cpython-312.pyc b/core/db/__pycache__/db_market_data.cpython-312.pyc index f85cbaa..cf78140 100644 Binary files a/core/db/__pycache__/db_market_data.cpython-312.pyc and b/core/db/__pycache__/db_market_data.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_market_monitor.cpython-312.pyc b/core/db/__pycache__/db_market_monitor.cpython-312.pyc index 9642ab7..96fdf78 100644 Binary files a/core/db/__pycache__/db_market_monitor.cpython-312.pyc and b/core/db/__pycache__/db_market_monitor.cpython-312.pyc differ diff --git a/core/db/__pycache__/db_trade_data.cpython-312.pyc b/core/db/__pycache__/db_trade_data.cpython-312.pyc index 956f79e..f2dd1d1 100644 Binary files a/core/db/__pycache__/db_trade_data.cpython-312.pyc and b/core/db/__pycache__/db_trade_data.cpython-312.pyc differ diff --git a/core/db/db_huge_volume_data.py b/core/db/db_huge_volume_data.py index 2a4407e..e0180b9 100644 --- a/core/db/db_huge_volume_data.py +++ b/core/db/db_huge_volume_data.py @@ -1,11 +1,10 @@ import pandas as pd -import logging +import core.logger as logging from typing import Optional, List, Dict, Any, Union from core.db.db_manager import DBData from core.utils import transform_date_time_to_timestamp -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") - +logger = logging.logger class DBHugeVolumeData: def __init__( @@ -127,7 +126,7 @@ class DBHugeVolumeData: :param df: 巨量交易数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql(df) @@ -141,7 +140,7 @@ class DBHugeVolumeData: :param df: 巨量交易数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_fast(df) @@ -156,7 +155,7 @@ class DBHugeVolumeData: :param chunk_size: 分块大小 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) @@ -169,7 +168,7 @@ class DBHugeVolumeData: 注意:会抛出重复键错误,需要额外处理 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_simple(df) diff --git a/core/db/db_manager.py b/core/db/db_manager.py index 7447b5f..dc05b03 100644 --- a/core/db/db_manager.py +++ b/core/db/db_manager.py @@ -1,11 +1,10 @@ import pandas as pd from sqlalchemy import create_engine, exc, text import re, datetime -import logging +import core.logger as logging from core.utils import transform_data_type -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") - +logger = logging.logger class DBData: def __init__( @@ -86,7 +85,7 @@ class DBData: :param db_url: 数据库连接URL """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return df = df[self.columns] @@ -112,9 +111,9 @@ class DBData: # 删除临时表 conn.execute(text(f"DROP TABLE IF EXISTS {self.temp_table_name}")) - logging.info("数据已成功写入数据库。") + logger.info("数据已成功写入数据库。") except Exception as e: - logging.error(f"数据库连接或写入失败: {e}") + logger.error(f"数据库连接或写入失败: {e}") def insert_data_to_mysql_fast(self, df: pd.DataFrame): """ @@ -124,7 +123,7 @@ class DBData: 适用场景:中等数据量 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return df = df[self.columns] @@ -137,9 +136,9 @@ class DBData: data_dicts = [row.to_dict() for _, row in df.iterrows()] conn.execute(sql, data_dicts) - logging.info("数据已成功写入数据库。") + logger.info("数据已成功写入数据库。") except Exception as e: - logging.error(f"数据库连接或写入失败: {e}") + logger.error(f"数据库连接或写入失败: {e}") def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000): """ @@ -149,7 +148,7 @@ class DBData: 适用场景:大数据量(>10万条) """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return df = df[self.columns] @@ -180,12 +179,12 @@ class DBData: # 删除临时表 conn.execute(text(f"DROP TABLE IF EXISTS {temp_table_name}")) - logging.info( + logger.info( f"已处理 {min(i+chunk_size, total_rows)}/{total_rows} 条记录" ) - logging.info("数据已成功写入数据库。") + logger.info("数据已成功写入数据库。") except Exception as e: - logging.error(f"数据库连接或写入失败: {e}") + logger.error(f"数据库连接或写入失败: {e}") def insert_data_to_mysql_simple(self, df: pd.DataFrame): """ @@ -195,7 +194,7 @@ class DBData: 注意:会抛出重复键错误,需要额外处理 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return df = df[self.columns] @@ -208,9 +207,9 @@ class DBData: index=False, method="multi", ) - logging.info("数据已成功写入数据库。") + logger.info("数据已成功写入数据库。") except Exception as e: - logging.error(f"数据库连接或写入失败: {e}") + logger.error(f"数据库连接或写入失败: {e}") def query_data(self, sql: str, condition_dict: dict, return_multi: bool = True): """ @@ -245,5 +244,5 @@ class DBData: else: return None except Exception as e: - logging.error(f"查询数据出错: {e}") + logger.error(f"查询数据出错: {e}") return None diff --git a/core/db/db_market_data.py b/core/db/db_market_data.py index b35b41f..d922c95 100644 --- a/core/db/db_market_data.py +++ b/core/db/db_market_data.py @@ -1,9 +1,9 @@ import pandas as pd -import logging +import core.logger as logging from core.db.db_manager import DBData from core.utils import transform_date_time_to_timestamp -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger class DBMarketData: @@ -78,7 +78,7 @@ class DBMarketData: :param df: K线数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql(df) @@ -92,7 +92,7 @@ class DBMarketData: :param df: K线数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_fast(df) @@ -107,7 +107,7 @@ class DBMarketData: :param chunk_size: 分块大小 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) @@ -120,7 +120,7 @@ class DBMarketData: 注意:会抛出重复键错误,需要额外处理 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_simple(df) @@ -489,12 +489,12 @@ class DBMarketData: if start is not None: start = transform_date_time_to_timestamp(start) if start is None: - logging.warning(f"开始时间格式错误: {start}") + logger.warning(f"开始时间格式错误: {start}") return None if end is not None: end = transform_date_time_to_timestamp(end) if end is None: - logging.warning(f"结束时间格式错误: {end}") + logger.warning(f"结束时间格式错误: {end}") return None if start is not None and end is not None: if start > end: diff --git a/core/db/db_market_monitor.py b/core/db/db_market_monitor.py index e578459..5e31045 100644 --- a/core/db/db_market_monitor.py +++ b/core/db/db_market_monitor.py @@ -1,10 +1,10 @@ import pandas as pd -import logging +import core.logger as logging from typing import Optional, List, Dict, Any, Union from core.db.db_manager import DBData from core.utils import transform_date_time_to_timestamp -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger class DBMarketMonitor: @@ -94,7 +94,7 @@ class DBMarketMonitor: :param df: 市场监控数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql(df) @@ -108,7 +108,7 @@ class DBMarketMonitor: :param df: 市场监控数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_fast(df) @@ -123,7 +123,7 @@ class DBMarketMonitor: :param chunk_size: 每块大小 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) @@ -137,7 +137,7 @@ class DBMarketMonitor: :param df: 市场监控数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_simple(df) @@ -432,8 +432,8 @@ class DBMarketMonitor: result = conn.execute(text(sql), condition_dict) conn.commit() deleted_count = result.rowcount - logging.info(f"删除了 {deleted_count} 条旧的市场监控数据") + logger.info(f"删除了 {deleted_count} 条旧的市场监控数据") return deleted_count except Exception as e: - logging.error(f"删除旧数据时发生错误: {e}") + logger.error(f"删除旧数据时发生错误: {e}") return 0 \ No newline at end of file diff --git a/core/db/db_trade_data.py b/core/db/db_trade_data.py index ead917f..775d655 100644 --- a/core/db/db_trade_data.py +++ b/core/db/db_trade_data.py @@ -1,10 +1,10 @@ import pandas as pd -import logging +import core.logger as logging from typing import Optional, List, Dict, Any, Union from core.db.db_manager import DBData from core.utils import transform_date_time_to_timestamp -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger class DBTradeData: @@ -88,7 +88,7 @@ class DBTradeData: :param df: 交易数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql(df) @@ -102,7 +102,7 @@ class DBTradeData: :param df: 交易数据DataFrame """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_fast(df) @@ -117,7 +117,7 @@ class DBTradeData: :param chunk_size: 分块大小 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_chunk(df, chunk_size) @@ -130,7 +130,7 @@ class DBTradeData: 注意:会抛出重复键错误,需要额外处理 """ if df is None or df.empty: - logging.warning("DataFrame为空,无需写入数据库。") + logger.warning("DataFrame为空,无需写入数据库。") return self.db_manager.insert_data_to_mysql_simple(df) diff --git a/core/logger.py b/core/logger.py new file mode 100644 index 0000000..62df942 --- /dev/null +++ b/core/logger.py @@ -0,0 +1,72 @@ +import logging +import time +from logging.handlers import TimedRotatingFileHandler +import os +''' +日志工具类 +''' + + +class Logger: + def __init__(self): + # log文件存储路径 + current_dir = os.getcwd() + if current_dir.endswith('crypto_quant'): + output_folder = r'./output/log/' + elif current_dir.startswith(r'/root/'): + output_folder = r'/root/crypto_quant/output/log/' + else: + output_folder = r'./output/log/' + os.makedirs(output_folder, exist_ok=True) + # add self._log_filename to be adata_yyyyMMddHHmm.log + self._log_filename = os.path.join(output_folder, 'crypto_monitor_{}.log'.format(time.strftime("%Y%m%d%H%M%S", time.localtime()))) + + # self._log_filename = os.path.join(output_folder, 'adata.log') + + ''' + %(levelno)s: 打印日志级别的数值 + %(levelname)s: 打印日志级别名称 + %(pathname)s: 打印当前执行程序的路径,其实就是sys.argv[0] + %(filename)s: 打印当前执行程序名 + %(funcName)s: 打印日志的当前函数 + %(lineno)d: 打印日志的当前行号 + %(asctime)s: 打印日志的时间 + %(thread)d: 打印线程ID + %(threadName)s: 打印线程名称 + %(process)d: 打印进程ID + %(message)s: 打印日志信息 + ''' + logging.basicConfig() + # 日志信息输出格式 + self._formatter = logging.Formatter('%(asctime)s - %(process)d - %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + # 创建一个日志对象 + self._logger = logging.getLogger() + # self.set_console_logger() + self.set_file_logger() + self._logger.setLevel(logging.INFO) + + def set_console_logger(self): + '''设置控制台日志输出''' + console_handler = logging.StreamHandler() + console_handler.setFormatter(self._formatter) + console_handler.setLevel(logging.INFO) + self._logger.addHandler(console_handler) + + def set_file_logger(self): + '''设置日志文件输出''' + log_file_handler = TimedRotatingFileHandler(filename=self._log_filename, + when="D", + interval=1, + backupCount=3, + encoding='utf-8') + log_file_handler.setFormatter(self._formatter) + log_file_handler.setLevel(logging.INFO) + # log_file_handler.suffix = "%Y%m%d_%H%M%S.log" + self._logger.addHandler(log_file_handler) + + def get_logger(self): + return self._logger + + +logger = Logger().get_logger() diff --git a/core/statistics/price_volume_stats.py b/core/statistics/price_volume_stats.py index 7f8489e..33e34fd 100644 --- a/core/statistics/price_volume_stats.py +++ b/core/statistics/price_volume_stats.py @@ -1,4 +1,4 @@ -import logging +import core.logger as logging import os import pandas as pd import numpy as np @@ -19,9 +19,7 @@ from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp # seaborn支持中文 plt.rcParams["font.family"] = ["SimHei"] -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logger = logging.logger class PriceVolumeStats: @@ -61,7 +59,7 @@ class PriceVolumeStats: price_volume_stats_list = [] earliest_market_timestamp = None latest_market_timestamp = None - logging.info( + logger.info( f"开始统计{self.initial_date}到{self.end_date}的价格、成交量、成交量与高价低价关系数据" ) for symbol in self.symbol_list: @@ -86,22 +84,22 @@ class PriceVolumeStats: if data["timestamp"].iloc[-1] > latest_market_timestamp: latest_market_timestamp = data["timestamp"].iloc[-1] # 统计高成交量小时分布 - logging.info(f"统计{symbol} {bar} 巨量小时分布数据") + logger.info(f"统计{symbol} {bar} 巨量小时分布数据") high_volume_hours_data = self.stats_high_volume_hours(data) high_volume_hours_list.append(high_volume_hours_data) huge_high_volume_hours_data = self.stats_high_volume_hours(data, 4) huge_high_volume_hours_list.append(huge_high_volume_hours_data) - logging.info(f"统计{symbol} {bar} 价格数据") + logger.info(f"统计{symbol} {bar} 价格数据") price_stats_data = self.calculate_price_statistics(data) - logging.info(f"统计{symbol} {bar} 涨跌百分比数据") + logger.info(f"统计{symbol} {bar} 涨跌百分比数据") pct_change_stats_data = self.calculate_pct_change_statistics(data) - logging.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据") + logger.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据") peak_valley_data, peak_valley_stats_data = ( self.calculate_price_change_peak_valley_statistics(data) ) - logging.info(f"统计{symbol} {bar} 成交量数据") + logger.info(f"统计{symbol} {bar} 成交量数据") volume_stats_data = self.calculate_volume_statistics(data) - logging.info(f"统计{symbol} {bar} 成交量与高价低价关系数据") + logger.info(f"统计{symbol} {bar} 成交量与高价低价关系数据") price_volume_stats_data = self.calculate_price_volume_statistics( data ) @@ -136,7 +134,7 @@ class PriceVolumeStats: latest_market_date_time = re.sub(r"[\:\-\s]", "", str(latest_market_date_time)) output_file_name = f"price_volume_stats_window_size_{self.window_size}_from_{earliest_market_date_time}_to_{latest_market_date_time}.xlsx" output_file_path = os.path.join(self.stats_output_dir, output_file_name) - logging.info(f"导出{output_file_path}") + logger.info(f"导出{output_file_path}") with pd.ExcelWriter(output_file_path) as writer: price_stats_df.to_excel(writer, sheet_name="价格统计", index=False) pct_change_stats_df.to_excel( @@ -600,7 +598,7 @@ class PriceVolumeStats: } } """ - logging.info(f"将图表输出到{excel_file_path}") + logger.info(f"将图表输出到{excel_file_path}") # 打开已经存在的Excel文件 wb = openpyxl.load_workbook(excel_file_path) @@ -639,7 +637,7 @@ class PriceVolumeStats: chart_rows + 5 ) # Add 5 rows for padding between charts except Exception as e: - logging.error(f"输出Excel Sheet {sheet_name} 失败: {e}") + logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}") continue # Save Excel file wb.save(excel_file_path) diff --git a/core/utils.py b/core/utils.py index 919a1c5..8aeecf4 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,9 +1,9 @@ from datetime import datetime, timezone, timedelta from decimal import Decimal import re -import logging +import core.logger as logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger def datetime_to_timestamp(date_str: str) -> int: """ @@ -61,11 +61,11 @@ def transform_date_time_to_timestamp(date_time: int | str): else: date_time = check_date_time_format(date_time) if date_time is None: - logging.error(f"日期时间格式错误: {date_time}") + logger.error(f"日期时间格式错误: {date_time}") return None # 按北京时间字符串处理,转换为毫秒级timestamp date_time = datetime_to_timestamp(date_time) return date_time except Exception as e: - logging.error(f"start参数解析失败: {e}") + logger.error(f"start参数解析失败: {e}") return None \ No newline at end of file diff --git a/core/wechat.py b/core/wechat.py index a47c0ae..ad4ea3d 100644 --- a/core/wechat.py +++ b/core/wechat.py @@ -4,11 +4,11 @@ 但需要管理员提供企业id以及secret信息 通过wechatpy库实现 """ -import logging +import core.logger as logging import requests from config import WECHAT_CONFIG -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') +logger = logging.logger class Wechat: def __init__(self): diff --git a/huge_volume_main.py b/huge_volume_main.py index 67b0b32..7e5b8aa 100644 --- a/huge_volume_main.py +++ b/huge_volume_main.py @@ -5,16 +5,14 @@ from core.db.db_huge_volume_data import DBHugeVolumeData from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp from market_data_main import MarketDataMain from core.wechat import Wechat -import logging +import core.logger as logging from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE from datetime import datetime, timedelta import pandas as pd import os import re -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logger = logging.logger class HugeVolumeMain: @@ -55,9 +53,9 @@ class HugeVolumeMain: is_update=False, ) if data is not None and len(data) > 0: - logging.info(f"此次初始化巨量交易数据: {len(data)}条") + logger.info(f"此次初始化巨量交易数据: {len(data)}条") else: - logging.info(f"此次初始化巨量交易数据为空") + logger.info(f"此次初始化巨量交易数据为空") def detect_volume_spike( self, @@ -75,20 +73,20 @@ class HugeVolumeMain: ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - logging.info( + logger.info( f"开始处理巨量交易数据: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) data = self.db_market_data.query_market_data_by_symbol_bar( symbol, bar, start, end ) if data is None: - logging.warning( + logger.warning( f"获取行情数据失败: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) return None else: if len(data) == 0: - logging.warning( + logger.warning( f"获取行情数据为空: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) return None @@ -119,7 +117,7 @@ class HugeVolumeMain: if data is not None and len(data) > 0: self.db_huge_volume_data.insert_data_to_mysql(data) else: - logging.warning( + logger.warning( f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}" ) return data @@ -162,15 +160,15 @@ class HugeVolumeMain: only_output_huge_volume=False, is_update=True, ) - logging.info( + logger.info( f"更新巨量交易数据: {symbol} {bar} 窗口大小: {window_size} 从 {earliest_date_time} 到 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) if data is not None and len(data) > 0: - logging.info(f"此次更新巨量交易数据: {len(data)}条") + logger.info(f"此次更新巨量交易数据: {len(data)}条") else: - logging.info(f"此次更新巨量交易数据为空") + logger.info(f"此次更新巨量交易数据为空") except Exception as e: - logging.error( + logger.error( f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size} 从 {earliest_date_time} 到 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}" ) @@ -234,7 +232,7 @@ class HugeVolumeMain: if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") periods_text = ", ".join([str(period) for period in periods]) - logging.info( + logger.info( f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) volume_statistics_data = ( @@ -243,7 +241,7 @@ class HugeVolumeMain: ) ) if volume_statistics_data is None or len(volume_statistics_data) == 0: - logging.warning( + logger.warning( f"获取巨量交易数据为空: {symbol} {bar} 窗口大小: {window_size} 从 {start} 到 {end}" ) return None @@ -299,12 +297,12 @@ class HugeVolumeMain: start_date_time = timestamp_to_datetime(start_timestamp) end_date_time = timestamp_to_datetime(end_timestamp) - logging.info(f"开始获取巨量交易数据: {start} 到 {end}") + logger.info(f"开始获取巨量交易数据: {start} 到 {end}") huge_volume_data = self.db_huge_volume_data.query_huge_volume_records( start=start_timestamp, end=end_timestamp ) if huge_volume_data is None or len(huge_volume_data) == 0: - logging.warning(f"获取巨量交易数据为空: {start} 到 {end}") + logger.warning(f"获取巨量交易数据为空: {start} 到 {end}") return else: if isinstance(huge_volume_data, list): @@ -325,7 +323,7 @@ class HugeVolumeMain: by=["symbol", "bar", "window_size", "timestamp"], ascending=True ) huge_volume_data = huge_volume_data.reset_index(drop=True) - logging.info(f"获取巨量交易数据: {len(huge_volume_data)}条") + logger.info(f"获取巨量交易数据: {len(huge_volume_data)}条") contents = [] contents.append(f"# 放量交易数据: {start_date_time} 到 {end_date_time}") symbol_list = huge_volume_data["symbol"].unique() @@ -383,7 +381,7 @@ class HugeVolumeMain: # 获得text的字节数 text_length = len(text.encode("utf-8")) - logging.info(f"发送巨量交易数据到微信,字节数: {text_length}") + logger.info(f"发送巨量交易数据到微信,字节数: {text_length}") # with open(os.path.join(self.output_folder, "huge_volume_data.md"), "w", encoding="utf-8") as f: # f.write(text) wechat = Wechat() @@ -438,7 +436,7 @@ class HugeVolumeMain: writer, sheet_name="next_periods_statistics", index=False ) except Exception as e: - logging.error(f"导出Excel文件失败: {e}") + logger.error(f"导出Excel文件失败: {e}") return total_huge_volume_data, total_result_data def plot_huge_volume_data( @@ -493,10 +491,10 @@ def test_send_huge_volume_data_to_wechat(): huge_volume_main = HugeVolumeMain(threshold=2.0) # 获得昨天日期 yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") - logging.info(f"昨天日期: {yesterday}") + logger.info(f"昨天日期: {yesterday}") # 获得今天日期 today = datetime.now().strftime("%Y-%m-%d") - logging.info(f"今天日期: {today}") + logger.info(f"今天日期: {today}") huge_volume_main.send_huge_volume_data_to_wechat(start=yesterday, end=today) diff --git a/market_data_main.py b/market_data_main.py index 0de9d62..2aafc4c 100644 --- a/market_data_main.py +++ b/market_data_main.py @@ -1,4 +1,4 @@ -import logging +import core.logger as logging from datetime import datetime from time import sleep import pandas as pd @@ -21,7 +21,7 @@ from config import ( BAR_THRESHOLD, ) -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger class MarketDataMain: @@ -59,7 +59,7 @@ class MarketDataMain: """ for symbol in self.symbols: for bar in self.bars: - logging.info(f"开始初始化行情数据: {symbol} {bar}") + logger.info(f"开始初始化行情数据: {symbol} {bar}") latest_data = self.db_market_data.query_latest_data(symbol, bar) if latest_data: start = latest_data.get("timestamp") @@ -68,7 +68,7 @@ class MarketDataMain: else: start = datetime_to_timestamp(self.initial_date) start_date_time = self.initial_date - logging.info( + logger.info( f"开始初始化{symbol}, {bar} 行情数据,从 {start_date_time} 开始" ) self.fetch_save_data(symbol, bar, start) @@ -80,12 +80,12 @@ class MarketDataMain: end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") end_time_ts = transform_date_time_to_timestamp(end_time) if end_time_ts is None: - logging.error(f"结束时间格式错误: {end_time}") + logger.error(f"结束时间格式错误: {end_time}") return None start_time_ts = transform_date_time_to_timestamp(start) if start_time_ts is None: - logging.error(f"开始时间格式错误: {start}") + logger.error(f"开始时间格式错误: {start}") return None # 如果bar为5m, 15m, 30m: @@ -110,7 +110,7 @@ class MarketDataMain: current_start_time_ts = start_time_ts start_date_time = timestamp_to_datetime(current_start_time_ts) end_date_time = timestamp_to_datetime(end_time_ts) - logging.info( + logger.info( f"获取行情数据: {symbol} {bar} 从 {start_date_time} 到 {end_date_time}" ) data = self.market_data.get_historical_kline_data( @@ -151,7 +151,7 @@ class MarketDataMain: # data.loc[index, "buy_sz"] = current_buy_sz # data.loc[index, "sell_sz"] = current_sell_sz # except Exception as e: - # logging.error(f"设置buy_sz和sell_sz失败: {e}") + # logger.error(f"设置buy_sz和sell_sz失败: {e}") # continue if data is not None and len(data) > 0: data = data[ @@ -184,7 +184,7 @@ class MarketDataMain: if min_start_time_ts is not None and get_data: # 补充技术指标数据 # 获得min_start_time_ts之前30条数据 - logging.info(f"开始补充技术指标数据: {symbol} {bar}") + logger.info(f"开始补充技术指标数据: {symbol} {bar}") before_data = self.db_market_data.query_data_before_timestamp( symbol, bar, min_start_time_ts, 30 ) @@ -199,7 +199,7 @@ class MarketDataMain: ) if handle_data is not None: if before_data is not None and len(handle_data) <= len(before_data): - logging.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}") + logger.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}") return None if isinstance(handle_data, list): handle_data = pd.DataFrame(handle_data) @@ -208,14 +208,14 @@ class MarketDataMain: elif isinstance(handle_data, pd.DataFrame): pass else: - logging.error(f"handle_data类型错误: {type(handle_data)}") + logger.error(f"handle_data类型错误: {type(handle_data)}") return None handle_data = self.calculate_metrics(handle_data) if latest_before_timestamp is not None: handle_data = handle_data[handle_data["timestamp"] > latest_before_timestamp] handle_data.reset_index(drop=True, inplace=True) - logging.info(f"开始保存技术指标数据: {symbol} {bar}") + logger.info(f"开始保存技术指标数据: {symbol} {bar}") self.db_market_data.insert_data_to_mysql(handle_data) return data @@ -350,21 +350,21 @@ class MarketDataMain: """ 更新数据 """ - logging.info(f"开始更新行情数据: {symbol} {bar}") + logger.info(f"开始更新行情数据: {symbol} {bar}") latest_data = self.db_market_data.query_latest_data(symbol, bar) if not latest_data: - logging.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据") + logger.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据") data = self.fetch_save_data(symbol, bar, self.initial_date) else: latest_timestamp = latest_data.get("timestamp") if latest_timestamp: latest_timestamp = int(latest_timestamp) latest_date_time = timestamp_to_datetime(latest_timestamp) - logging.info( + logger.info( f"{symbol}, {bar} 上次获取的最新数据时间: {latest_date_time}" ) else: - logging.warning(f"获取{symbol}, {bar} 最新数据失败") + logger.warning(f"获取{symbol}, {bar} 最新数据失败") return data = self.fetch_save_data(symbol, bar, latest_timestamp + 1) return data @@ -373,7 +373,7 @@ class MarketDataMain: """ 批量计算技术指标 """ - logging.info("开始批量计算技术指标") + logger.info("开始批量计算技术指标") start_date_time = MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-15 00:00:00" ) @@ -382,14 +382,14 @@ class MarketDataMain: current_timestamp = transform_date_time_to_timestamp(current_date_time) for symbol in self.symbols: for bar in self.bars: - logging.info(f"开始计算技术指标: {symbol} {bar}") + logger.info(f"开始计算技术指标: {symbol} {bar}") data = self.db_market_data.query_market_data_by_symbol_bar( symbol=symbol, bar=bar, start=start_timestamp - 1, end=current_timestamp ) if data is not None and len(data) > 0: data = pd.DataFrame(data) data = self.calculate_metrics(data) - logging.info(f"开始保存技术指标数据: {symbol} {bar}") + logger.info(f"开始保存技术指标数据: {symbol} {bar}") self.db_market_data.insert_data_to_mysql(data) diff --git a/market_monitor_main.py b/market_monitor_main.py index 36fc570..3782b2f 100644 --- a/market_monitor_main.py +++ b/market_monitor_main.py @@ -6,16 +6,15 @@ from core.db.db_market_monitor import DBMarketMonitor from core.wechat import Wechat from config import MONITOR_CONFIG, MYSQL_CONFIG from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp +import core.logger as logging -import logging import os import pandas as pd from datetime import datetime, timedelta import json import re -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") - +logger = logging.logger class MarketMonitorMain: def __init__(self): @@ -58,7 +57,7 @@ class MarketMonitorMain: json.dump({}, f, ensure_ascii=False, indent=4) return {} except Exception as e: - logging.error(f"获取最后一次报表生成记录失败: {e}") + logger.error(f"获取最后一次报表生成记录失败: {e}") return {} def monitor_realtime_market( @@ -85,7 +84,7 @@ class MarketMonitorMain: ) if real_time_data is None or len(real_time_data) == 0: - logging.error(f"获取最新市场数据失败: {symbol}, {bar}") + logger.error(f"获取最新市场数据失败: {symbol}, {bar}") return latest_realtime_timestamp = real_time_data["timestamp"].iloc[-1] @@ -102,22 +101,22 @@ class MarketMonitorMain: latest_record_timestamp is not None and latest_realtime_timestamp <= latest_record_timestamp ): - logging.info( + logger.info( f"最新市场数据时间戳 {latest_reatime_datetime} 小于等于最新记录时间戳 {latest_record_datetime}, 不进行监控" ) return - logging.info( + logger.info( f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间 {latest_record_datetime}" ) else: - logging.info( + logger.info( f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间为空" ) real_time_data = self.market_data_main.add_new_columns(real_time_data) - logging.info(f"开始计算技术指标: {symbol} {bar}") + logger.info(f"开始计算技术指标: {symbol} {bar}") real_time_data = self.market_data_main.calculate_metrics(real_time_data) - logging.info(f"开始计算大成交量: {symbol} {bar} 窗口大小: {self.window_size}") + logger.info(f"开始计算大成交量: {symbol} {bar} 窗口大小: {self.window_size}") real_time_data = self.huge_volume_main.huge_volume.detect_huge_volume( data=real_time_data, window_size=self.window_size, @@ -127,7 +126,7 @@ class MarketMonitorMain: output_excel=False, ) if real_time_data is None or len(real_time_data) == 0: - logging.error( + logger.error( f"计算大成交量失败: {symbol} {bar} 窗口大小: {self.window_size}" ) return @@ -135,27 +134,27 @@ class MarketMonitorMain: if only_output_huge_volume: if realtime_row["huge_volume"] == 1: - logging.info(f"监控到巨量: {symbol} {bar} 窗口大小: {self.window_size}") + logger.info(f"监控到巨量: {symbol} {bar} 窗口大小: {self.window_size}") if only_output_over_mean_volume: # 获得huge_volume==1时的volume_ratio的均量 mean_huge_volume_ratio = real_time_data[real_time_data["huge_volume"] == 1]["volume_ratio"].mean() if realtime_row["volume_ratio"] >= mean_huge_volume_ratio: - logging.info(f"监控到巨量且超过均量: {symbol} {bar} 窗口大小: {self.window_size}") + logger.info(f"监控到巨量且超过均量: {symbol} {bar} 窗口大小: {self.window_size}") else: - logging.info( + logger.info( f"监控到巨量但未超过均量: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控" ) return else: - logging.info( + logger.info( f"监控到非巨量: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控" ) return if only_output_rise: if realtime_row["pct_chg"] > 0: - logging.info(f"监控到上涨: {symbol} {bar} 窗口大小: {self.window_size}") + logger.info(f"监控到上涨: {symbol} {bar} 窗口大小: {self.window_size}") else: - logging.info( + logger.info( f"监控到下跌: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控" ) return @@ -179,7 +178,7 @@ class MarketMonitorMain: ) text_length = len(report.encode("utf-8")) - logging.info(f"发送报告到企业微信,字节数: {text_length}") + logger.info(f"发送报告到企业微信,字节数: {text_length}") self.wechat.send_markdown(report) # remove punction in latest_reatime_datetime @@ -201,7 +200,7 @@ class MarketMonitorMain: "report_file_byte_size": report_file_byte_size, } report_data = pd.DataFrame([report_data]) - logging.info(f"插入数据到数据库") + logger.info(f"插入数据到数据库") self.db_market_monitor.insert_data_to_mysql(report_data) if self.latest_record.get(symbol, None) is None: @@ -224,7 +223,7 @@ class MarketMonitorMain: # 获得bar在self.market_data_main.bars中的索引 bar_index = self.market_data_main.bars.index(bar) if bar_index == len(self.market_data_main.bars) - 1: - logging.error(f"已经是最后一个bar: {bar}") + logger.error(f"已经是最后一个bar: {bar}") return None # 获得下一个bar bar = self.market_data_main.bars[bar_index + 1] @@ -233,10 +232,10 @@ class MarketMonitorMain: symbol=symbol, bar=bar, end_time=end_time, limit=100 ) if data is None or len(data) == 0: - logging.error(f"获取实时数据失败: {symbol}, {bar}") + logger.error(f"获取实时数据失败: {symbol}, {bar}") return None data = self.market_data_main.add_new_columns(data) - logging.info(f"开始计算技术指标: {symbol} {bar}") + logger.info(f"开始计算技术指标: {symbol} {bar}") data = self.market_data_main.calculate_metrics(data) row = data.iloc[-1] return row @@ -249,7 +248,7 @@ class MarketMonitorMain: ): for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: - logging.info( + logger.info( f"开始监控: {symbol} {bar} 窗口大小: {self.window_size} 行情数据" ) try: @@ -261,7 +260,7 @@ class MarketMonitorMain: only_output_rise, ) except Exception as e: - logging.error( + logger.error( f"监控失败: {symbol} {bar} 窗口大小: {self.window_size} 行情数据: {e}" ) continue diff --git a/monitor_schedule.py b/monitor_schedule.py index f37bcaa..7bff3e9 100644 --- a/monitor_schedule.py +++ b/monitor_schedule.py @@ -1,21 +1,17 @@ from market_monitor_main import MarketMonitorMain -import logging -import time - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +import core.logger as logging +logger = logging.logger def monitor_schedule(): market_monitor_main = MarketMonitorMain() - logging.info("开始监控") - while True: # 每分钟监控一次 - market_monitor_main.batch_monitor_realtime_market( - only_output_huge_volume=True, - only_output_over_mean_volume=True, - only_output_rise=False, - ) - logging.info("本次循环监控结束,等待30秒") - time.sleep(30) + logger.info("开始监控") + market_monitor_main.batch_monitor_realtime_market( + only_output_huge_volume=True, + only_output_over_mean_volume=True, + only_output_rise=False, + ) + logger.info("本次循环监控结束") if __name__ == "__main__": monitor_schedule() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4e58085..d07f186 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ requests>=2.25.0 sqlalchemy >= 2.0.41 pymysql >= 1.1.1 wechatpy >= 1.8.18 -seaborn >= 0.13.2 \ No newline at end of file +seaborn >= 0.13.2 +schedule >= 1.2.2 \ No newline at end of file diff --git a/statistics_main.py b/statistics_main.py index ff91349..584ca9e 100644 --- a/statistics_main.py +++ b/statistics_main.py @@ -1,7 +1,7 @@ from core.statistics.price_volume_stats import PriceVolumeStats -import logging +import core.logger as logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger def main(): diff --git a/trade_data_main.py b/trade_data_main.py index c602570..49b7220 100644 --- a/trade_data_main.py +++ b/trade_data_main.py @@ -1,4 +1,4 @@ -import logging +import core.logger as logging import time from datetime import datetime, timedelta import pandas as pd @@ -13,7 +13,7 @@ from config import ( MYSQL_CONFIG, ) -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") +logger = logging.logger class TradeDataMain: @@ -65,26 +65,26 @@ class TradeDataMain: end_date_time = timestamp_to_datetime(end_time) # 如果db_earliest_time和db_latest_time存在,则需要调整start_time和end_time if db_earliest_time is None or db_latest_time is None: - logging.info(f"数据库无数据:从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}") + logger.info(f"数据库无数据:从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}") self.trade_data.get_history_trades(symbol, start_time, end_time, limit) else: if db_earliest_time > start_time: db_earliest_date_time = timestamp_to_datetime(db_earliest_time) - logging.info(f"从API补充最早数据:{symbol}, {start_date_time}, {db_earliest_date_time}") + logger.info(f"从API补充最早数据:{symbol}, {start_date_time}, {db_earliest_date_time}") self.trade_data.get_history_trades(symbol, start_time, db_earliest_time + 1, limit) if db_latest_time < end_time: db_latest_date_time = timestamp_to_datetime(db_latest_time) - logging.info(f"从API补充最新数据:{symbol}, {db_latest_date_time}, {end_date_time}") + logger.info(f"从API补充最新数据:{symbol}, {db_latest_date_time}, {end_date_time}") self.trade_data.get_history_trades(symbol, db_latest_time + 1, end_time, limit) final_data = self.trade_data.db_trade_data.query_trade_data_by_symbol(symbol=symbol, start=start_time, end=end_time) if final_data is not None and len(final_data) > 0: - logging.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}") + logger.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}") final_data = pd.DataFrame(final_data) final_data.sort_values(by="ts", inplace=True) final_data.reset_index(drop=True, inplace=True) return final_data else: - logging.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}") + logger.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}") return None diff --git a/trade_main.py b/trade_main.py index 5fdcb81..fc11a0b 100644 --- a/trade_main.py +++ b/trade_main.py @@ -1,10 +1,10 @@ -import logging +import core.logger as logging from time import sleep from core.biz.quant_trader import QuantTrader from core.biz.strategy import QuantStrategy from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, TRADING_CONFIG, TIME_CONFIG -logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') +logger = logging.logger class BizMain: @@ -39,7 +39,7 @@ class BizMain: quantity = 1 try: # 1. 合约开空单流程 - logging.info("[1] 合约开空单流程:") + logger.info("[1] 合约开空单流程:") # price = self.trader.get_current_price(self.trader.symbol_swap) # logging.info(f"当前合约价格: {price}") slot = 0.01 @@ -49,40 +49,40 @@ class BizMain: # logging.info(f"所需保证金: {margin}, 开仓价格: {entry_price}") order_id, entry_price = self.trader.place_short_order(td_mode, quantity, leverage, slot, buffer_ratio) if order_id: - logging.info(f"开空单成功,订单ID: {order_id}, 开仓价格: {entry_price}") + logger.info(f"开空单成功,订单ID: {order_id}, 开仓价格: {entry_price}") else: - logging.error("开空单失败") + logger.error("开空单失败") except Exception as e: - logging.error(f"合约开空单流程异常: {e}") + logger.error(f"合约开空单流程异常: {e}") sleep(1) try: # 2. 现货卖出比特币流程 - logging.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:") + logger.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:") balance = self.trader.get_account_balance() btc_balance = balance.get(self.trader.symbol_prefix, 0) - logging.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}") + logger.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}") sell_amount = 0.01 if btc_balance >= sell_amount: order_id = self.trader.place_market_order('sell', sell_amount) if order_id: - logging.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功,订单ID: {order_id}") + logger.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功,订单ID: {order_id}") else: - logging.error(f"现货卖出{self.trader.symbol_prefix}失败") + logger.error(f"现货卖出{self.trader.symbol_prefix}失败") else: - logging.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}") + logger.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}") except Exception as e: - logging.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}") + logger.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}") sleep(1) try: # 3. 合约平空单流程 - logging.info("[3] 合约平空单流程:") + logger.info("[3] 合约平空单流程:") result = self.trader.close_short_order(td_mode, quantity) if result: - logging.info("平空单成功") + logger.info("平空单成功") else: - logging.error("平空单失败") + logger.error("平空单失败") except Exception as e: - logging.error(f"合约平空单流程异常: {e}") + logger.error(f"合约平空单流程异常: {e}") if __name__ == "__main__":