support both of Perpetual Swap and Swap Trading

This commit is contained in:
blade 2025-07-22 17:59:18 +08:00
parent b1c3af6489
commit ebf90534ae
7 changed files with 649 additions and 354 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/core/__pycache__
/__pycache__/*.pyc

Binary file not shown.

89
biz_main.py Normal file
View File

@ -0,0 +1,89 @@
import logging
from time import sleep
from core.base import QuantTrader
from core.strategy import QuantStrategy
from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
class BizMain:
def __init__(self):
api_key = API_KEY
secret_key = SECRET_KEY
passphrase = PASSPHRASE
sandbox = TRADING_CONFIG.get("sandbox", True)
symbol = TRADING_CONFIG.get("symbol", "BTC-USDT")
position_size = TRADING_CONFIG.get("position_size", 0.001)
self.trader = QuantTrader(api_key, secret_key, passphrase, sandbox, symbol, position_size)
# self.strategy = QuantStrategy(api_key, secret_key, passphrase, sandbox, symbol, position_size)
def start_job(self):
"""
1. 合约开空单流程
1.1. 获取当前价格
1.2 设置tdmode为cross张数为1每张货币单位为0.01杠杆为1缓冲比例为0.3
1.2. 计算保证金
1.4. 开空单
2. 现货卖出虚拟货币流程
2.1 获取当前虚拟货币数量
2.2 卖出0.01单位虚拟货币
3. 合约平空单流程
3.1 设置tdmode为cross张数为1
3.2 平空单
"""
td_mode = "cross"
quantity = 1
try:
# 1. 合约开空单流程
logging.info("[1] 合约开空单流程:")
# price = self.trader.get_current_price(self.trader.symbol_swap)
# logging.info(f"当前合约价格: {price}")
slot = 0.01
leverage = 1
buffer_ratio = 0.3
# margin, entry_price = self.trader.calculate_margin(quantity, leverage, slot, buffer_ratio)
# logging.info(f"所需保证金: {margin}, 开仓价格: {entry_price}")
order_id, entry_price = self.trader.place_short_order(td_mode, quantity, leverage, slot, buffer_ratio)
if order_id:
logging.info(f"开空单成功订单ID: {order_id}, 开仓价格: {entry_price}")
else:
logging.error("开空单失败")
except Exception as e:
logging.error(f"合约开空单流程异常: {e}")
sleep(1)
try:
# 2. 现货卖出比特币流程
logging.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:")
balance = self.trader.get_account_balance()
btc_balance = balance.get(self.trader.symbol_prefix, 0)
logging.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}")
sell_amount = 0.01
if btc_balance >= sell_amount:
order_id = self.trader.place_market_order('sell', sell_amount)
if order_id:
logging.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功订单ID: {order_id}")
else:
logging.error(f"现货卖出{self.trader.symbol_prefix}失败")
else:
logging.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}")
except Exception as e:
logging.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}")
sleep(1)
try:
# 3. 合约平空单流程
logging.info("[3] 合约平空单流程:")
result = self.trader.close_short_order(td_mode, quantity)
if result:
logging.info("平空单成功")
else:
logging.error("平空单失败")
except Exception as e:
logging.error(f"合约平空单流程异常: {e}")
if __name__ == "__main__":
biz_main = BizMain()
biz_main.start_job()

View File

@ -18,6 +18,7 @@ TRADING_CONFIG = {
# 策略参数 # 策略参数
"sma_short_period": 5, # 短期移动平均线周期 "sma_short_period": 5, # 短期移动平均线周期
"sma_long_period": 20, # 长期移动平均线周期 "sma_long_period": 20, # 长期移动平均线周期
"rsi_period": 14, # RSI计算周期 "rsi_period": 14, # RSI计算周期
"rsi_oversold": 30, # RSI超卖阈值 "rsi_oversold": 30, # RSI超卖阈值
"rsi_overbought": 70, # RSI超买阈值 "rsi_overbought": 70, # RSI超买阈值
@ -34,7 +35,7 @@ TRADING_CONFIG = {
# 时间间隔配置 # 时间间隔配置
TIME_CONFIG = { TIME_CONFIG = {
"strategy_interval": 60, # 策略执行间隔(秒) "strategy_interval": 30, # 策略执行间隔(秒)
"kline_interval": "5m", # K线数据间隔 "kline_interval": "5m", # K线数据间隔
"kline_limit": 100, # K线数据条数 "kline_limit": 100, # K线数据条数
} }

296
core/base.py Normal file
View File

@ -0,0 +1,296 @@
import okx.Account as Account
import okx.Trade as Trade
import okx.MarketData as Market
import okx.PublicData as Public
import pandas as pd
import logging
from typing import Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
class QuantTrader:
def __init__(self,
api_key: str,
secret_key: str,
passphrase: str,
sandbox: bool = True,
symbol: str = "BTC-USDT",
position_size: float = 0.001):
"""
初始化货币量化交易器
Args:
api_key: OKX API Key
secret_key: OKX Secret Key
passphrase: OKX API Passphrase
sandbox: 是否使用沙盒环境建议先用沙盒测试
"""
self.api_key = api_key
self.secret_key = secret_key
self.passphrase = passphrase
flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境
self.account_api = Account.AccountAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.trade_api = Trade.TradeAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.market_api = Market.MarketAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.public_api = Public.PublicAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.symbol = symbol
self.symbol_swap = f'{symbol}-SWAP'
self.symbol_prefix = symbol.split('-')[0]
self.position_size = position_size
def get_account_balance(self) -> float:
"""获取账户余额"""
try:
result = {}
search_result = self.account_api.get_account_balance()
if search_result.get('code') == '0':
balances = search_result.get('data', [])
for balance in balances:
details = balance.get('details', [])
for detail in details:
if detail.get('ccy') == 'USDT':
logging.info(f"USDT余额: {detail.get('availBal')}")
result['USDT'] = float(detail.get('availBal', 0))
if detail.get('ccy') == self.symbol_prefix:
logging.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}")
result[self.symbol_prefix] = float(detail.get('availBal', 0))
if detail.get('ccy') == self.symbol_swap:
logging.info(f"{self.symbol_swap}余额: {detail.get('availBal')}")
result[self.symbol_swap] = float(detail.get('availBal', 0))
return result
else:
logging.error(f"获取余额失败: {search_result}")
return {}
except Exception as e:
logging.error(f"获取余额异常: {e}")
return {}
def get_current_price(self, symbol: str = None) -> Optional[float]:
"""获取当前货币价格"""
try:
if symbol is None:
symbol = self.symbol
symbol_prefix = self.symbol_prefix
else:
symbol_prefix = symbol.split('-')[0]
result = self.market_api.get_ticker(instId=symbol)
if result.get('code') == '0':
data = result.get('data', [])
if data and 'last' in data[0]:
price = float(data[0]['last'])
logging.info(f"当前{symbol_prefix}价格: ${price:,.2f}")
return price
else:
logging.error(f"ticker数据格式异常: {data}")
return None
else:
logging.error(f"获取价格失败: {result}")
return None
except Exception as e:
logging.error(f"获取价格异常: {e}")
return None
def get_kline_data(self, bar: str = '1m', limit: int = 100) -> Optional[pd.DataFrame]:
"""获取K线数据"""
try:
result = self.market_api.get_candlesticks(
instId=self.symbol,
bar=bar,
limit=str(limit)
)
if result.get('code') == '0':
data = result.get('data', [])
if not data:
logging.warning("K线数据为空")
return None
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close',
'volume', 'volCcy', "volCCyQuote", "confirm"
])
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = pd.to_numeric(df[col], errors='coerce')
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms', errors='coerce')
return df
else:
logging.error(f"获取K线数据失败: {result}")
return None
except Exception as e:
logging.error(f"获取K线数据异常: {e}")
return None
def place_market_order(self, side: str, size: float) -> Optional[str]:
"""下市价单"""
balance = self.get_account_balance()
usdt_balance = balance.get('USDT')
symbol_balance = balance.get(self.symbol_prefix)
if side == 'sell':
try:
if symbol_balance < size:
logging.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}")
return None
result = self.trade_api.place_order(
instId=self.symbol,
tdMode='cash',
side=side,
ordType='market',
sz=str(size)
)
if result.get('code') == '0':
logging.info(f"下单成功: {side} {size} {self.symbol_prefix}")
return result['data'][0]['ordId']
else:
logging.error(f"下单失败: {result}")
return None
except Exception as e:
logging.error(f"下单异常: {e}")
return None
elif side == 'buy':
try:
instrument_result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)
instrument_data = instrument_result.get("data", [])
if not instrument_data:
logging.error(f"未获取到合约信息: {instrument_result}")
return None
min_sz = float(instrument_data[0].get("minSz", 0))
if size < min_sz:
size = min_sz
ticker = self.market_api.get_ticker(instId=self.symbol)
last_price = float(ticker["data"][0]["last"])
usdt_amount = float(last_price * size)
if usdt_balance < usdt_amount:
logging.error(f"USDT余额不足目前余额: {usdt_balance}")
return None
result = self.trade_api.place_order(
instId=self.symbol,
tdMode="cash",
side=side,
ordType="market",
sz=str(usdt_amount)
)
if result.get('code') == '0':
logging.info(f"下单成功: {side} {usdt_amount} USDT")
return result['data'][0]['ordId']
else:
logging.error(f"下单失败: {result}")
return None
except Exception as e:
logging.error(f"下单异常: {e}")
return None
else:
logging.error(f"不支持的下单方向: {side}")
return None
# 设置杠杆倍数
def set_leverage(self, leverage: int = 1, mgn_mode: str = "cross", ccy: str = "USDT", posSide: str = "short"):
result = self.account_api.set_leverage(
lever=str(leverage),
mgnMode=mgn_mode,
instId=self.symbol_swap,
ccy=ccy,
posSide=posSide
)
if result["code"] == "0":
logging.info(f"设置杠杆倍数 {leverage}x 成功")
else:
logging.error(f"设置杠杆失败: {result['msg']}")
return result["code"] == "0"
# 计算保证金需求
def calculate_margin(self, quantity: int = 10, leverage: int = 1, slot: float = 0.01, buffer_ratio: float = 0.3):
price = self.get_current_price(self.symbol_swap)
if not price:
return None
contract_value = quantity * slot * price # 每张 0.01 BTC
initial_margin = contract_value / leverage
recommended_margin = initial_margin * (1 + buffer_ratio)
logging.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT")
logging.info(f"合约总价值: {contract_value:.2f} USDT")
logging.info(f"初始保证金: {initial_margin:.2f} USDT")
logging.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT")
return recommended_margin, price
# 开空头仓位(卖出空单)
def place_short_order(self, td_mode: str = "cross", quantity: int = 10, leverage: int = 1, slot: float = 0.01, buffer_ratio: float = 0.3):
"""开空头仓位(卖出空单)"""
# 计算所需保证金和开仓价格
margin_data = self.calculate_margin(quantity, leverage, slot, buffer_ratio)
if not margin_data:
logging.error("无法计算保证金,终止下单")
return None, None
required_margin, entry_price = margin_data
# 检查余额
balance = self.get_account_balance()
avail_bal = balance.get('USDT')
if avail_bal is None or avail_bal < required_margin:
logging.error(f"保证金不足,需至少 {required_margin:.2f} USDT当前余额: {avail_bal}")
return None, None
# 设置杠杆
if not self.set_leverage(leverage, mgn_mode=td_mode, ccy="USDT", posSide="short"):
return None, None
# 下单
order_data = {
"instId": self.symbol_swap,
"tdMode": td_mode,
"ccy": "USDT",
"side": "sell",
"posSide": "short",
"ordType": "market",
"sz": str(quantity),
}
result = self.trade_api.place_order(**order_data)
if result.get("code") == "0":
logging.info(f"开空单成功订单ID: {result['data'][0]['ordId']}")
return result["data"][0]["ordId"], entry_price
else:
logging.error(f"开空单失败: {result.get('msg', result)}")
return None, None
# 平空单(买入平仓)
def close_short_order(self, td_mode: str = "cross", quantity: float = 10) -> bool:
"""平空单(买入平仓)"""
order_data = {
"instId": self.symbol_swap,
"tdMode": td_mode,
"ccy": "USDT",
"side": "buy",
"posSide": "short",
"ordType": "market",
"sz": str(quantity),
}
result = self.trade_api.place_order(**order_data)
if result.get("code") == "0":
logging.info(f"平空单成功订单ID: {result['data'][0]['ordId']}")
return True
else:
logging.error(f"平空单失败: {result.get('msg', result)}")
return False
def get_minimun_order_size(self) -> None:
"""获取最小订单数量"""
try:
result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)
if result.get("code") == "0":
instrument = result.get("data", [{}])[0]
min_sz = float(instrument.get("minSz", 0))
lot_sz = float(instrument.get("lotSz", 0))
logging.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}")
logging.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}")
else:
logging.error(f"错误: {result.get('msg', result)}")
except Exception as e:
logging.error(f"异常: {str(e)}")

209
core/strategy.py Normal file
View File

@ -0,0 +1,209 @@
import time
from datetime import datetime
import logging
from typing import Optional
import pandas as pd
from core.base import QuantTrader
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
class QuantStrategy:
def __init__(self,
api_key: str,
secret_key: str,
passphrase: str,
sandbox: bool = True,
symbol: str = "BTC-USDT",
position_size: float = 0.001):
"""
初始化量化策略
"""
self.quant_trader = QuantTrader(
api_key,
secret_key,
passphrase,
sandbox,
symbol,
position_size)
def calculate_sma(self, df: pd.DataFrame, period: int = 20) -> pd.Series:
"""
计算简单移动平均线
"""
if 'close' not in df:
logging.error("DataFrame缺少'close'无法计算SMA")
return pd.Series([float('nan')] * len(df))
if len(df) < period:
logging.warning(f"数据长度不足{period}SMA结果将包含NaN")
return df['close'].rolling(window=period).mean()
def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
"""
计算RSI指标
"""
if 'close' not in df:
logging.error("DataFrame缺少'close'无法计算RSI")
return pd.Series([float('nan')] * len(df))
if len(df) < period:
logging.warning(f"数据长度不足{period}RSI结果将包含NaN")
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
loss = loss.replace(0, 1e-10) # 防止除零
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def simple_moving_average_strategy(self, sma_short_period: int = 5, sma_long_period: int = 20) -> None:
"""
简单移动平均线策略
"""
logging.info("=== 执行移动平均线策略 ===")
try:
df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, sma_long_period+2))
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) < max(sma_short_period, sma_long_period, 2):
logging.warning("数据不足,无法执行策略")
return
df['sma_short'] = self.calculate_sma(df, sma_short_period)
df['sma_long'] = self.calculate_sma(df, sma_long_period)
if len(df) < 2:
logging.warning("数据不足2条无法判断金叉死叉")
return
latest = df.iloc[-1]
prev = df.iloc[-2]
if pd.isna(latest['sma_short']) or pd.isna(latest['sma_long']) or pd.isna(prev['sma_short']) or pd.isna(prev['sma_long']):
logging.warning("均线数据存在NaN跳过本次信号判断")
return
logging.info(f"短期均线: {latest['sma_short']:.2f}")
logging.info(f"长期均线: {latest['sma_long']:.2f}")
logging.info(f"当前价格: {latest['close']:.2f}")
if (latest['sma_short'] > latest['sma_long'] and prev['sma_short'] <= prev['sma_long']):
logging.info("信号: 买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif (latest['sma_short'] < latest['sma_long'] and prev['sma_short'] >= prev['sma_long']):
logging.info("信号: 卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: 持仓观望")
def rsi_strategy(self, period: int = 14, oversold: int = 30, overbought: int = 70) -> None:
"""
RSI策略
"""
logging.info("=== 执行RSI策略 ===")
try:
df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, period+2))
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) < period:
logging.warning("数据不足,无法执行策略")
return
df['rsi'] = self.calculate_rsi(df, period)
latest_rsi = df['rsi'].iloc[-1]
if pd.isna(latest_rsi):
logging.warning("最新RSI为NaN跳过本次信号判断")
return
logging.info(f"当前RSI: {latest_rsi:.2f}")
if latest_rsi < oversold:
logging.info("信号: RSI超卖买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif latest_rsi > overbought:
logging.info("信号: RSI超买卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: RSI正常区间持仓观望")
def grid_trading_strategy(self, grid_levels: int = 5, grid_range: float = 0.02) -> None:
"""
网格交易策略
"""
if grid_levels <= 0:
logging.error("网格数必须大于0")
return
if grid_range <= 0:
logging.error("网格范围必须大于0")
return
logging.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
try:
current_price = self.quant_trader.get_current_price()
except Exception as e:
logging.error(f"获取当前价格失败: {e}")
return
if current_price is None:
logging.warning("当前价格获取失败")
return
grid_prices = []
for i in range(grid_levels):
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
grid_prices.append(price)
logging.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
try:
df = self.quant_trader.get_kline_data(bar='1m', limit=10)
except Exception as e:
logging.error(f"获取K线数据失败: {e}")
return
if df is None or len(df) == 0 or 'close' not in df:
logging.warning("K线数据无效无法执行网格策略")
return
latest_price = df['close'].iloc[-1]
if pd.isna(latest_price):
logging.warning("最新价格为NaN跳过本次信号判断")
return
closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price))
logging.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
if latest_price < closest_grid * 0.995:
logging.info("信号: 价格下跌,网格买入")
self.quant_trader.place_market_order('buy', self.quant_trader.position_size)
elif latest_price > closest_grid * 1.005:
logging.info("信号: 价格上涨,网格卖出")
self.quant_trader.place_market_order('sell', self.quant_trader.position_size)
else:
logging.info("信号: 价格在网格内,持仓观望")
def run_strategy_loop(self,
strategy: str = 'sma',
interval: int = 60,
trading_config: dict = {}) -> None:
"""
运行策略循环
"""
if interval <= 0:
logging.error("循环间隔必须大于0秒")
return
logging.info(f"开始运行{strategy}策略,间隔{interval}")
while True:
try:
logging.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
try:
self.quant_trader.get_account_balance()
except Exception as e:
logging.error(f"获取账户余额失败: {e}")
if strategy == 'sma':
sma_short_period = trading_config.get("sma_short_period", 5)
sma_long_period = trading_config.get("sma_long_period", 20)
self.simple_moving_average_strategy(sma_short_period, sma_long_period)
elif strategy == 'rsi':
period = trading_config.get("rsi_period", 14)
oversold = trading_config.get("rsi_oversold", 30)
overbought = trading_config.get("rsi_overbought", 70)
self.rsi_strategy(period, oversold, overbought)
elif strategy == 'grid':
grid_levels = trading_config.get("grid_levels", 5)
grid_range = trading_config.get("grid_range", 0.02)
self.grid_trading_strategy(grid_levels, grid_range)
else:
logging.error("未知策略")
break
logging.info(f"等待{interval}秒后继续...")
time.sleep(interval)
except KeyboardInterrupt:
logging.info("策略运行被用户中断")
break
except Exception as e:
logging.error(f"策略运行异常: {e}")
time.sleep(interval)

404
play.py
View File

@ -1,400 +1,98 @@
import okx.Account as Account import logging
import okx.Trade as Trade from core.base import QuantTrader
import okx.MarketData as Market from core.strategy import QuantStrategy
import okx.PublicData as Public
import pandas as pd
import numpy as np
import time
from datetime import datetime
import json
class BitcoinQuantTrader: logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
def __init__(self, api_key, secret_key, passphrase, sandbox=True):
"""
初始化比特币量化交易器
Args:
api_key: OKX API Key
secret_key: OKX Secret Key
passphrase: OKX API Passphrase
sandbox: 是否使用沙盒环境建议先用沙盒测试
"""
self.api_key = api_key
self.secret_key = secret_key
self.passphrase = passphrase
# 初始化API客户端
flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境
self.account_api = Account.AccountAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.trade_api = Trade.TradeAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.market_api = Market.MarketAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.public_api = Public.PublicAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.symbol = "BTC-USDT"
self.position_size = 0.001 # 每次交易0.001 BTC
def get_account_balance(self):
"""获取账户余额"""
try:
result = self.account_api.get_account_balance()
if result['code'] == '0':
balances = result['data']
for balance in balances:
details = balance['details']
for detail in details:
if detail['ccy'] == 'USDT':
print(f"USDT余额: {detail['availBal']}")
return float(detail['availBal'])
elif detail['ccy'] == 'BTC':
print(f"BTC余额: {detail['availBal']}")
# return float(detail['availBal'])
else:
print(f"获取余额失败: {result}")
return 0
except Exception as e:
print(f"获取余额异常: {e}")
return 0
def get_current_price(self):
"""获取当前BTC价格"""
try:
result = self.market_api.get_ticker(instId=self.symbol)
if result['code'] == '0':
price = float(result['data'][0]['last'])
print(f"当前BTC价格: ${price:,.2f}")
return price
else:
print(f"获取价格失败: {result}")
return None
except Exception as e:
print(f"获取价格异常: {e}")
return None
def get_kline_data(self, bar='1m', limit=100):
"""获取K线数据"""
try:
result = self.market_api.get_candlesticks(
instId=self.symbol,
bar=bar,
limit=str(limit)
)
if result['code'] == '0':
# 转换为DataFrame
df = pd.DataFrame(result['data'], columns=[
'timestamp', 'open', 'high', 'low', 'close',
'volume', 'volCcy', "volCCyQuote", "confirm"
])
# 转换数据类型
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = pd.to_numeric(df[col])
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms')
return df
else:
print(f"获取K线数据失败: {result}")
return None
except Exception as e:
print(f"获取K线数据异常: {e}")
return None
def calculate_sma(self, df, period=20):
"""计算简单移动平均线"""
return df['close'].rolling(window=period).mean()
def calculate_rsi(self, df, period=14):
"""计算RSI指标"""
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def place_market_order(self, side, size):
"""下市价单"""
if side == 'sell':
try:
result = self.trade_api.place_order(
instId=self.symbol,
tdMode='cash',
side=side,
ordType='market',
sz=str(size)
)
if result['code'] == '0':
print(f"下单成功: {side} {size} BTC")
return result['data'][0]['ordId']
else:
print(f"下单失败: {result}")
return None
except Exception as e:
print(f"下单异常: {e}")
return None
elif side == 'buy':
instrument = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)["data"][0]
min_sz = float(instrument["minSz"]) # 最小交易量
if size < min_sz:
size = min_sz
ticker = self.market_api.get_ticker(instId=self.symbol)
last_price = float(ticker["data"][0]["last"]) # 最新价格
# 买入数量是USDT将BTC转换为USDT def main() -> None:
usdt_amount = float(last_price * size)
try:
result = self.trade_api.place_order(
instId=self.symbol,
tdMode="cash",
side=side,
ordType="market",
sz=str(usdt_amount)
)
print("下单结果:", result)
except Exception as e:
print("错误:", str(e))
def get_minimun_order_size(self):
"""获取最小订单数量"""
try:
result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)
if result["code"] == "0":
instrument = result["data"][0]
min_sz = float(instrument["minSz"]) # 最小交易量BTC
lot_sz = float(instrument["lotSz"]) # 交易量精度
print(f"最小交易量 (minSz): {min_sz} BTC")
print(f"交易量精度 (lotSz): {lot_sz} BTC")
else:
print(f"错误: {result['msg']}")
except Exception as e:
print(f"异常: {str(e)}")
def simple_moving_average_strategy(self):
"""简单移动平均线策略"""
print("\n=== 执行移动平均线策略 ===")
# 获取K线数据
df = self.get_kline_data(bar='5m', limit=50)
if df is None or len(df) < 20:
print("数据不足,无法执行策略")
return
# 计算移动平均线
df['sma_short'] = self.calculate_sma(df, 5) # 短期均线
df['sma_long'] = self.calculate_sma(df, 20) # 长期均线
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
print(f"短期均线: {latest['sma_short']:.2f}")
print(f"长期均线: {latest['sma_long']:.2f}")
print(f"当前价格: {latest['close']:.2f}")
# 策略逻辑:短期均线上穿长期均线买入,下穿卖出
if (latest['sma_short'] > latest['sma_long'] and
prev['sma_short'] <= prev['sma_long']):
print("信号: 买入")
self.place_market_order('buy', self.position_size)
elif (latest['sma_short'] < latest['sma_long'] and
prev['sma_short'] >= prev['sma_long']):
print("信号: 卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: 持仓观望")
def rsi_strategy(self):
"""RSI策略"""
print("\n=== 执行RSI策略 ===")
# 获取K线数据
df = self.get_kline_data(bar='5m', limit=50)
if df is None or len(df) < 30:
print("数据不足,无法执行策略")
return
# 计算RSI
df['rsi'] = self.calculate_rsi(df, 14)
# 获取最新RSI值
latest_rsi = df['rsi'].iloc[-1]
print(f"当前RSI: {latest_rsi:.2f}")
# 策略逻辑RSI < 30 超卖买入RSI > 70 超买卖出
if latest_rsi < 30:
print("信号: RSI超卖买入")
self.place_market_order('buy', self.position_size)
elif latest_rsi > 70:
print("信号: RSI超买卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: RSI正常区间持仓观望")
def grid_trading_strategy(self, grid_levels=5, grid_range=0.02):
"""网格交易策略"""
print(f"\n=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
current_price = self.get_current_price()
if current_price is None:
return
# 计算网格价格
grid_prices = []
for i in range(grid_levels):
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
grid_prices.append(price)
print(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
# 获取K线数据判断当前价格在哪个网格
df = self.get_kline_data(bar='1m', limit=10)
if df is None:
return
latest_price = df['close'].iloc[-1]
# 找到最近的网格
closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price))
grid_index = grid_prices.index(closest_grid)
print(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
# 简单的网格策略:价格下跌到网格线买入,上涨到网格线卖出
if latest_price < closest_grid * 0.995: # 价格下跌超过0.5%
print("信号: 价格下跌,网格买入")
self.place_market_order('buy', self.position_size)
elif latest_price > closest_grid * 1.005: # 价格上涨超过0.5%
print("信号: 价格上涨,网格卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: 价格在网格内,持仓观望")
def run_strategy_loop(self, strategy='sma', interval=60):
"""运行策略循环"""
print(f"开始运行{strategy}策略,间隔{interval}")
while True:
try:
print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 检查账户余额
self.get_account_balance()
# 执行策略
if strategy == 'sma':
self.simple_moving_average_strategy()
elif strategy == 'rsi':
self.rsi_strategy()
elif strategy == 'grid':
self.grid_trading_strategy()
else:
print("未知策略")
break
print(f"等待{interval}秒后继续...")
time.sleep(interval)
except KeyboardInterrupt:
print("\n策略运行被用户中断")
break
except Exception as e:
print(f"策略运行异常: {e}")
time.sleep(interval)
def main():
"""主函数""" """主函数"""
print("=== 比特币量化交易系统 ===") logging.info("=== 比特币量化交易系统 ===")
# 导入配置 # 导入配置
try: try:
from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG
except ImportError: except ImportError:
print("错误:找不到config.py文件请确保配置文件存在") logging.error("找不到config.py文件请确保配置文件存在")
return return
# 检查是否配置了API密钥 # 检查是否配置了API密钥
if API_KEY == "your_api_key_here": if API_KEY == "your_api_key_here":
print("请先在config.py中配置你的OKX API密钥") logging.error("请先在config.py中配置你的OKX API密钥\n1. 登录OKX官网\n2. 进入API管理页面\n3. 创建API Key、Secret Key和Passphrase\n4. 将密钥填入config.py文件中的相应位置")
print("1. 登录OKX官网")
print("2. 进入API管理页面")
print("3. 创建API Key、Secret Key和Passphrase")
print("4. 将密钥填入config.py文件中的相应位置")
return return
# 创建交易器实例 # 创建交易器实例
trader = BitcoinQuantTrader( sandbox = TRADING_CONFIG.get("sandbox", True)
symbol = TRADING_CONFIG.get("symbol", "BTC-USDT")
position_size = TRADING_CONFIG.get("position_size", 0.001)
trader = QuantTrader(
API_KEY, SECRET_KEY, PASSPHRASE, API_KEY, SECRET_KEY, PASSPHRASE,
sandbox=TRADING_CONFIG["sandbox"] sandbox=sandbox,
symbol=symbol,
position_size=position_size
)
strategy = QuantStrategy(
API_KEY, SECRET_KEY, PASSPHRASE,
sandbox=sandbox,
symbol=symbol,
position_size=position_size
) )
# 显示菜单 # 显示菜单
while True: while True:
print("\n请选择操作:") logging.info("\n请选择操作:\n1. 查看账户余额\n2. 查看当前价格\n3. 执行移动平均线策略\n4. 执行RSI策略\n5. 执行网格交易策略\n6. 运行策略循环\n7. 买入测试\n8. 卖出测试\n9. 获取最小交易量\n0. 退出")
print("1. 查看账户余额")
print("2. 查看当前价格")
print("3. 执行移动平均线策略")
print("4. 执行RSI策略")
print("5. 执行网格交易策略")
print("6. 运行策略循环")
print("7. 买入测试")
print("8. 卖出测试")
print("9. 获取最小交易量")
print("0. 退出")
choice = input("请输入选择 (0-9): ").strip() choice = input("请输入选择 (0-9): ").strip()
if choice == '0': if choice == '0':
print("退出程序") logging.info("退出程序")
break break
elif choice == '1': elif choice == '1':
trader.get_account_balance() trader.get_account_balance()
elif choice == '2': elif choice == '2':
trader.get_current_price() trader.get_current_price()
elif choice == '3': elif choice == '3':
trader.simple_moving_average_strategy() sma_short_period = TRADING_CONFIG.get("sma_short_period", 5)
sma_long_period = TRADING_CONFIG.get("sma_long_period", 20)
strategy.simple_moving_average_strategy(sma_short_period, sma_long_period)
elif choice == '4': elif choice == '4':
trader.rsi_strategy() period = TRADING_CONFIG.get("rsi_period", 14)
rsi_oversold = TRADING_CONFIG.get("rsi_oversold", 30)
rsi_overbought = TRADING_CONFIG.get("rsi_overbought", 70)
strategy.rsi_strategy(period, rsi_oversold, rsi_overbought)
elif choice == '5': elif choice == '5':
trader.grid_trading_strategy() grid_levels = TRADING_CONFIG.get("grid_levels", 5)
grid_range = TRADING_CONFIG.get("grid_range", 0.02)
strategy.grid_trading_strategy(grid_levels, grid_range)
elif choice == '6': elif choice == '6':
strategy = input("选择策略 (sma/rsi/grid): ").strip() strategy_name = input("选择策略 (sma/rsi/grid): ").strip()
interval = int(input("设置间隔秒数 (默认60): ") or "60") interval = TIME_CONFIG.get("strategy_interval", 30)
trader.run_strategy_loop(strategy, interval) strategy.run_strategy_loop(strategy_name, interval, TRADING_CONFIG)
elif choice == '7': elif choice == '7':
position_size = 0.01 defalt_position_size = 0.01
input_size = input("请输入买入数量: ") input_size = input("请输入买入数量: ")
if input_size: if input_size:
try: try:
position_size = float(input_size) position_size = float(input_size)
print(f"买入{position_size}BTC") logging.info(f"买入{position_size}BTC")
trader.place_market_order('buy', position_size) trader.place_market_order('buy', position_size)
except ValueError: except ValueError:
print(f"输入无效,默认买入{position_size}BTC") logging.warning(f"输入无效,默认买入{defalt_position_size}BTC")
trader.place_market_order('buy', defalt_position_size)
else:
logging.info(f"默认买入{defalt_position_size}BTC")
trader.place_market_order('buy', defalt_position_size)
elif choice == '8': elif choice == '8':
position_size = 0.01 defalt_position_size = 0.01
input_size = input("请输入卖出数量: ") input_size = input("请输入卖出数量: ")
if input_size: if input_size:
try: try:
position_size = float(input_size) position_size = float(input_size)
print(f"卖出{position_size}BTC") logging.info(f"卖出{position_size}BTC")
trader.place_market_order('sell', position_size) trader.place_market_order('sell', position_size)
except ValueError: except ValueError:
print(f"输入无效,默认卖出{position_size}BTC") logging.warning(f"输入无效,默认卖出{defalt_position_size}BTC")
trader.place_market_order('sell', defalt_position_size)
else:
logging.info(f"默认卖出{defalt_position_size}BTC")
trader.place_market_order('sell', defalt_position_size)
elif choice == '9': elif choice == '9':
trader.get_minimun_order_size() trader.get_minimun_order_size()
else: else:
print("无效选择,请重新输入") logging.warning("无效选择,请重新输入")
if __name__ == "__main__": if __name__ == "__main__":
main() main()