invoke python file by schedule directly, instead of import way. The purpose is to make each event to be totally new initialization action.

This commit is contained in:
blade 2025-08-15 11:37:06 +08:00
parent f929ff8a2f
commit a12f34e80c
36 changed files with 434 additions and 336 deletions

28
auto_schedule.py Normal file
View File

@ -0,0 +1,28 @@
import schedule
import time
import datetime
import core.logger as logging
import subprocess
import os
logger = logging.logger
# 定义要执行的任务
def run_script():
start_time = time.time()
logger.info(f"Executing script at: {datetime.datetime.now()}")
output_file = r'./output/auto_schedule.txt'
with open(output_file, 'a') as f:
f.write(f"Task ran at {datetime.datetime.now()}\n")
python_path = r"D:\miniconda3\envs\okx\python.exe"
script_path = r"D:\python_projects\crypto_quant\monitor_schedule.py"
subprocess.run([python_path, script_path])
end_time = time.time()
logger.info(f"Script execution time: {end_time - start_time} seconds")
# 设置每20秒运行一次
schedule.every(20).seconds.do(run_script)
# 保持程序运行并检查调度
logger.info("Scheduler started. Press Ctrl+C to stop.")
while True:
schedule.run_pending()
time.sleep(1)

View File

@ -1,5 +1,5 @@
from pandas import DataFrame
import logging
import core.logger as logging
import os
import re
import pandas as pd
@ -7,9 +7,7 @@ from datetime import datetime
from copy import deepcopy
from typing import Optional, List, Dict, Any, Tuple
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.logger
class HugeVolume:
@ -81,11 +79,11 @@ class HugeVolume:
DataFrame: 包含异常检测结果的DataFrame
"""
if data is None or len(data) == 0:
logging.warning("数据为空,无法进行成交量异常检测")
logger.warning("数据为空,无法进行成交量异常检测")
return None
if "volume" not in data.columns:
logging.error("数据中缺少volume列")
logger.error("数据中缺少volume列")
return None
# 按时间戳排序
@ -115,13 +113,13 @@ class HugeVolume:
# 如果check_price为True检查价格分位数
if check_price:
if "close" not in data.columns:
logging.error("数据中缺少close列无法进行价格检查")
logger.error("数据中缺少close列无法进行价格检查")
return data
if "high" not in data.columns:
logging.error("数据中缺少high列无法进行价格检查")
logger.error("数据中缺少high列无法进行价格检查")
return data
if "low" not in data.columns:
logging.error("数据中缺少low列无法进行价格检查")
logger.error("数据中缺少low列无法进行价格检查")
return data
for price_column in ["close", "high", "low"]:
@ -137,7 +135,7 @@ class HugeVolume:
if output_excel:
# 检查数据是否为空
if len(data) == 0:
logging.warning("数据为空无法导出Excel文件")
logger.warning("数据为空无法导出Excel文件")
return data
start_date = data["date_time"].iloc[0]
@ -154,7 +152,7 @@ class HugeVolume:
) as writer:
data.to_excel(writer, sheet_name="volume_spike", index=False)
except Exception as e:
logging.error(f"导出Excel文件失败: {e}")
logger.error(f"导出Excel文件失败: {e}")
return data

View File

@ -6,7 +6,7 @@ import seaborn as sns
from openpyxl import Workbook
from openpyxl.drawing.image import Image
from PIL import Image as PILImage
import logging
import core.logger as logging
from datetime import datetime
import pandas as pd
import os
@ -14,9 +14,7 @@ import re
import openpyxl
from openpyxl.styles import Font
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.logger
sns.set_theme(style="whitegrid")
# 设置中文
@ -140,7 +138,7 @@ class HugeVolumeChart:
for symbol in self.symbol_list:
charts_dict[f"{symbol}_{ratio_column}_heatmap"] = {}
for price_type in self.price_type_list:
logging.info(f"绘制{symbol} {price_type} {ratio_column}热力图")
logger.info(f"绘制{symbol} {price_type} {ratio_column}热力图")
df = self.data[(self.data["symbol"] == symbol) & (self.data["price_type"] == price_type)]
pivot_table = df.pivot_table(values=ratio_column, index='window_size', columns='bar', aggfunc='mean')
plt.figure(figsize=(10, 6))
@ -323,7 +321,7 @@ class HugeVolumeChart:
"""
绘制价格上涨下跌图
"""
logging.info(f"绘制价格上涨下跌图: {prefix}")
logger.info(f"绘制价格上涨下跌图: {prefix}")
# 根据price_type_list得到各个price_type的平均rise_ratio平均fall_ratio平均draw_ratio, 平均average_return
price_type_data_dict = {}
for price_type in self.price_type_list:
@ -404,7 +402,7 @@ class HugeVolumeChart:
}
}
"""
logging.info(f"输出Excel文件包含所有{chart_type}图表")
logger.info(f"输出Excel文件包含所有{chart_type}图表")
file_name = f"huge_volume_{chart_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
file_path = os.path.join(self.output_folder, file_name)
@ -444,7 +442,7 @@ class HugeVolumeChart:
# Update row offset (chart height + padding)
row_offset += chart_rows + 5 # Add 5 rows for padding between charts
except Exception as e:
logging.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
continue
# Save Excel file
@ -456,4 +454,4 @@ class HugeVolumeChart:
try:
os.remove(chart_path)
except Exception as e:
logging.error(f"删除临时文件失败: {e}")
logger.error(f"删除临时文件失败: {e}")

View File

