# Source file: crypto_quant/core/biz/strategy.py
# (Repository-viewer header, kept as a comment: 210 lines, 8.9 KiB, Python; Raw / Permalink / Normal View / History)

import time
# (repository-viewer blame timestamp: 2025-09-16 06:31:15 +00:00)
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import core.logger as logging
from typing import Optional
import pandas as pd
from core.biz.quant_trader import QuantTrader
# Module-level logger used by every method below. Note that `logging` here is
# the project's `core.logger` module (see the import above), not the standard
# library `logging` package.
logger = logging.logger
class QuantStrategy:
    """Indicator helpers and simple trading strategies driven by a QuantTrader.

    The class owns a single ``QuantTrader`` (``self.quant_trader``) and uses it
    to fetch candles / prices and to submit market orders.  Every strategy
    method performs one evaluation and returns ``None``; ``run_strategy_loop``
    repeats one of them forever at a fixed interval.
    """

    def __init__(self,
                 api_key: str,
                 secret_key: str,
                 passphrase: str,
                 sandbox: bool = True,
                 symbol: str = "BTC-USDT",
                 position_size: float = 0.001):
        """Create the strategy together with the trader it operates through.

        All arguments are forwarded positionally, unchanged, to ``QuantTrader``.

        Args:
            api_key: API key handed to the trader.
            secret_key: API secret handed to the trader.
            passphrase: API passphrase handed to the trader.
            sandbox: Forwarded to the trader; presumably selects a sandbox
                environment -- confirm against ``QuantTrader``.
            symbol: Instrument identifier forwarded to the trader.
            position_size: Order size forwarded to the trader; the strategies
                read it back as ``self.quant_trader.position_size``.
        """
        self.quant_trader = QuantTrader(
            api_key,
            secret_key,
            passphrase,
            sandbox,
            symbol,
            position_size)

    @staticmethod
    def _nan_series(df: pd.DataFrame) -> pd.Series:
        """Return an all-NaN float Series aligned with ``df``'s index.

        Using ``df.index`` (rather than a fresh RangeIndex) keeps the result
        assignable as a column of ``df`` without index misalignment, and the
        explicit dtype avoids an object-dtype result for an empty frame.
        """
        return pd.Series(float('nan'), index=df.index, dtype=float)

    def _submit_market_order(self, side: str) -> None:
        """Place a market order for ``side`` using the trader's position size."""
        self.quant_trader.place_market_order(side, self.quant_trader.position_size)

    def calculate_sma(self, df: pd.DataFrame, period: int = 20) -> pd.Series:
        """Compute the simple moving average of the ``close`` column.

        Args:
            df: Frame that should contain a ``close`` column.
            period: Rolling window length.

        Returns:
            The rolling mean of ``df['close']``.  Rows that do not yet have
            ``period`` observations are NaN.  If ``close`` is missing, an
            all-NaN Series aligned with ``df.index`` is returned and an error
            is logged.
        """
        if 'close' not in df:
            logger.error("DataFrame缺少'close'无法计算SMA")
            return self._nan_series(df)
        if len(df) < period:
            # Not fatal: the rolling mean simply yields NaN everywhere.
            logger.warning(f"数据长度不足{period}SMA结果将包含NaN")
        return df['close'].rolling(window=period).mean()

    def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
        """Compute the RSI of the ``close`` column from rolling mean gain/loss.

        Args:
            df: Frame that should contain a ``close`` column.
            period: Rolling window length for the average gain and loss.

        Returns:
            ``100 - 100 / (1 + avg_gain / avg_loss)`` per row, NaN until a full
            window is available.  If ``close`` is missing, an all-NaN Series
            aligned with ``df.index`` is returned and an error is logged.
        """
        if 'close' not in df:
            logger.error("DataFrame缺少'close'无法计算RSI")
            return self._nan_series(df)
        if len(df) < period:
            logger.warning(f"数据长度不足{period}RSI结果将包含NaN")

        delta = df['close'].diff()
        # `where(..., 0)` also maps the leading NaN difference to 0, so the
        # first row counts as "no change" in both averages.
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        avg_gain = gains.rolling(window=period).mean()
        avg_loss = losses.rolling(window=period).mean()
        # Avoid division by zero when the window contains no losing bar.
        avg_loss = avg_loss.replace(0, 1e-10)
        relative_strength = avg_gain / avg_loss
        return 100 - (100 / (1 + relative_strength))

    def simple_moving_average_strategy(self, sma_short_period: int = 5, sma_long_period: int = 20) -> None:
        """Run one evaluation of the moving-average crossover strategy.

        Fetches candles via ``get_kline_data(bar='5m', ...)``, computes a short
        and a long SMA, and compares the last two rows: the short SMA crossing
        above the long one places a buy order, crossing below places a sell
        order, anything else only logs a hold.

        Args:
            sma_short_period: Window of the short SMA; must be positive.
            sma_long_period: Window of the long SMA; must be positive.
        """
        if sma_short_period <= 0 or sma_long_period <= 0:
            # Same style of up-front validation as grid_trading_strategy.
            logger.error("均线周期必须大于0")
            return
        logger.info("=== 执行移动平均线策略 ===")
        try:
            df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, sma_long_period+2))
        except Exception as e:
            logger.error(f"获取K线数据失败: {e}")
            return
        # At least two rows are required so that a previous bar exists.
        if df is None or len(df) < max(sma_short_period, sma_long_period, 2):
            logger.warning("数据不足,无法执行策略")
            return

        df['sma_short'] = self.calculate_sma(df, sma_short_period)
        df['sma_long'] = self.calculate_sma(df, sma_long_period)

        # A crossover needs both averages on both of the last two rows.
        if df[['sma_short', 'sma_long']].iloc[-2:].isna().any().any():
            logger.warning("均线数据存在NaN跳过本次信号判断")
            return

        latest = df.iloc[-1]
        prev = df.iloc[-2]
        logger.info(f"短期均线: {latest['sma_short']:.2f}")
        logger.info(f"长期均线: {latest['sma_long']:.2f}")
        logger.info(f"当前价格: {latest['close']:.2f}")

        crossed_up = (latest['sma_short'] > latest['sma_long']
                      and prev['sma_short'] <= prev['sma_long'])
        crossed_down = (latest['sma_short'] < latest['sma_long']
                        and prev['sma_short'] >= prev['sma_long'])
        if crossed_up:
            logger.info("信号: 买入")
            self._submit_market_order('buy')
        elif crossed_down:
            logger.info("信号: 卖出")
            self._submit_market_order('sell')
        else:
            logger.info("信号: 持仓观望")

    def rsi_strategy(self, period: int = 14, oversold: int = 30, overbought: int = 70) -> None:
        """Run one evaluation of the RSI threshold strategy.

        Fetches candles via ``get_kline_data(bar='5m', ...)`` and looks at the
        most recent RSI value: below ``oversold`` places a buy order, above
        ``overbought`` places a sell order, otherwise only a hold is logged.

        Args:
            period: RSI window; must be positive.
            oversold: Buy when the latest RSI is strictly below this value.
            overbought: Sell when the latest RSI is strictly above this value.
        """
        if period <= 0:
            # Same style of up-front validation as grid_trading_strategy.
            logger.error("RSI周期必须大于0")
            return
        logger.info("=== 执行RSI策略 ===")
        try:
            df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, period+2))
        except Exception as e:
            logger.error(f"获取K线数据失败: {e}")
            return
        if df is None or len(df) < period:
            logger.warning("数据不足,无法执行策略")
            return

        df['rsi'] = self.calculate_rsi(df, period)
        latest_rsi = df['rsi'].iloc[-1]
        if pd.isna(latest_rsi):
            logger.warning("最新RSI为NaN跳过本次信号判断")
            return

        logger.info(f"当前RSI: {latest_rsi:.2f}")
        if latest_rsi < oversold:
            logger.info("信号: RSI超卖买入")
            self._submit_market_order('buy')
        elif latest_rsi > overbought:
            logger.info("信号: RSI超买卖出")
            self._submit_market_order('sell')
        else:
            logger.info("信号: RSI正常区间持仓观望")

    def grid_trading_strategy(self, grid_levels: int = 5, grid_range: float = 0.02) -> None:
        """Run one evaluation of the grid trading strategy.

        Builds ``grid_levels`` price levels around the trader's current price,
        then compares the latest ``close`` from ``get_kline_data(bar='1m',
        limit=10)`` with the nearest level: more than 0.5% below it places a
        buy order, more than 0.5% above it places a sell order, otherwise only
        a hold is logged.

        Args:
            grid_levels: Number of grid levels; must be positive.
            grid_range: Fractional range used to space the levels; must be
                positive.  Adjacent levels are ``grid_range / grid_levels``
                (as a fraction of the current price) apart.
        """
        if grid_levels <= 0:
            logger.error("网格数必须大于0")
            return
        if grid_range <= 0:
            logger.error("网格范围必须大于0")
            return
        logger.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
        try:
            current_price = self.quant_trader.get_current_price()
        except Exception as e:
            logger.error(f"获取当前价格失败: {e}")
            return
        if current_price is None:
            logger.warning("当前价格获取失败")
            return

        # Level i sits (i - grid_levels//2) steps away from the current price.
        grid_prices = [
            current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
            for i in range(grid_levels)
        ]
        logger.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")

        try:
            df = self.quant_trader.get_kline_data(bar='1m', limit=10)
        except Exception as e:
            logger.error(f"获取K线数据失败: {e}")
            return
        if df is None or len(df) == 0 or 'close' not in df:
            logger.warning("K线数据无效无法执行网格策略")
            return
        latest_price = df['close'].iloc[-1]
        if pd.isna(latest_price):
            logger.warning("最新价格为NaN跳过本次信号判断")
            return

        closest_grid = min(grid_prices, key=lambda level: abs(level - latest_price))
        logger.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
        if latest_price < closest_grid * 0.995:
            logger.info("信号: 价格下跌,网格买入")
            self._submit_market_order('buy')
        elif latest_price > closest_grid * 1.005:
            logger.info("信号: 价格上涨,网格卖出")
            self._submit_market_order('sell')
        else:
            logger.info("信号: 价格在网格内,持仓观望")

    def run_strategy_loop(self,
                          strategy: str = 'sma',
                          interval: int = 60,
                          trading_config: Optional[dict] = None) -> None:
        """Run the selected strategy repeatedly until interrupted.

        Each iteration logs the current time, queries the account balance
        (failures are logged and ignored), runs one evaluation of the chosen
        strategy and sleeps ``interval`` seconds.  Any other exception raised
        during an iteration is logged and the loop continues after the same
        sleep.  Ctrl+C (``KeyboardInterrupt``) stops the loop cleanly, also
        while it is sleeping after a failed iteration.

        Args:
            strategy: ``'sma'``, ``'rsi'`` or ``'grid'``.  Any other value logs
                an error and ends the loop.
            interval: Seconds to sleep between iterations; must be positive.
            trading_config: Optional overrides.  Keys read: ``sma_short_period``
                and ``sma_long_period`` (sma); ``rsi_period``, ``rsi_oversold``
                and ``rsi_overbought`` (rsi); ``grid_levels`` and ``grid_range``
                (grid).  ``None`` means "use the defaults".
        """
        if interval <= 0:
            logger.error("循环间隔必须大于0秒")
            return
        # A None default replaces the former shared mutable `{}` default.
        config = {} if trading_config is None else trading_config
        logger.info(f"开始运行{strategy}策略,间隔{interval}")
        try:
            while True:
                try:
                    logger.info(get_current_date_time())
                    try:
                        self.quant_trader.get_account_balance()
                    except Exception as e:
                        # Balance lookup is best-effort; keep trading.
                        logger.error(f"获取账户余额失败: {e}")
                    if strategy == 'sma':
                        self.simple_moving_average_strategy(
                            config.get("sma_short_period", 5),
                            config.get("sma_long_period", 20))
                    elif strategy == 'rsi':
                        self.rsi_strategy(
                            config.get("rsi_period", 14),
                            config.get("rsi_oversold", 30),
                            config.get("rsi_overbought", 70))
                    elif strategy == 'grid':
                        self.grid_trading_strategy(
                            config.get("grid_levels", 5),
                            config.get("grid_range", 0.02))
                    else:
                        logger.error("未知策略")
                        break
                    logger.info(f"等待{interval}秒后继续...")
                except Exception as e:
                    logger.error(f"策略运行异常: {e}")
                # Single sleep for both the success and the failure path, kept
                # inside the KeyboardInterrupt guard so Ctrl+C is always clean.
                time.sleep(interval)
        except KeyboardInterrupt:
            logger.info("策略运行被用户中断")