335 lines
12 KiB
Python
335 lines
12 KiB
Python
import okx.api.account as Account
|
||
import okx.api.trade as Trade
|
||
import okx.api.market as Market
|
||
import okx.api.public as Public
|
||
import pandas as pd
|
||
import numpy as np
|
||
import time
|
||
from datetime import datetime
|
||
import json
|
||
|
||
class BitcoinQuantTrader:
|
||
def __init__(self, api_key, secret_key, passphrase, sandbox=True):
|
||
"""
|
||
初始化比特币量化交易器
|
||
|
||
Args:
|
||
api_key: OKX API Key
|
||
secret_key: OKX Secret Key
|
||
passphrase: OKX API Passphrase
|
||
sandbox: 是否使用沙盒环境(建议先用沙盒测试)
|
||
"""
|
||
self.api_key = api_key
|
||
self.secret_key = secret_key
|
||
self.passphrase = passphrase
|
||
|
||
# 初始化API客户端
|
||
flag = "0" if sandbox else "1" # 0:沙盒环境 1:实盘环境
|
||
|
||
self.account_api = Account.Account(
|
||
api_key, secret_key, passphrase,
|
||
flag=flag, debug=False
|
||
)
|
||
self.trade_api = Trade.Trade(
|
||
api_key, secret_key, passphrase,
|
||
flag=flag, debug=False
|
||
)
|
||
self.market_api = Market.Market(
|
||
api_key, secret_key, passphrase,
|
||
flag=flag, debug=False
|
||
)
|
||
self.public_api = Public.Public(
|
||
api_key, secret_key, passphrase,
|
||
flag=flag, debug=False
|
||
)
|
||
|
||
self.symbol = "BTC-USDT"
|
||
self.position_size = 0.001 # 每次交易0.001 BTC
|
||
|
||
def get_account_balance(self):
|
||
"""获取账户余额"""
|
||
try:
|
||
result = self.account_api.get_account_balance()
|
||
if result['code'] == '0':
|
||
balances = result['data']
|
||
for balance in balances:
|
||
if balance['ccy'] == 'USDT':
|
||
print(f"USDT余额: {balance['bal']}")
|
||
return float(balance['bal'])
|
||
elif balance['ccy'] == 'BTC':
|
||
print(f"BTC余额: {balance['bal']}")
|
||
return float(balance['bal'])
|
||
else:
|
||
print(f"获取余额失败: {result}")
|
||
return 0
|
||
except Exception as e:
|
||
print(f"获取余额异常: {e}")
|
||
return 0
|
||
|
||
def get_current_price(self):
|
||
"""获取当前BTC价格"""
|
||
try:
|
||
result = self.market_api.get_ticker(instId=self.symbol)
|
||
if result['code'] == '0':
|
||
price = float(result['data'][0]['last'])
|
||
print(f"当前BTC价格: ${price:,.2f}")
|
||
return price
|
||
else:
|
||
print(f"获取价格失败: {result}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"获取价格异常: {e}")
|
||
return None
|
||
|
||
def get_kline_data(self, bar='1m', limit=100):
|
||
"""获取K线数据"""
|
||
try:
|
||
result = self.market_api.get_candlesticks(
|
||
instId=self.symbol,
|
||
bar=bar,
|
||
limit=str(limit)
|
||
)
|
||
if result['code'] == '0':
|
||
# 转换为DataFrame
|
||
df = pd.DataFrame(result['data'], columns=[
|
||
'timestamp', 'open', 'high', 'low', 'close', 'volume', 'volCcy'
|
||
])
|
||
# 转换数据类型
|
||
for col in ['open', 'high', 'low', 'close', 'volume']:
|
||
df[col] = pd.to_numeric(df[col])
|
||
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms')
|
||
return df
|
||
else:
|
||
print(f"获取K线数据失败: {result}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"获取K线数据异常: {e}")
|
||
return None
|
||
|
||
def calculate_sma(self, df, period=20):
|
||
"""计算简单移动平均线"""
|
||
return df['close'].rolling(window=period).mean()
|
||
|
||
def calculate_rsi(self, df, period=14):
|
||
"""计算RSI指标"""
|
||
delta = df['close'].diff()
|
||
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
|
||
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
|
||
rs = gain / loss
|
||
rsi = 100 - (100 / (1 + rs))
|
||
return rsi
|
||
|
||
def place_market_order(self, side, size):
|
||
"""下市价单"""
|
||
try:
|
||
result = self.trade_api.place_order(
|
||
instId=self.symbol,
|
||
tdMode='cash',
|
||
side=side,
|
||
ordType='market',
|
||
sz=str(size)
|
||
)
|
||
if result['code'] == '0':
|
||
print(f"下单成功: {side} {size} BTC")
|
||
return result['data'][0]['ordId']
|
||
else:
|
||
print(f"下单失败: {result}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"下单异常: {e}")
|
||
return None
|
||
|
||
def simple_moving_average_strategy(self):
|
||
"""简单移动平均线策略"""
|
||
print("\n=== 执行移动平均线策略 ===")
|
||
|
||
# 获取K线数据
|
||
df = self.get_kline_data(bar='5m', limit=50)
|
||
if df is None or len(df) < 20:
|
||
print("数据不足,无法执行策略")
|
||
return
|
||
|
||
# 计算移动平均线
|
||
df['sma_short'] = self.calculate_sma(df, 5) # 短期均线
|
||
df['sma_long'] = self.calculate_sma(df, 20) # 长期均线
|
||
|
||
# 获取最新数据
|
||
latest = df.iloc[-1]
|
||
prev = df.iloc[-2]
|
||
|
||
print(f"短期均线: {latest['sma_short']:.2f}")
|
||
print(f"长期均线: {latest['sma_long']:.2f}")
|
||
print(f"当前价格: {latest['close']:.2f}")
|
||
|
||
# 策略逻辑:短期均线上穿长期均线买入,下穿卖出
|
||
if (latest['sma_short'] > latest['sma_long'] and
|
||
prev['sma_short'] <= prev['sma_long']):
|
||
print("信号: 买入")
|
||
self.place_market_order('buy', self.position_size)
|
||
elif (latest['sma_short'] < latest['sma_long'] and
|
||
prev['sma_short'] >= prev['sma_long']):
|
||
print("信号: 卖出")
|
||
self.place_market_order('sell', self.position_size)
|
||
else:
|
||
print("信号: 持仓观望")
|
||
|
||
def rsi_strategy(self):
|
||
"""RSI策略"""
|
||
print("\n=== 执行RSI策略 ===")
|
||
|
||
# 获取K线数据
|
||
df = self.get_kline_data(bar='5m', limit=50)
|
||
if df is None or len(df) < 30:
|
||
print("数据不足,无法执行策略")
|
||
return
|
||
|
||
# 计算RSI
|
||
df['rsi'] = self.calculate_rsi(df, 14)
|
||
|
||
# 获取最新RSI值
|
||
latest_rsi = df['rsi'].iloc[-1]
|
||
print(f"当前RSI: {latest_rsi:.2f}")
|
||
|
||
# 策略逻辑:RSI < 30 超卖买入,RSI > 70 超买卖出
|
||
if latest_rsi < 30:
|
||
print("信号: RSI超卖,买入")
|
||
self.place_market_order('buy', self.position_size)
|
||
elif latest_rsi > 70:
|
||
print("信号: RSI超买,卖出")
|
||
self.place_market_order('sell', self.position_size)
|
||
else:
|
||
print("信号: RSI正常区间,持仓观望")
|
||
|
||
def grid_trading_strategy(self, grid_levels=5, grid_range=0.02):
|
||
"""网格交易策略"""
|
||
print(f"\n=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
|
||
|
||
current_price = self.get_current_price()
|
||
if current_price is None:
|
||
return
|
||
|
||
# 计算网格价格
|
||
grid_prices = []
|
||
for i in range(grid_levels):
|
||
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
|
||
grid_prices.append(price)
|
||
|
||
print(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
|
||
|
||
# 获取K线数据判断当前价格在哪个网格
|
||
df = self.get_kline_data(bar='1m', limit=10)
|
||
if df is None:
|
||
return
|
||
|
||
latest_price = df['close'].iloc[-1]
|
||
|
||
# 找到最近的网格
|
||
closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price))
|
||
grid_index = grid_prices.index(closest_grid)
|
||
|
||
print(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
|
||
|
||
# 简单的网格策略:价格下跌到网格线买入,上涨到网格线卖出
|
||
if latest_price < closest_grid * 0.995: # 价格下跌超过0.5%
|
||
print("信号: 价格下跌,网格买入")
|
||
self.place_market_order('buy', self.position_size)
|
||
elif latest_price > closest_grid * 1.005: # 价格上涨超过0.5%
|
||
print("信号: 价格上涨,网格卖出")
|
||
self.place_market_order('sell', self.position_size)
|
||
else:
|
||
print("信号: 价格在网格内,持仓观望")
|
||
|
||
def run_strategy_loop(self, strategy='sma', interval=60):
|
||
"""运行策略循环"""
|
||
print(f"开始运行{strategy}策略,间隔{interval}秒")
|
||
|
||
while True:
|
||
try:
|
||
print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
|
||
# 检查账户余额
|
||
self.get_account_balance()
|
||
|
||
# 执行策略
|
||
if strategy == 'sma':
|
||
self.simple_moving_average_strategy()
|
||
elif strategy == 'rsi':
|
||
self.rsi_strategy()
|
||
elif strategy == 'grid':
|
||
self.grid_trading_strategy()
|
||
else:
|
||
print("未知策略")
|
||
break
|
||
|
||
print(f"等待{interval}秒后继续...")
|
||
time.sleep(interval)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n策略运行被用户中断")
|
||
break
|
||
except Exception as e:
|
||
print(f"策略运行异常: {e}")
|
||
time.sleep(interval)
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("=== 比特币量化交易系统 ===")
|
||
|
||
# 导入配置
|
||
try:
|
||
from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG
|
||
except ImportError:
|
||
print("错误:找不到config.py文件,请确保配置文件存在")
|
||
return
|
||
|
||
# 检查是否配置了API密钥
|
||
if API_KEY == "your_api_key_here":
|
||
print("请先在config.py中配置你的OKX API密钥!")
|
||
print("1. 登录OKX官网")
|
||
print("2. 进入API管理页面")
|
||
print("3. 创建API Key、Secret Key和Passphrase")
|
||
print("4. 将密钥填入config.py文件中的相应位置")
|
||
return
|
||
|
||
# 创建交易器实例
|
||
trader = BitcoinQuantTrader(
|
||
API_KEY, SECRET_KEY, PASSPHRASE,
|
||
sandbox=TRADING_CONFIG["sandbox"]
|
||
)
|
||
|
||
# 显示菜单
|
||
while True:
|
||
print("\n请选择操作:")
|
||
print("1. 查看账户余额")
|
||
print("2. 查看当前价格")
|
||
print("3. 执行移动平均线策略")
|
||
print("4. 执行RSI策略")
|
||
print("5. 执行网格交易策略")
|
||
print("6. 运行策略循环")
|
||
print("0. 退出")
|
||
|
||
choice = input("请输入选择 (0-6): ").strip()
|
||
|
||
if choice == '0':
|
||
print("退出程序")
|
||
break
|
||
elif choice == '1':
|
||
trader.get_account_balance()
|
||
elif choice == '2':
|
||
trader.get_current_price()
|
||
elif choice == '3':
|
||
trader.simple_moving_average_strategy()
|
||
elif choice == '4':
|
||
trader.rsi_strategy()
|
||
elif choice == '5':
|
||
trader.grid_trading_strategy()
|
||
elif choice == '6':
|
||
strategy = input("选择策略 (sma/rsi/grid): ").strip()
|
||
interval = int(input("设置间隔秒数 (默认60): ") or "60")
|
||
trader.run_strategy_loop(strategy, interval)
|
||
else:
|
||
print("无效选择,请重新输入")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|