@ -1,12 +1,13 @@
import time
from datetime import datetime, timedelta
import logging
from typing import Optional
import pandas as pd
import okx.MarketData as Market
import okx.TradingData as TradingData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
import core.logger as logging
logger = logging.logger
class MarketData:
def __init__(self,
@ -14,15 +15,18 @@ class MarketData:
secret_key: str,
passphrase: str,
sandbox: bool = True):
flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境
self.flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境
self.api_key = api_key
self.secret_key = secret_key
self.passphrase = passphrase
self.market_api = Market.MarketAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.trade_api = TradingData.TradingDataAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
flag=self.flag
)
# self.trade_api = TradingData.TradingDataAPI(
# api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
# flag=flag
# )
def get_realtime_kline_data(self, symbol: str = None, bar: str = '5m', end_time: int = None, limit: int = 50) -> Optional[pd.DataFrame]:
"""
@ -38,7 +42,7 @@ class MarketData:
else:
end_time = transform_date_time_to_timestamp(end_time)
if end_time is None:
logging.error(f"end_time参数解析失败: {end_time}")
logger.error(f"end_time参数解析失败: {end_time}")
return None
response = self.get_realtime_candlesticks_from_api(symbol, bar, end_time, limit)
if response:
@ -47,7 +51,7 @@ class MarketData:
to_time = int(candles[0][0])
from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
logger.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
candles_pd = pd.DataFrame(candles, columns=columns)
for col in ['open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote']:
@ -67,7 +71,7 @@ class MarketData:
return candles_pd
else:
logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
logger.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
return None
@ -95,7 +99,7 @@ class MarketData:
else:
start_time = transform_date_time_to_timestamp(start)
if start_time is None:
logging.error(f"start参数解析失败: {start}")
logger.error(f"start参数解析失败: {start}")
return None
columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
all_data = []
@ -105,10 +109,10 @@ class MarketData:
# after真实逻辑是获得指定时间之前的数据
response = self.get_historical_candlesticks_from_api(symbol, bar, end_time, limit)
if response is None:
logging.warning(f"请求失败,请稍后再试")
logger.warning(f"请求失败,请稍后再试")
break
if response["code"] != "0" or not response["data"]:
logging.warning(f"请求失败或无数据: {response.get('msg', 'No message')}")
logger.warning(f"请求失败或无数据: {response.get('msg', 'No message')}")
break
candles = response["data"]
@ -120,14 +124,14 @@ class MarketData:
if latest_timestamp > from_time:
latest_timestamp = from_time
else:
logging.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time} 停止获取数据")
logger.warning(f"上一次数据最早时间戳 {latest_timestamp} 小于等于 from_time {from_time} 停止获取数据")
break
from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
logging.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
logger.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
if from_time < start_time:
start_time_str = pd.to_datetime(start_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
logging.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str} 停止获取数据")
logger.warning(f"本轮获取{symbol}, {bar} 数据最早时间 {from_time_str} 早于 此次数据获取起始时间 {start_time_str} 停止获取数据")
# candels中仅保留start_time之后的数据
candles = [candle for candle in candles if int(candle[0]) >= start_time]
if len(candles) > 0:
@ -147,7 +151,7 @@ class MarketData:
end_time = from_time - 1
time.sleep(0.5)
except Exception as e:
logging.error(f"请求出错: {e}")
logger.error(f"请求出错: {e}")
break
if all_data:
@ -169,14 +173,14 @@ class MarketData:
df = df[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote', 'create_time']]
df.sort_values('timestamp', inplace=True)
df.reset_index(drop=True, inplace=True)
logging.info(f"总计获取 {len(df)} 条 K 线数据仅confirm=1")
logger.info(f"总计获取 {len(df)} 条 K 线数据仅confirm=1")
# 获取df中date_time的最早时间与最新时间
earliest_time = df['date_time'].min()
latest_time = df['date_time'].max()
logging.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time} 最新时间: {latest_time}")
logger.info(f"本轮更新{symbol}, {bar} 数据最早时间: {earliest_time} 最新时间: {latest_time}")
return df
else:
logging.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
logger.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
return None
def set_buy_and_sell_sz(self, symbol: str, candles: list, columns: list):
@ -201,6 +205,7 @@ class MarketData:
def get_historical_candlesticks_from_api(self, symbol, bar, end_time, limit):
response = None
count = 0
while True:
try:
response = self.market_api.get_history_candlesticks(
@ -212,8 +217,13 @@ class MarketData:
if response:
break
except Exception as e:
logging.error(f"请求出错: {e}")
logger.error(f"请求出错: {e}")
count += 1
# 重新初始化,目的是为了避免长时间运行,导致被平台段封禁
self.market_api = Market.MarketAPI(
api_key=self.api_key, api_secret_key=self.secret_key, passphrase=self.passphrase,
flag=self.flag
)
if count > 3:
break
time.sleep(10)
@ -233,8 +243,13 @@ class MarketData:
if response:
break
except Exception as e:
logging.error(f"请求出错: {e}")
logger.error(f"请求出错: {e}")
count += 1
# 重新初始化,目的是为了避免长时间运行,导致被平台段封禁
self.market_api = Market.MarketAPI(
api_key=self.api_key, api_secret_key=self.secret_key, passphrase=self.passphrase,
flag=self.flag
)
if count > 3:
break
time.sleep(5)

View File

@ -3,10 +3,9 @@ import numpy as np
from metrics_config import METRICS_CONFIG
from config import BAR_THRESHOLD
from time import time
import core.logger as logging
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
def create_metrics_report(
@ -29,16 +28,16 @@ def create_metrics_report(
date_time = row["date_time"]
if only_output_huge_volume:
if huge_volume == 1:
logging.info(
logger.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 巨量"
)
else:
logging.info(
logger.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 非巨量,此次不发送相关数据"
)
return
else:
logging.info(
logger.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time}"
)
@ -51,7 +50,7 @@ def create_metrics_report(
low = round(float(row["low"]), 10)
pct_chg = round(float(row["pct_chg"]), 4)
if only_output_rise and pct_chg < 0:
logging.info(
logger.info(
f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 下跌,不发送相关数据"
)
return
@ -63,9 +62,9 @@ def create_metrics_report(
else:
contents.append(f"## {brief} 交易量报告")
if now_datetime_str is not None:
contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 最新数据时间: {now_datetime_str}")
contents.append(f"## 滑动窗口: {window_size} 最新数据时间: {now_datetime_str}")
else:
contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 交易周期时间: {date_time}")
contents.append(f"## 滑动窗口: {window_size} 交易周期时间: {date_time}")
k_shape = str(row["k_shape"])
contents.append(f"### 价格信息")
contents.append(f"当前价格: {close}, 开盘价: {open}, 最高价: {high}, 最低价: {low}")
@ -160,6 +159,7 @@ def create_metrics_report(
if ma_divergence_value < 1:
long_short_info[""].append(f"均线形态: {ma_divergence}")
if is_short:
check_long_short = ""
if is_over_sell:
check_over_sell = "超卖"
else:
@ -354,7 +354,7 @@ def get_last_huge_volume_record(all_data: pd.DataFrame, bar: str, timestamp: int
results.append(f"最近十个周期内,出现巨量的次数: {huge_volume_in_ten_period_count}")
return results
except Exception as e:
logging.error(f"获取最近一次巨量记录信息失败: {e}")
logger.error(f"获取最近一次巨量记录信息失败: {e}")
results.append(f"获取最近一次巨量记录信息失败: {e}")
return results

View File

@ -45,15 +45,13 @@ data = metrics.set_ma_long_short_advanced(data, method="hybrid")
- "震荡"震荡市场建议观望或区间交易
"""
import logging
import core.logger as logging
import pandas as pd
import numpy as np
import talib as tb
from talib import MA_Type
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.logger
class MetricsCalculation:
@ -72,7 +70,7 @@ class MetricsCalculation:
return df
def macd(self, df: pd.DataFrame):
logging.info("计算MACD指标")
logger.info("计算MACD指标")
data = np.array(df.close)
ndata = len(data)
m, n, T = 12, 26, 9
@ -112,7 +110,7 @@ class MetricsCalculation:
return df
def kdj(self, df: pd.DataFrame):
logging.info("计算KDJ指标")
logger.info("计算KDJ指标")
low_list = df["low"].rolling(window=9).min()
low_list.fillna(value=df["low"].expanding().min(), inplace=True)
high_list = df["high"].rolling(window=9).max()
@ -149,7 +147,7 @@ class MetricsCalculation:
KDJ_K < 30, KDJ_D < 30, KDJ_J < 20: 超卖
否则为"徘徊"
"""
logging.info("设置KDJ形态")
logger.info("设置KDJ形态")
# 初始化kdj_pattern列
df["kdj_pattern"] = "徘徊"
@ -204,7 +202,7 @@ class MetricsCalculation:
使用20个周期的滚动窗口计算相对统计特征避免绝对阈值过于严格的问题
"""
logging.info("设置均线多空和发散")
logger.info("设置均线多空和发散")
# 通过趋势强度计算多空
# 震荡:不满足多空条件的其他情况
@ -265,7 +263,7 @@ class MetricsCalculation:
required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"缺少必要的列: {missing_columns}")
logger.info(f"缺少必要的列: {missing_columns}")
return df
# 按时间戳排序(升序)
@ -362,7 +360,7 @@ class MetricsCalculation:
required_columns = ["timestamp", "close", "dif", "macd", "kdj_j"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"缺少必要的列: {missing_columns}")
logger.info(f"缺少必要的列: {missing_columns}")
return df
# 按时间戳排序(升序)
@ -416,7 +414,7 @@ class MetricsCalculation:
支持所有均线交叉类型5上穿10/20/3010上穿20/3020上穿30
以及对应的下穿信号30下穿20/10/5 20下穿10/510下穿5
"""
logging.info("计算均线指标")
logger.info("计算均线指标")
df["ma5"] = df["close"].rolling(window=5).mean().dropna()
df["ma10"] = df["close"].rolling(window=10).mean().dropna()
df["ma20"] = df["close"].rolling(window=20).mean().dropna()
@ -492,7 +490,7 @@ class MetricsCalculation:
return df
def rsi(self, df: pd.DataFrame):
logging.info("计算RSI指标")
logger.info("计算RSI指标")
df["rsi_14"] = tb.RSI(df["close"].values, timeperiod=14)
df["rsi_signal"] = ""
rsi_high = df["rsi_14"] > 70
@ -507,7 +505,7 @@ class MetricsCalculation:
return df
def boll(self, df: pd.DataFrame):
logging.info("计算BOLL指标")
logger.info("计算BOLL指标")
df["boll_upper"], df["boll_middle"], df["boll_lower"] = tb.BBANDS(
df["close"].values, timeperiod=20, matype=MA_Type.SMA
)
@ -524,7 +522,7 @@ class MetricsCalculation:
超卖价格接近下轨且KDJ超卖
震荡其他情况
"""
logging.info("设置BOLL形态")
logger.info("设置BOLL形态")
# 初始化boll_pattern列
df["boll_pattern"] = "震荡"
@ -532,7 +530,7 @@ class MetricsCalculation:
required_columns = ["close", "boll_upper", "boll_lower", "kdj_j"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"缺少必要的列: {missing_columns}")
logger.info(f"缺少必要的列: {missing_columns}")
return df
# 计算价格与布林带的距离百分比
@ -600,7 +598,7 @@ class MetricsCalculation:
- K线实体或影线较长
- 超长K线实体和影线都很长
"""
logging.info("设置K线长度")
logger.info("设置K线长度")
# 检查必要的列是否存在
required_columns = ["close", "open", "high", "low"]
missing_columns = [col for col in required_columns if col not in df.columns]
@ -707,12 +705,12 @@ class MetricsCalculation:
- 超大实体实体占比70%-90%
- 光头光脚实体占比>90%非一字情况
"""
logging.info("设置K线形状")
logger.info("设置K线形状")
# 检查必要的列是否存在
required_columns = ["close", "open", "high", "low"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"缺少必要的列: {missing_columns}")
logger.info(f"缺少必要的列: {missing_columns}")
return df
# 计算K线的基本特征
@ -923,7 +921,7 @@ class MetricsCalculation:
- "statistical": 统计分布方法
- "hybrid": 混合方法
"""
logging.info(f"使用{method}方法设置均线多空")
logger.info(f"使用{method}方法设置均线多空")
if method == "weighted_voting":
return self._weighted_voting_method(data)
@ -936,7 +934,7 @@ class MetricsCalculation:
elif method == "hybrid":
return self._hybrid_method(data)
else:
logging.warning(f"未知的方法: {method},使用默认加权投票方法")
logger.warning(f"未知的方法: {method},使用默认加权投票方法")
return self._weighted_voting_method(data)
def _weighted_voting_method(self, data: pd.DataFrame):

View File

@ -3,10 +3,10 @@ import okx.Trade as Trade
import okx.MarketData as Market
import okx.PublicData as Public
import pandas as pd
import logging
import core.logger as logging
from typing import Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.logger
class QuantTrader:
def __init__(self,
@ -60,20 +60,20 @@ class QuantTrader:
details = balance.get('details', [])
for detail in details:
if detail.get('ccy') == 'USDT':
logging.info(f"USDT余额: {detail.get('availBal')}")
logger.info(f"USDT余额: {detail.get('availBal')}")
result['USDT'] = float(detail.get('availBal', 0))
if detail.get('ccy') == self.symbol_prefix:
logging.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}")
logger.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}")
result[self.symbol_prefix] = float(detail.get('availBal', 0))
if detail.get('ccy') == self.symbol_swap:
logging.info(f"{self.symbol_swap}余额: {detail.get('availBal')}")
logger.info(f"{self.symbol_swap}余额: {detail.get('availBal')}")
result[self.symbol_swap] = float(detail.get('availBal', 0))
return result
else:
logging.error(f"获取余额失败: {search_result}")
logger.error(f"获取余额失败: {search_result}")
return {}
except Exception as e:
logging.error(f"获取余额异常: {e}")
logger.error(f"获取余额异常: {e}")
return {}
def get_current_price(self, symbol: str = None) -> Optional[float]:
@ -89,16 +89,16 @@ class QuantTrader:
data = result.get('data', [])
if data and 'last' in data[0]:
price = float(data[0]['last'])
logging.info(f"当前{symbol_prefix}价格: ${price:,.2f}")
logger.info(f"当前{symbol_prefix}价格: ${price:,.2f}")
return price
else:
logging.error(f"ticker数据格式异常: {data}")
logger.error(f"ticker数据格式异常: {data}")
return None
else:
logging.error(f"获取价格失败: {result}")
logger.error(f"获取价格失败: {result}")
return None
except Exception as e:
logging.error(f"获取价格异常: {e}")
logger.error(f"获取价格异常: {e}")
return None
def get_kline_data(self, symbol: str = None, after: str = None, before: str = None, bar: str = '1m', limit: int = 100) -> Optional[pd.DataFrame]:
@ -129,7 +129,7 @@ class QuantTrader:
if result.get('code') == '0':
data = result.get('data', [])
if not data:
logging.warning("K线数据为空")
logger.warning("K线数据为空")
return None
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close',
@ -140,10 +140,10 @@ class QuantTrader:
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms', errors='coerce')
return df
else:
logging.error(f"获取K线数据失败: {result}")
logger.error(f"获取K线数据失败: {result}")
return None
except Exception as e:
logging.error(f"获取K线数据异常: {e}")
logger.error(f"获取K线数据异常: {e}")
return None
def place_market_order(self, side: str, size: float) -> Optional[str]:
@ -154,7 +154,7 @@ class QuantTrader:
if side == 'sell':
try:
if symbol_balance < size:
logging.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}")
logger.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}")
return None
result = self.trade_api.place_order(
instId=self.symbol,
@ -164,20 +164,20 @@ class QuantTrader:
sz=str(size)
)
if result.get('code') == '0':
logging.info(f"下单成功: {side} {size} {self.symbol_prefix}")
logger.info(f"下单成功: {side} {size} {self.symbol_prefix}")
return result['data'][0]['ordId']
else:
logging.error(f"下单失败: {result}")
logger.error(f"下单失败: {result}")
return None
except Exception as e:
logging.error(f"下单异常: {e}")
logger.error(f"下单异常: {e}")
return None
elif side == 'buy':
try:
instrument_result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)
instrument_data = instrument_result.get("data", [])
if not instrument_data:
logging.error(f"未获取到合约信息: {instrument_result}")
logger.error(f"未获取到合约信息: {instrument_result}")
return None
min_sz = float(instrument_data[0].get("minSz", 0))
if size < min_sz:
@ -186,7 +186,7 @@ class QuantTrader:
last_price = float(ticker["data"][0]["last"])
usdt_amount = float(last_price * size)
if usdt_balance < usdt_amount:
logging.error(f"USDT余额不足目前余额: {usdt_balance}")
logger.error(f"USDT余额不足目前余额: {usdt_balance}")
return None
result = self.trade_api.place_order(
instId=self.symbol,
@ -196,16 +196,16 @@ class QuantTrader:
sz=str(usdt_amount)
)
if result.get('code') == '0':
logging.info(f"下单成功: {side} {usdt_amount} USDT")
logger.info(f"下单成功: {side} {usdt_amount} USDT")
return result['data'][0]['ordId']
else:
logging.error(f"下单失败: {result}")
logger.error(f"下单失败: {result}")
return None
except Exception as e:
logging.error(f"下单异常: {e}")
logger.error(f"下单异常: {e}")
return None
else:
logging.error(f"不支持的下单方向: {side}")
logger.error(f"不支持的下单方向: {side}")
return None
# 设置杠杆倍数
@ -218,9 +218,9 @@ class QuantTrader:
posSide=posSide
)
if result["code"] == "0":
logging.info(f"设置杠杆倍数 {leverage}x 成功")
logger.info(f"设置杠杆倍数 {leverage}x 成功")
else:
logging.error(f"设置杠杆失败: {result['msg']}")
logger.error(f"设置杠杆失败: {result['msg']}")
return result["code"] == "0"
# 计算保证金需求
@ -231,10 +231,10 @@ class QuantTrader:
contract_value = quantity * slot * price # 每张 0.01 BTC
initial_margin = contract_value / leverage
recommended_margin = initial_margin * (1 + buffer_ratio)
logging.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT")
logging.info(f"合约总价值: {contract_value:.2f} USDT")
logging.info(f"初始保证金: {initial_margin:.2f} USDT")
logging.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT")
logger.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT")
logger.info(f"合约总价值: {contract_value:.2f} USDT")
logger.info(f"初始保证金: {initial_margin:.2f} USDT")
logger.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT")
return recommended_margin, price
# 开空头仓位(卖出空单)
@ -243,7 +243,7 @@ class QuantTrader:
# 计算所需保证金和开仓价格
margin_data = self.calculate_margin(quantity, leverage, slot, buffer_ratio)
if not margin_data:
logging.error("无法计算保证金,终止下单")
logger.error("无法计算保证金,终止下单")
return None, None
required_margin, entry_price = margin_data
@ -251,7 +251,7 @@ class QuantTrader:
balance = self.get_account_balance()
avail_bal = balance.get('USDT')
if avail_bal is None or avail_bal < required_margin:
logging.error(f"保证金不足,需至少 {required_margin:.2f} USDT当前余额: {avail_bal}")
logger.error(f"保证金不足,需至少 {required_margin:.2f} USDT当前余额: {avail_bal}")
return None, None
# 设置杠杆
@ -270,10 +270,10 @@ class QuantTrader:
}
result = self.trade_api.place_order(**order_data)
if result.get("code") == "0":
logging.info(f"开空单成功订单ID: {result['data'][0]['ordId']}")
logger.info(f"开空单成功订单ID: {result['data'][0]['ordId']}")
return result["data"][0]["ordId"], entry_price
else:
logging.error(f"开空单失败: {result.get('msg', result)}")
logger.error(f"开空单失败: {result.get('msg', result)}")
return None, None
# 平空单(买入平仓)
@ -290,10 +290,10 @@ class QuantTrader:
}
result = self.trade_api.place_order(**order_data)
if result.get("code") == "0":
logging.info(f"平空单成功订单ID: {result['data'][0]['ordId']}")
logger.info(f"平空单成功订单ID: {result['data'][0]['ordId']}")
return True
else:
logging.error(f"平空单失败: {result.get('msg', result)}")
logger.error(f"平空单失败: {result.get('msg', result)}")
return False
def get_minimun_order_size(self) -> None:
@ -304,10 +304,10 @@ class QuantTrader:
instrument = result.get("data", [{}])[0]
min_sz = float(instrument.get("minSz", 0))
lot_sz = float(instrument.get("lotSz", 0))
logging.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}")
logging.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}")
logger.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}")
logger.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}")
else:
logging.error(f"错误: {result.get('msg', result)}")
logger.error(f"错误: {result.get('msg', result)}")
except Exception as e:
logging.error(f"异常: {str(e)}")
logger.error(f"异常: {str(e)}")

View File

@ -1,11 +1,11 @@
import time
from datetime import datetime
import logging
import core.logger as logging
from typing import Optional
import pandas as pd
from core.biz.quant_trader import QuantTrader
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.logger
class QuantStrategy:
def __init__(self,
@ -31,10 +31,10 @@ class QuantStrategy:
计算简单移动平均线
"""
if 'close' not in df:
logging.error("DataFrame缺少'close'无法计算SMA")
logger.error("DataFrame缺少'close'无法计算SMA")
return pd.Series([float('nan')] * len(df))
if len(df) < period:
logging.warning(f"数据长度不足{period}SMA结果将包含NaN")
logger.warning(f"数据长度不足{period}SMA结果将包含NaN")
return df['close'].rolling(window=period).mean()
def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
@ -42,10 +42,10 @@ class QuantStrategy:
计算RSI指标
"""
if 'close' not in df:
logging.error("DataFrame缺少'close'无法计算RSI")
logger.error("DataFrame缺少'close'无法计算RSI")
return pd.Series([float('nan')] * len(df))
if len(df) < period:
logging.warning(f"数据长度不足{period}RSI结果将包含NaN")
logger.warning(f"数据长度不足{period}RSI结果将包含NaN")
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
@ -58,111 +58,111 @@ class QuantStrategy:
"""
简单移动平均线策略
"""
logging.info("=== 执行移动平均线策略 ===")
logger.info("=== 执行移动平均线策略 ===")
try:
df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, sma_long_period+2))
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
logger.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) < max(sma_short_period, sma_long_period, 2):
logging.warning("数据不足,无法执行策略")
logger.warning("数据不足,无法执行策略")
return
df['sma_short'] = self.calculate_sma(df, sma_short_period)
df['sma_long'] = self.calculate_sma(df, sma_long_period)
if len(df) < 2:
logging.warning("数据不足2条无法判断金叉死叉")
logger.warning("数据不足2条无法判断金叉死叉")
return
latest = df.iloc[-1]
prev = df.iloc[-2]
if pd.isna(latest['sma_short']) or pd.isna(latest['sma_long']) or pd.isna(prev['sma_short']) or pd.isna(prev['sma_long']):
logging.warning("均线数据存在NaN跳过本次信号判断")
logger.warning("均线数据存在NaN跳过本次信号判断")
return
logging.info(f"短期均线: {latest['sma_short']:.2f}")
logging.info(f"长期均线: {latest['sma_long']:.2f}")
logging.info(f"当前价格: {latest['close']:.2f}")
logger.info(f"短期均线: {latest['sma_short']:.2f}")
logger.info(f"长期均线: {latest['sma_long']:.2f}")
logger.info(f"当前价格: {latest['close']:.2f}")
if (latest['sma_short'] > latest['sma_long'] and prev['sma_short'] <= prev['sma_long']):
logging.info("信号: 买入")
logger.info("信号: 买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif (latest['sma_short'] < latest['sma_long'] and prev['sma_short'] >= prev['sma_long']):
logging.info("信号: 卖出")
logger.info("信号: 卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: 持仓观望")
logger.info("信号: 持仓观望")
def rsi_strategy(self, period: int = 14, oversold: int = 30, overbought: int = 70) -> None:
"""
RSI策略
"""
logging.info("=== 执行RSI策略 ===")
logger.info("=== 执行RSI策略 ===")
try:
df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, period+2))
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
logger.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) < period:
logging.warning("数据不足,无法执行策略")
logger.warning("数据不足,无法执行策略")
return
df['rsi'] = self.calculate_rsi(df, period)
latest_rsi = df['rsi'].iloc[-1]
if pd.isna(latest_rsi):
logging.warning("最新RSI为NaN跳过本次信号判断")
logger.warning("最新RSI为NaN跳过本次信号判断")
return
logging.info(f"当前RSI: {latest_rsi:.2f}")
logger.info(f"当前RSI: {latest_rsi:.2f}")
if latest_rsi < oversold:
logging.info("信号: RSI超卖买入")
logger.info("信号: RSI超卖买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif latest_rsi > overbought:
logging.info("信号: RSI超买卖出")
logger.info("信号: RSI超买卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: RSI正常区间持仓观望")
logger.info("信号: RSI正常区间持仓观望")
def grid_trading_strategy(self, grid_levels: int = 5, grid_range: float = 0.02) -> None:
"""
网格交易策略
"""
if grid_levels <= 0:
logging.error("网格数必须大于0")
logger.error("网格数必须大于0")
return
if grid_range <= 0:
logging.error("网格范围必须大于0")
logger.error("网格范围必须大于0")
return
logging.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
logger.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
try:
current_price = self.quant_trader.get_current_price()
except Exception as e:
logging.error(f"获取当前价格失败: {e}")
logger.error(f"获取当前价格失败: {e}")
return
if current_price is None:
logging.warning("当前价格获取失败")
logger.warning("当前价格获取失败")
return
grid_prices = []
for i in range(grid_levels):
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
grid_prices.append(price)
logging.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
logger.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
try:
df = self.quant_trader.get_kline_data(bar='1m', limit=10)
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
logger.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) == 0 or 'close' not in df:
logging.warning("K线数据无效无法执行网格策略")
logger.warning("K线数据无效无法执行网格策略")
return
latest_price = df['close'].iloc[-1]
if pd.isna(latest_price):
logging.warning("最新价格为NaN跳过本次信号判断")
logger.warning("最新价格为NaN跳过本次信号判断")
return
closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price))
logging.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
logger.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
if latest_price < closest_grid * 0.995:
logging.info("信号: 价格下跌,网格买入")
logger.info("信号: 价格下跌,网格买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif latest_price > closest_grid * 1.005:
logging.info("信号: 价格上涨,网格卖出")
logger.info("信号: 价格上涨,网格卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: 价格在网格内,持仓观望")
logger.info("信号: 价格在网格内,持仓观望")
def run_strategy_loop(self,
strategy: str = 'sma',
@ -172,16 +172,16 @@ class QuantStrategy:
运行策略循环
"""
if interval <= 0:
logging.error("循环间隔必须大于0秒")
logger.error("循环间隔必须大于0秒")
return
logging.info(f"开始运行{strategy}策略,间隔{interval}")
logger.info(f"开始运行{strategy}策略,间隔{interval}")
while True:
try:
logging.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
try:
self.quant_trader.get_account_balance()
except Exception as e:
logging.error(f"获取账户余额失败: {e}")
logger.error(f"获取账户余额失败: {e}")
if strategy == 'sma':
sma_short_period = trading_config.get("sma_short_period", 5)
sma_long_period = trading_config.get("sma_long_period", 20)
@ -196,14 +196,14 @@ class QuantStrategy:
grid_range = trading_config.get("grid_range", 0.02)
self.grid_trading_strategy(grid_levels, grid_range)
else:
logging.error("未知策略")
logger.error("未知策略")
break
logging.info(f"等待{interval}秒后继续...")
logger.info(f"等待{interval}秒后继续...")
time.sleep(interval)
except KeyboardInterrupt:
logging.info("策略运行被用户中断")
logger.info("策略运行被用户中断")
break
except Exception as e:
logging.error(f"策略运行异常: {e}")
logger.error(f"策略运行异常: {e}")
time.sleep(interval)

View File

@ -1,14 +1,13 @@
import time
from datetime import datetime, timedelta
import logging
from typing import Optional
import pandas as pd
import okx.MarketData as Market
from core.utils import timestamp_to_datetime
from core.db.db_trade_data import DBTradeData
import core.logger as logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.logger
class TradeData:
def __init__(self,
@ -57,7 +56,7 @@ class TradeData:
if result:
break
except Exception as e:
logging.error(f"请求出错: {e}")
logger.error(f"请求出错: {e}")
count += 1
if count > 3:
break
@ -75,7 +74,7 @@ class TradeData:
from_date_time = timestamp_to_datetime(from_time)
to_date_time = timestamp_to_datetime(to_time)
logging.info(f"获得交易数据,最早时间: {from_date_time} 最近时间: {to_date_time}")
logger.info(f"获得交易数据,最早时间: {from_date_time} 最近时间: {to_date_time}")
df = pd.DataFrame(trades)
# 过滤时间范围
@ -112,7 +111,7 @@ class TradeData:
else:
return None
except Exception as e:
logging.error(f"获取历史交易数据失败: {e}")
logger.error(f"获取历史交易数据失败: {e}")
return None

View File

@ -1,11 +1,10 @@
import pandas as pd
import logging
import core.logger as logging
from typing import Optional, List, Dict, Any, Union
from core.db.db_manager import DBData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class DBHugeVolumeData:
def __init__(
@ -127,7 +126,7 @@ class DBHugeVolumeData:
:param df: 巨量交易数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql(df)
@ -141,7 +140,7 @@ class DBHugeVolumeData:
:param df: 巨量交易数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
@ -156,7 +155,7 @@ class DBHugeVolumeData:
:param chunk_size: 分块大小
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
@ -169,7 +168,7 @@ class DBHugeVolumeData:
注意会抛出重复键错误需要额外处理
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)

View File

@ -1,11 +1,10 @@
import pandas as pd
from sqlalchemy import create_engine, exc, text
import re, datetime
import logging
import core.logger as logging
from core.utils import transform_data_type
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class DBData:
def __init__(
@ -86,7 +85,7 @@ class DBData:
:param db_url: 数据库连接URL
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
df = df[self.columns]
@ -112,9 +111,9 @@ class DBData:
# 删除临时表
conn.execute(text(f"DROP TABLE IF EXISTS {self.temp_table_name}"))
logging.info("数据已成功写入数据库。")
logger.info("数据已成功写入数据库。")
except Exception as e:
logging.error(f"数据库连接或写入失败: {e}")
logger.error(f"数据库连接或写入失败: {e}")
def insert_data_to_mysql_fast(self, df: pd.DataFrame):
"""
@ -124,7 +123,7 @@ class DBData:
适用场景中等数据量
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
df = df[self.columns]
@ -137,9 +136,9 @@ class DBData:
data_dicts = [row.to_dict() for _, row in df.iterrows()]
conn.execute(sql, data_dicts)
logging.info("数据已成功写入数据库。")
logger.info("数据已成功写入数据库。")
except Exception as e:
logging.error(f"数据库连接或写入失败: {e}")
logger.error(f"数据库连接或写入失败: {e}")
def insert_data_to_mysql_chunk(self, df: pd.DataFrame, chunk_size: int = 1000):
"""
@ -149,7 +148,7 @@ class DBData:
适用场景大数据量>10万条
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
df = df[self.columns]
@ -180,12 +179,12 @@ class DBData:
# 删除临时表
conn.execute(text(f"DROP TABLE IF EXISTS {temp_table_name}"))
logging.info(
logger.info(
f"已处理 {min(i+chunk_size, total_rows)}/{total_rows} 条记录"
)
logging.info("数据已成功写入数据库。")
logger.info("数据已成功写入数据库。")
except Exception as e:
logging.error(f"数据库连接或写入失败: {e}")
logger.error(f"数据库连接或写入失败: {e}")
def insert_data_to_mysql_simple(self, df: pd.DataFrame):
"""
@ -195,7 +194,7 @@ class DBData:
注意会抛出重复键错误需要额外处理
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
df = df[self.columns]
@ -208,9 +207,9 @@ class DBData:
index=False,
method="multi",
)
logging.info("数据已成功写入数据库。")
logger.info("数据已成功写入数据库。")
except Exception as e:
logging.error(f"数据库连接或写入失败: {e}")
logger.error(f"数据库连接或写入失败: {e}")
def query_data(self, sql: str, condition_dict: dict, return_multi: bool = True):
"""
@ -245,5 +244,5 @@ class DBData:
else:
return None
except Exception as e:
logging.error(f"查询数据出错: {e}")
logger.error(f"查询数据出错: {e}")
return None

View File

@ -1,9 +1,9 @@
import pandas as pd
import logging
import core.logger as logging
from core.db.db_manager import DBData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class DBMarketData:
@ -78,7 +78,7 @@ class DBMarketData:
:param df: K线数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql(df)
@ -92,7 +92,7 @@ class DBMarketData:
:param df: K线数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
@ -107,7 +107,7 @@ class DBMarketData:
:param chunk_size: 分块大小
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
@ -120,7 +120,7 @@ class DBMarketData:
注意会抛出重复键错误需要额外处理
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)
@ -489,12 +489,12 @@ class DBMarketData:
if start is not None:
start = transform_date_time_to_timestamp(start)
if start is None:
logging.warning(f"开始时间格式错误: {start}")
logger.warning(f"开始时间格式错误: {start}")
return None
if end is not None:
end = transform_date_time_to_timestamp(end)
if end is None:
logging.warning(f"结束时间格式错误: {end}")
logger.warning(f"结束时间格式错误: {end}")
return None
if start is not None and end is not None:
if start > end:

View File

@ -1,10 +1,10 @@
import pandas as pd
import logging
import core.logger as logging
from typing import Optional, List, Dict, Any, Union
from core.db.db_manager import DBData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class DBMarketMonitor:
@ -94,7 +94,7 @@ class DBMarketMonitor:
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql(df)
@ -108,7 +108,7 @@ class DBMarketMonitor:
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
@ -123,7 +123,7 @@ class DBMarketMonitor:
:param chunk_size: 每块大小
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
@ -137,7 +137,7 @@ class DBMarketMonitor:
:param df: 市场监控数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)
@ -432,8 +432,8 @@ class DBMarketMonitor:
result = conn.execute(text(sql), condition_dict)
conn.commit()
deleted_count = result.rowcount
logging.info(f"删除了 {deleted_count} 条旧的市场监控数据")
logger.info(f"删除了 {deleted_count} 条旧的市场监控数据")
return deleted_count
except Exception as e:
logging.error(f"删除旧数据时发生错误: {e}")
logger.error(f"删除旧数据时发生错误: {e}")
return 0

View File

@ -1,10 +1,10 @@
import pandas as pd
import logging
import core.logger as logging
from typing import Optional, List, Dict, Any, Union
from core.db.db_manager import DBData
from core.utils import transform_date_time_to_timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class DBTradeData:
@ -88,7 +88,7 @@ class DBTradeData:
:param df: 交易数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql(df)
@ -102,7 +102,7 @@ class DBTradeData:
:param df: 交易数据DataFrame
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_fast(df)
@ -117,7 +117,7 @@ class DBTradeData:
:param chunk_size: 分块大小
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_chunk(df, chunk_size)
@ -130,7 +130,7 @@ class DBTradeData:
注意会抛出重复键错误需要额外处理
"""
if df is None or df.empty:
logging.warning("DataFrame为空无需写入数据库。")
logger.warning("DataFrame为空无需写入数据库。")
return
self.db_manager.insert_data_to_mysql_simple(df)

72
core/logger.py Normal file
View File

@ -0,0 +1,72 @@
import logging
import time
from logging.handlers import TimedRotatingFileHandler
import os
'''
日志工具类
'''
class Logger:
def __init__(self):
# log文件存储路径
current_dir = os.getcwd()
if current_dir.endswith('crypto_quant'):
output_folder = r'./output/log/'
elif current_dir.startswith(r'/root/'):
output_folder = r'/root/crypto_quant/output/log/'
else:
output_folder = r'./output/log/'
os.makedirs(output_folder, exist_ok=True)
# add self._log_filename to be adata_yyyyMMddHHmm.log
self._log_filename = os.path.join(output_folder, 'crypto_monitor_{}.log'.format(time.strftime("%Y%m%d%H%M%S", time.localtime())))
# self._log_filename = os.path.join(output_folder, 'adata.log')
'''
%(levelno)s: 打印日志级别的数值
%(levelname)s: 打印日志级别名称
%(pathname)s: 打印当前执行程序的路径其实就是sys.argv[0]
%(filename)s: 打印当前执行程序名
%(funcName)s: 打印日志的当前函数
%(lineno)d: 打印日志的当前行号
%(asctime)s: 打印日志的时间
%(thread)d: 打印线程ID
%(threadName)s: 打印线程名称
%(process)d: 打印进程ID
%(message)s: 打印日志信息
'''
logging.basicConfig()
# 日志信息输出格式
self._formatter = logging.Formatter('%(asctime)s - %(process)d - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 创建一个日志对象
self._logger = logging.getLogger()
# self.set_console_logger()
self.set_file_logger()
self._logger.setLevel(logging.INFO)
def set_console_logger(self):
'''设置控制台日志输出'''
console_handler = logging.StreamHandler()
console_handler.setFormatter(self._formatter)
console_handler.setLevel(logging.INFO)
self._logger.addHandler(console_handler)
def set_file_logger(self):
'''设置日志文件输出'''
log_file_handler = TimedRotatingFileHandler(filename=self._log_filename,
when="D",
interval=1,
backupCount=3,
encoding='utf-8')
log_file_handler.setFormatter(self._formatter)
log_file_handler.setLevel(logging.INFO)
# log_file_handler.suffix = "%Y%m%d_%H%M%S.log"
self._logger.addHandler(log_file_handler)
def get_logger(self):
return self._logger
logger = Logger().get_logger()

View File

@ -1,4 +1,4 @@
import logging
import core.logger as logging
import os
import pandas as pd
import numpy as np
@ -19,9 +19,7 @@ from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
# seaborn支持中文
plt.rcParams["font.family"] = ["SimHei"]
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.logger
class PriceVolumeStats:
@ -61,7 +59,7 @@ class PriceVolumeStats:
price_volume_stats_list = []
earliest_market_timestamp = None
latest_market_timestamp = None
logging.info(
logger.info(
f"开始统计{self.initial_date}{self.end_date}的价格、成交量、成交量与高价低价关系数据"
)
for symbol in self.symbol_list:
@ -86,22 +84,22 @@ class PriceVolumeStats:
if data["timestamp"].iloc[-1] > latest_market_timestamp:
latest_market_timestamp = data["timestamp"].iloc[-1]
# 统计高成交量小时分布
logging.info(f"统计{symbol} {bar} 巨量小时分布数据")
logger.info(f"统计{symbol} {bar} 巨量小时分布数据")
high_volume_hours_data = self.stats_high_volume_hours(data)
high_volume_hours_list.append(high_volume_hours_data)
huge_high_volume_hours_data = self.stats_high_volume_hours(data, 4)
huge_high_volume_hours_list.append(huge_high_volume_hours_data)
logging.info(f"统计{symbol} {bar} 价格数据")
logger.info(f"统计{symbol} {bar} 价格数据")
price_stats_data = self.calculate_price_statistics(data)
logging.info(f"统计{symbol} {bar} 涨跌百分比数据")
logger.info(f"统计{symbol} {bar} 涨跌百分比数据")
pct_change_stats_data = self.calculate_pct_change_statistics(data)
logging.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据")
logger.info(f"统计{symbol} {bar} 价格变化峰值和谷值数据")
peak_valley_data, peak_valley_stats_data = (
self.calculate_price_change_peak_valley_statistics(data)
)
logging.info(f"统计{symbol} {bar} 成交量数据")
logger.info(f"统计{symbol} {bar} 成交量数据")
volume_stats_data = self.calculate_volume_statistics(data)
logging.info(f"统计{symbol} {bar} 成交量与高价低价关系数据")
logger.info(f"统计{symbol} {bar} 成交量与高价低价关系数据")
price_volume_stats_data = self.calculate_price_volume_statistics(
data
)
@ -136,7 +134,7 @@ class PriceVolumeStats:
latest_market_date_time = re.sub(r"[\:\-\s]", "", str(latest_market_date_time))
output_file_name = f"price_volume_stats_window_size_{self.window_size}_from_{earliest_market_date_time}_to_{latest_market_date_time}.xlsx"
output_file_path = os.path.join(self.stats_output_dir, output_file_name)
logging.info(f"导出{output_file_path}")
logger.info(f"导出{output_file_path}")
with pd.ExcelWriter(output_file_path) as writer:
price_stats_df.to_excel(writer, sheet_name="价格统计", index=False)
pct_change_stats_df.to_excel(
@ -600,7 +598,7 @@ class PriceVolumeStats:
}
}
"""
logging.info(f"将图表输出到{excel_file_path}")
logger.info(f"将图表输出到{excel_file_path}")
# 打开已经存在的Excel文件
wb = openpyxl.load_workbook(excel_file_path)
@ -639,7 +637,7 @@ class PriceVolumeStats:
chart_rows + 5
) # Add 5 rows for padding between charts
except Exception as e:
logging.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
logger.error(f"输出Excel Sheet {sheet_name} 失败: {e}")
continue
# Save Excel file
wb.save(excel_file_path)

View File

@ -1,9 +1,9 @@
from datetime import datetime, timezone, timedelta
from decimal import Decimal
import re
import logging
import core.logger as logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
def datetime_to_timestamp(date_str: str) -> int:
"""
@ -61,11 +61,11 @@ def transform_date_time_to_timestamp(date_time: int | str):
else:
date_time = check_date_time_format(date_time)
if date_time is None:
logging.error(f"日期时间格式错误: {date_time}")
logger.error(f"日期时间格式错误: {date_time}")
return None
# 按北京时间字符串处理转换为毫秒级timestamp
date_time = datetime_to_timestamp(date_time)
return date_time
except Exception as e:
logging.error(f"start参数解析失败: {e}")
logger.error(f"start参数解析失败: {e}")
return None

View File

@ -4,11 +4,11 @@
但需要管理员提供企业id以及secret信息
通过wechatpy库实现
"""
import logging
import core.logger as logging
import requests
from config import WECHAT_CONFIG
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.logger
class Wechat:
def __init__(self):

View File

@ -5,16 +5,14 @@ from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
from market_data_main import MarketDataMain
from core.wechat import Wechat
import logging
import core.logger as logging
from config import MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from datetime import datetime, timedelta
import pandas as pd
import os
import re
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.logger
class HugeVolumeMain:
@ -55,9 +53,9 @@ class HugeVolumeMain:
is_update=False,
)
if data is not None and len(data) > 0:
logging.info(f"此次初始化巨量交易数据: {len(data)}")
logger.info(f"此次初始化巨量交易数据: {len(data)}")
else:
logging.info(f"此次初始化巨量交易数据为空")
logger.info(f"此次初始化巨量交易数据为空")
def detect_volume_spike(
self,
@ -75,20 +73,20 @@ class HugeVolumeMain:
)
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(
logger.info(
f"开始处理巨量交易数据: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, bar, start, end
)
if data is None:
logging.warning(
logger.warning(
f"获取行情数据失败: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
return None
else:
if len(data) == 0:
logging.warning(
logger.warning(
f"获取行情数据为空: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
return None
@ -119,7 +117,7 @@ class HugeVolumeMain:
if data is not None and len(data) > 0:
self.db_huge_volume_data.insert_data_to_mysql(data)
else:
logging.warning(
logger.warning(
f"此次处理巨量交易数据为空: {symbol} {bar} {start} {end}"
)
return data
@ -162,15 +160,15 @@ class HugeVolumeMain:
only_output_huge_volume=False,
is_update=True,
)
logging.info(
logger.info(
f"更新巨量交易数据: {symbol} {bar} 窗口大小: {window_size}{earliest_date_time}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
if data is not None and len(data) > 0:
logging.info(f"此次更新巨量交易数据: {len(data)}")
logger.info(f"此次更新巨量交易数据: {len(data)}")
else:
logging.info(f"此次更新巨量交易数据为空")
logger.info(f"此次更新巨量交易数据为空")
except Exception as e:
logging.error(
logger.error(
f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size}{earliest_date_time}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}"
)
@ -234,7 +232,7 @@ class HugeVolumeMain:
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
periods_text = ", ".join([str(period) for period in periods])
logging.info(
logger.info(
f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
volume_statistics_data = (
@ -243,7 +241,7 @@ class HugeVolumeMain:
)
)
if volume_statistics_data is None or len(volume_statistics_data) == 0:
logging.warning(
logger.warning(
f"获取巨量交易数据为空: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
return None
@ -299,12 +297,12 @@ class HugeVolumeMain:
start_date_time = timestamp_to_datetime(start_timestamp)
end_date_time = timestamp_to_datetime(end_timestamp)
logging.info(f"开始获取巨量交易数据: {start}{end}")
logger.info(f"开始获取巨量交易数据: {start}{end}")
huge_volume_data = self.db_huge_volume_data.query_huge_volume_records(
start=start_timestamp, end=end_timestamp
)
if huge_volume_data is None or len(huge_volume_data) == 0:
logging.warning(f"获取巨量交易数据为空: {start}{end}")
logger.warning(f"获取巨量交易数据为空: {start}{end}")
return
else:
if isinstance(huge_volume_data, list):
@ -325,7 +323,7 @@ class HugeVolumeMain:
by=["symbol", "bar", "window_size", "timestamp"], ascending=True
)
huge_volume_data = huge_volume_data.reset_index(drop=True)
logging.info(f"获取巨量交易数据: {len(huge_volume_data)}")
logger.info(f"获取巨量交易数据: {len(huge_volume_data)}")
contents = []
contents.append(f"# 放量交易数据: {start_date_time}{end_date_time}")
symbol_list = huge_volume_data["symbol"].unique()
@ -383,7 +381,7 @@ class HugeVolumeMain:
# 获得text的字节数
text_length = len(text.encode("utf-8"))
logging.info(f"发送巨量交易数据到微信,字节数: {text_length}")
logger.info(f"发送巨量交易数据到微信,字节数: {text_length}")
# with open(os.path.join(self.output_folder, "huge_volume_data.md"), "w", encoding="utf-8") as f:
# f.write(text)
wechat = Wechat()
@ -438,7 +436,7 @@ class HugeVolumeMain:
writer, sheet_name="next_periods_statistics", index=False
)
except Exception as e:
logging.error(f"导出Excel文件失败: {e}")
logger.error(f"导出Excel文件失败: {e}")
return total_huge_volume_data, total_result_data
def plot_huge_volume_data(
@ -493,10 +491,10 @@ def test_send_huge_volume_data_to_wechat():
huge_volume_main = HugeVolumeMain(threshold=2.0)
# 获得昨天日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
logging.info(f"昨天日期: {yesterday}")
logger.info(f"昨天日期: {yesterday}")
# 获得今天日期
today = datetime.now().strftime("%Y-%m-%d")
logging.info(f"今天日期: {today}")
logger.info(f"今天日期: {today}")
huge_volume_main.send_huge_volume_data_to_wechat(start=yesterday, end=today)

View File

@ -1,4 +1,4 @@
import logging
import core.logger as logging
from datetime import datetime
from time import sleep
import pandas as pd
@ -21,7 +21,7 @@ from config import (
BAR_THRESHOLD,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class MarketDataMain:
@ -59,7 +59,7 @@ class MarketDataMain:
"""
for symbol in self.symbols:
for bar in self.bars:
logging.info(f"开始初始化行情数据: {symbol} {bar}")
logger.info(f"开始初始化行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
if latest_data:
start = latest_data.get("timestamp")
@ -68,7 +68,7 @@ class MarketDataMain:
else:
start = datetime_to_timestamp(self.initial_date)
start_date_time = self.initial_date
logging.info(
logger.info(
f"开始初始化{symbol}, {bar} 行情数据,从 {start_date_time} 开始"
)
self.fetch_save_data(symbol, bar, start)
@ -80,12 +80,12 @@ class MarketDataMain:
end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end_time_ts = transform_date_time_to_timestamp(end_time)
if end_time_ts is None:
logging.error(f"结束时间格式错误: {end_time}")
logger.error(f"结束时间格式错误: {end_time}")
return None
start_time_ts = transform_date_time_to_timestamp(start)
if start_time_ts is None:
logging.error(f"开始时间格式错误: {start}")
logger.error(f"开始时间格式错误: {start}")
return None
# 如果bar为5m, 15m, 30m:
@ -110,7 +110,7 @@ class MarketDataMain:
current_start_time_ts = start_time_ts
start_date_time = timestamp_to_datetime(current_start_time_ts)
end_date_time = timestamp_to_datetime(end_time_ts)
logging.info(
logger.info(
f"获取行情数据: {symbol} {bar}{start_date_time}{end_date_time}"
)
data = self.market_data.get_historical_kline_data(
@ -151,7 +151,7 @@ class MarketDataMain:
# data.loc[index, "buy_sz"] = current_buy_sz
# data.loc[index, "sell_sz"] = current_sell_sz
# except Exception as e:
# logging.error(f"设置buy_sz和sell_sz失败: {e}")
# logger.error(f"设置buy_sz和sell_sz失败: {e}")
# continue
if data is not None and len(data) > 0:
data = data[
@ -184,7 +184,7 @@ class MarketDataMain:
if min_start_time_ts is not None and get_data:
# 补充技术指标数据
# 获得min_start_time_ts之前30条数据
logging.info(f"开始补充技术指标数据: {symbol} {bar}")
logger.info(f"开始补充技术指标数据: {symbol} {bar}")
before_data = self.db_market_data.query_data_before_timestamp(
symbol, bar, min_start_time_ts, 30
)
@ -199,7 +199,7 @@ class MarketDataMain:
)
if handle_data is not None:
if before_data is not None and len(handle_data) <= len(before_data):
logging.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}")
logger.error(f"handle_data数据条数小于before_data数据条数: {symbol} {bar}")
return None
if isinstance(handle_data, list):
handle_data = pd.DataFrame(handle_data)
@ -208,14 +208,14 @@ class MarketDataMain:
elif isinstance(handle_data, pd.DataFrame):
pass
else:
logging.error(f"handle_data类型错误: {type(handle_data)}")
logger.error(f"handle_data类型错误: {type(handle_data)}")
return None
handle_data = self.calculate_metrics(handle_data)
if latest_before_timestamp is not None:
handle_data = handle_data[handle_data["timestamp"] > latest_before_timestamp]
handle_data.reset_index(drop=True, inplace=True)
logging.info(f"开始保存技术指标数据: {symbol} {bar}")
logger.info(f"开始保存技术指标数据: {symbol} {bar}")
self.db_market_data.insert_data_to_mysql(handle_data)
return data
@ -350,21 +350,21 @@ class MarketDataMain:
"""
更新数据
"""
logging.info(f"开始更新行情数据: {symbol} {bar}")
logger.info(f"开始更新行情数据: {symbol} {bar}")
latest_data = self.db_market_data.query_latest_data(symbol, bar)
if not latest_data:
logging.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据")
logger.info(f"{symbol}, {bar} 无数据,开始从{self.initial_date}初始化数据")
data = self.fetch_save_data(symbol, bar, self.initial_date)
else:
latest_timestamp = latest_data.get("timestamp")
if latest_timestamp:
latest_timestamp = int(latest_timestamp)
latest_date_time = timestamp_to_datetime(latest_timestamp)
logging.info(
logger.info(
f"{symbol}, {bar} 上次获取的最新数据时间: {latest_date_time}"
)
else:
logging.warning(f"获取{symbol}, {bar} 最新数据失败")
logger.warning(f"获取{symbol}, {bar} 最新数据失败")
return
data = self.fetch_save_data(symbol, bar, latest_timestamp + 1)
return data
@ -373,7 +373,7 @@ class MarketDataMain:
"""
批量计算技术指标
"""
logging.info("开始批量计算技术指标")
logger.info("开始批量计算技术指标")
start_date_time = MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-15 00:00:00"
)
@ -382,14 +382,14 @@ class MarketDataMain:
current_timestamp = transform_date_time_to_timestamp(current_date_time)
for symbol in self.symbols:
for bar in self.bars:
logging.info(f"开始计算技术指标: {symbol} {bar}")
logger.info(f"开始计算技术指标: {symbol} {bar}")
data = self.db_market_data.query_market_data_by_symbol_bar(
symbol=symbol, bar=bar, start=start_timestamp - 1, end=current_timestamp
)
if data is not None and len(data) > 0:
data = pd.DataFrame(data)
data = self.calculate_metrics(data)
logging.info(f"开始保存技术指标数据: {symbol} {bar}")
logger.info(f"开始保存技术指标数据: {symbol} {bar}")
self.db_market_data.insert_data_to_mysql(data)

View File

@ -6,16 +6,15 @@ from core.db.db_market_monitor import DBMarketMonitor
from core.wechat import Wechat
from config import MONITOR_CONFIG, MYSQL_CONFIG
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
import core.logger as logging
import logging
import os
import pandas as pd
from datetime import datetime, timedelta
import json
import re
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class MarketMonitorMain:
def __init__(self):
@ -58,7 +57,7 @@ class MarketMonitorMain:
json.dump({}, f, ensure_ascii=False, indent=4)
return {}
except Exception as e:
logging.error(f"获取最后一次报表生成记录失败: {e}")
logger.error(f"获取最后一次报表生成记录失败: {e}")
return {}
def monitor_realtime_market(
@ -85,7 +84,7 @@ class MarketMonitorMain:
)
if real_time_data is None or len(real_time_data) == 0:
logging.error(f"获取最新市场数据失败: {symbol}, {bar}")
logger.error(f"获取最新市场数据失败: {symbol}, {bar}")
return
latest_realtime_timestamp = real_time_data["timestamp"].iloc[-1]
@ -102,22 +101,22 @@ class MarketMonitorMain:
latest_record_timestamp is not None
and latest_realtime_timestamp <= latest_record_timestamp
):
logging.info(
logger.info(
f"最新市场数据时间戳 {latest_reatime_datetime} 小于等于最新记录时间戳 {latest_record_datetime} 不进行监控"
)
return
logging.info(
logger.info(
f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间 {latest_record_datetime}"
)
else:
logging.info(
logger.info(
f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间为空"
)
real_time_data = self.market_data_main.add_new_columns(real_time_data)
logging.info(f"开始计算技术指标: {symbol} {bar}")
logger.info(f"开始计算技术指标: {symbol} {bar}")
real_time_data = self.market_data_main.calculate_metrics(real_time_data)
logging.info(f"开始计算大成交量: {symbol} {bar} 窗口大小: {self.window_size}")
logger.info(f"开始计算大成交量: {symbol} {bar} 窗口大小: {self.window_size}")
real_time_data = self.huge_volume_main.huge_volume.detect_huge_volume(
data=real_time_data,
window_size=self.window_size,
@ -127,7 +126,7 @@ class MarketMonitorMain:
output_excel=False,
)
if real_time_data is None or len(real_time_data) == 0:
logging.error(
logger.error(
f"计算大成交量失败: {symbol} {bar} 窗口大小: {self.window_size}"
)
return
@ -135,27 +134,27 @@ class MarketMonitorMain:
if only_output_huge_volume:
if realtime_row["huge_volume"] == 1:
logging.info(f"监控到巨量: {symbol} {bar} 窗口大小: {self.window_size}")
logger.info(f"监控到巨量: {symbol} {bar} 窗口大小: {self.window_size}")
if only_output_over_mean_volume:
# 获得huge_volume==1时的volume_ratio的均量
mean_huge_volume_ratio = real_time_data[real_time_data["huge_volume"] == 1]["volume_ratio"].mean()
if realtime_row["volume_ratio"] >= mean_huge_volume_ratio:
logging.info(f"监控到巨量且超过均量: {symbol} {bar} 窗口大小: {self.window_size}")
logger.info(f"监控到巨量且超过均量: {symbol} {bar} 窗口大小: {self.window_size}")
else:
logging.info(
logger.info(
f"监控到巨量但未超过均量: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控"
)
return
else:
logging.info(
logger.info(
f"监控到非巨量: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控"
)
return
if only_output_rise:
if realtime_row["pct_chg"] > 0:
logging.info(f"监控到上涨: {symbol} {bar} 窗口大小: {self.window_size}")
logger.info(f"监控到上涨: {symbol} {bar} 窗口大小: {self.window_size}")
else:
logging.info(
logger.info(
f"监控到下跌: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控"
)
return
@ -179,7 +178,7 @@ class MarketMonitorMain:
)
text_length = len(report.encode("utf-8"))
logging.info(f"发送报告到企业微信,字节数: {text_length}")
logger.info(f"发送报告到企业微信,字节数: {text_length}")
self.wechat.send_markdown(report)
# remove punction in latest_reatime_datetime
@ -201,7 +200,7 @@ class MarketMonitorMain:
"report_file_byte_size": report_file_byte_size,
}
report_data = pd.DataFrame([report_data])
logging.info(f"插入数据到数据库")
logger.info(f"插入数据到数据库")
self.db_market_monitor.insert_data_to_mysql(report_data)
if self.latest_record.get(symbol, None) is None:
@ -224,7 +223,7 @@ class MarketMonitorMain:
# 获得bar在self.market_data_main.bars中的索引
bar_index = self.market_data_main.bars.index(bar)
if bar_index == len(self.market_data_main.bars) - 1:
logging.error(f"已经是最后一个bar: {bar}")
logger.error(f"已经是最后一个bar: {bar}")
return None
# 获得下一个bar
bar = self.market_data_main.bars[bar_index + 1]
@ -233,10 +232,10 @@ class MarketMonitorMain:
symbol=symbol, bar=bar, end_time=end_time, limit=100
)
if data is None or len(data) == 0:
logging.error(f"获取实时数据失败: {symbol}, {bar}")
logger.error(f"获取实时数据失败: {symbol}, {bar}")
return None
data = self.market_data_main.add_new_columns(data)
logging.info(f"开始计算技术指标: {symbol} {bar}")
logger.info(f"开始计算技术指标: {symbol} {bar}")
data = self.market_data_main.calculate_metrics(data)
row = data.iloc[-1]
return row
@ -249,7 +248,7 @@ class MarketMonitorMain:
):
for symbol in self.market_data_main.symbols:
for bar in self.market_data_main.bars:
logging.info(
logger.info(
f"开始监控: {symbol} {bar} 窗口大小: {self.window_size} 行情数据"
)
try:
@ -261,7 +260,7 @@ class MarketMonitorMain:
only_output_rise,
)
except Exception as e:
logging.error(
logger.error(
f"监控失败: {symbol} {bar} 窗口大小: {self.window_size} 行情数据: {e}"
)
continue

View File

@ -1,21 +1,17 @@
from market_monitor_main import MarketMonitorMain
import logging
import time
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
import core.logger as logging
logger = logging.logger
def monitor_schedule():
market_monitor_main = MarketMonitorMain()
logging.info("开始监控")
while True: # 每分钟监控一次
market_monitor_main.batch_monitor_realtime_market(
only_output_huge_volume=True,
only_output_over_mean_volume=True,
only_output_rise=False,
)
logging.info("本次循环监控结束等待30秒")
time.sleep(30)
logger.info("开始监控")
market_monitor_main.batch_monitor_realtime_market(
only_output_huge_volume=True,
only_output_over_mean_volume=True,
only_output_rise=False,
)
logger.info("本次循环监控结束")
if __name__ == "__main__":
monitor_schedule()

View File

@ -5,3 +5,4 @@ sqlalchemy >= 2.0.41
pymysql >= 1.1.1
wechatpy >= 1.8.18
seaborn >= 0.13.2
schedule >= 1.2.2

View File

@ -1,7 +1,7 @@
from core.statistics.price_volume_stats import PriceVolumeStats
import logging
import core.logger as logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
def main():

View File

@ -1,4 +1,4 @@
import logging
import core.logger as logging
import time
from datetime import datetime, timedelta
import pandas as pd
@ -13,7 +13,7 @@ from config import (
MYSQL_CONFIG,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.logger
class TradeDataMain:
@ -65,26 +65,26 @@ class TradeDataMain:
end_date_time = timestamp_to_datetime(end_time)
# 如果db_earliest_time和db_latest_time存在则需要调整start_time和end_time
if db_earliest_time is None or db_latest_time is None:
logging.info(f"数据库无数据从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}")
logger.info(f"数据库无数据从API获取交易数据: {symbol}, {start_date_time}, {end_date_time}, {limit}")
self.trade_data.get_history_trades(symbol, start_time, end_time, limit)
else:
if db_earliest_time > start_time:
db_earliest_date_time = timestamp_to_datetime(db_earliest_time)
logging.info(f"从API补充最早数据{symbol}, {start_date_time} {db_earliest_date_time}")
logger.info(f"从API补充最早数据{symbol}, {start_date_time} {db_earliest_date_time}")
self.trade_data.get_history_trades(symbol, start_time, db_earliest_time + 1, limit)
if db_latest_time < end_time:
db_latest_date_time = timestamp_to_datetime(db_latest_time)
logging.info(f"从API补充最新数据{symbol}, {db_latest_date_time}, {end_date_time}")
logger.info(f"从API补充最新数据{symbol}, {db_latest_date_time}, {end_date_time}")
self.trade_data.get_history_trades(symbol, db_latest_time + 1, end_time, limit)
final_data = self.trade_data.db_trade_data.query_trade_data_by_symbol(symbol=symbol, start=start_time, end=end_time)
if final_data is not None and len(final_data) > 0:
logging.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}")
logger.info(f"获取交易数据: {symbol}, {start_date_time}, {end_date_time}")
final_data = pd.DataFrame(final_data)
final_data.sort_values(by="ts", inplace=True)
final_data.reset_index(drop=True, inplace=True)
return final_data
else:
logging.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}")
logger.info(f"获取交易数据失败: {symbol}, {start_date_time}, {end_date_time}")
return None

View File

@ -1,10 +1,10 @@
import logging
import core.logger as logging
from time import sleep
from core.biz.quant_trader import QuantTrader
from core.biz.strategy import QuantStrategy
from config import API_KEY, SECRET_KEY, PASSPHRASE, SANDBOX, TRADING_CONFIG, TIME_CONFIG
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.logger
class BizMain:
@ -39,7 +39,7 @@ class BizMain:
quantity = 1
try:
# 1. 合约开空单流程
logging.info("[1] 合约开空单流程:")
logger.info("[1] 合约开空单流程:")
# price = self.trader.get_current_price(self.trader.symbol_swap)
# logging.info(f"当前合约价格: {price}")
slot = 0.01
@ -49,40 +49,40 @@ class BizMain:
# logging.info(f"所需保证金: {margin}, 开仓价格: {entry_price}")
order_id, entry_price = self.trader.place_short_order(td_mode, quantity, leverage, slot, buffer_ratio)
if order_id:
logging.info(f"开空单成功订单ID: {order_id}, 开仓价格: {entry_price}")
logger.info(f"开空单成功订单ID: {order_id}, 开仓价格: {entry_price}")
else:
logging.error("开空单失败")
logger.error("开空单失败")
except Exception as e:
logging.error(f"合约开空单流程异常: {e}")
logger.error(f"合约开空单流程异常: {e}")
sleep(1)
try:
# 2. 现货卖出比特币流程
logging.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:")
logger.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:")
balance = self.trader.get_account_balance()
btc_balance = balance.get(self.trader.symbol_prefix, 0)
logging.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}")
logger.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}")
sell_amount = 0.01
if btc_balance >= sell_amount:
order_id = self.trader.place_market_order('sell', sell_amount)
if order_id:
logging.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功订单ID: {order_id}")
logger.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功订单ID: {order_id}")
else:
logging.error(f"现货卖出{self.trader.symbol_prefix}失败")
logger.error(f"现货卖出{self.trader.symbol_prefix}失败")
else:
logging.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}")
logger.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}")
except Exception as e:
logging.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}")
logger.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}")
sleep(1)
try:
# 3. 合约平空单流程
logging.info("[3] 合约平空单流程:")
logger.info("[3] 合约平空单流程:")
result = self.trader.close_short_order(td_mode, quantity)
if result:
logging.info("平空单成功")
logger.info("平空单成功")
else:
logging.error("平空单失败")
logger.error("平空单失败")
except Exception as e:
logging.error(f"合约平空单流程异常: {e}")
logger.error(f"合约平空单流程异常: {e}")
if __name__ == "__main__":