crypto_quant/play.py

401 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import okx.Account as Account
import okx.Trade as Trade
import okx.MarketData as Market
import okx.PublicData as Public
import pandas as pd
import numpy as np
import time
from datetime import datetime
import json
class BitcoinQuantTrader:
def __init__(self, api_key, secret_key, passphrase, sandbox=True):
"""
初始化比特币量化交易器
Args:
api_key: OKX API Key
secret_key: OKX Secret Key
passphrase: OKX API Passphrase
sandbox: 是否使用沙盒环境(建议先用沙盒测试)
"""
self.api_key = api_key
self.secret_key = secret_key
self.passphrase = passphrase
# 初始化API客户端
flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境
self.account_api = Account.AccountAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.trade_api = Trade.TradeAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.market_api = Market.MarketAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.public_api = Public.PublicAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=flag
)
self.symbol = "BTC-USDT"
self.position_size = 0.001 # 每次交易0.001 BTC
def get_account_balance(self):
"""获取账户余额"""
try:
result = self.account_api.get_account_balance()
if result['code'] == '0':
balances = result['data']
for balance in balances:
details = balance['details']
for detail in details:
if detail['ccy'] == 'USDT':
print(f"USDT余额: {detail['availBal']}")
return float(detail['availBal'])
elif detail['ccy'] == 'BTC':
print(f"BTC余额: {detail['availBal']}")
# return float(detail['availBal'])
else:
print(f"获取余额失败: {result}")
return 0
except Exception as e:
print(f"获取余额异常: {e}")
return 0
def get_current_price(self):
"""获取当前BTC价格"""
try:
result = self.market_api.get_ticker(instId=self.symbol)
if result['code'] == '0':
price = float(result['data'][0]['last'])
print(f"当前BTC价格: ${price:,.2f}")
return price
else:
print(f"获取价格失败: {result}")
return None
except Exception as e:
print(f"获取价格异常: {e}")
return None
def get_kline_data(self, bar='1m', limit=100):
"""获取K线数据"""
try:
result = self.market_api.get_candlesticks(
instId=self.symbol,
bar=bar,
limit=str(limit)
)
if result['code'] == '0':
# 转换为DataFrame
df = pd.DataFrame(result['data'], columns=[
'timestamp', 'open', 'high', 'low', 'close',
'volume', 'volCcy', "volCCyQuote", "confirm"
])
# 转换数据类型
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = pd.to_numeric(df[col])
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms')
return df
else:
print(f"获取K线数据失败: {result}")
return None
except Exception as e:
print(f"获取K线数据异常: {e}")
return None
def calculate_sma(self, df, period=20):
"""计算简单移动平均线"""
return df['close'].rolling(window=period).mean()
def calculate_rsi(self, df, period=14):
"""计算RSI指标"""
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def place_market_order(self, side, size):
"""下市价单"""
if side == 'sell':
try:
result = self.trade_api.place_order(
instId=self.symbol,
tdMode='cash',
side=side,
ordType='market',
sz=str(size)
)
if result['code'] == '0':
print(f"下单成功: {side} {size} BTC")
return result['data'][0]['ordId']
else:
print(f"下单失败: {result}")
return None
except Exception as e:
print(f"下单异常: {e}")
return None
elif side == 'buy':
instrument = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)["data"][0]
min_sz = float(instrument["minSz"]) # 最小交易量
if size < min_sz:
size = min_sz
ticker = self.market_api.get_ticker(instId=self.symbol)
last_price = float(ticker["data"][0]["last"]) # 最新价格
# 买入数量是USDT将BTC转换为USDT
usdt_amount = float(last_price * size)
try:
result = self.trade_api.place_order(
instId=self.symbol,
tdMode="cash",
side=side,
ordType="market",
sz=str(usdt_amount)
)
print("下单结果:", result)
except Exception as e:
print("错误:", str(e))
def get_minimun_order_size(self):
"""获取最小订单数量"""
try:
result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)
if result["code"] == "0":
instrument = result["data"][0]
min_sz = float(instrument["minSz"]) # 最小交易量BTC
lot_sz = float(instrument["lotSz"]) # 交易量精度
print(f"最小交易量 (minSz): {min_sz} BTC")
print(f"交易量精度 (lotSz): {lot_sz} BTC")
else:
print(f"错误: {result['msg']}")
except Exception as e:
print(f"异常: {str(e)}")
def simple_moving_average_strategy(self):
"""简单移动平均线策略"""
print("\n=== 执行移动平均线策略 ===")
# 获取K线数据
df = self.get_kline_data(bar='5m', limit=50)
if df is None or len(df) < 20:
print("数据不足,无法执行策略")
return
# 计算移动平均线
df['sma_short'] = self.calculate_sma(df, 5) # 短期均线
df['sma_long'] = self.calculate_sma(df, 20) # 长期均线
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
print(f"短期均线: {latest['sma_short']:.2f}")
print(f"长期均线: {latest['sma_long']:.2f}")
print(f"当前价格: {latest['close']:.2f}")
# 策略逻辑:短期均线上穿长期均线买入,下穿卖出
if (latest['sma_short'] > latest['sma_long'] and
prev['sma_short'] <= prev['sma_long']):
print("信号: 买入")
self.place_market_order('buy', self.position_size)
elif (latest['sma_short'] < latest['sma_long'] and
prev['sma_short'] >= prev['sma_long']):
print("信号: 卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: 持仓观望")
def rsi_strategy(self):
"""RSI策略"""
print("\n=== 执行RSI策略 ===")
# 获取K线数据
df = self.get_kline_data(bar='5m', limit=50)
if df is None or len(df) < 30:
print("数据不足,无法执行策略")
return
# 计算RSI
df['rsi'] = self.calculate_rsi(df, 14)
# 获取最新RSI值
latest_rsi = df['rsi'].iloc[-1]
print(f"当前RSI: {latest_rsi:.2f}")
# 策略逻辑RSI < 30 超卖买入RSI > 70 超买卖出
if latest_rsi < 30:
print("信号: RSI超卖买入")
self.place_market_order('buy', self.position_size)
elif latest_rsi > 70:
print("信号: RSI超买卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: RSI正常区间持仓观望")
def grid_trading_strategy(self, grid_levels=5, grid_range=0.02):
"""网格交易策略"""
print(f"\n=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
current_price = self.get_current_price()
if current_price is None:
return
# 计算网格价格
grid_prices = []
for i in range(grid_levels):
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
grid_prices.append(price)
print(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
# 获取K线数据判断当前价格在哪个网格
df = self.get_kline_data(bar='1m', limit=10)
if df is None:
return
latest_price = df['close'].iloc[-1]
# 找到最近的网格
closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price))
grid_index = grid_prices.index(closest_grid)
print(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}")
# 简单的网格策略:价格下跌到网格线买入,上涨到网格线卖出
if latest_price < closest_grid * 0.995: # 价格下跌超过0.5%
print("信号: 价格下跌,网格买入")
self.place_market_order('buy', self.position_size)
elif latest_price > closest_grid * 1.005: # 价格上涨超过0.5%
print("信号: 价格上涨,网格卖出")
self.place_market_order('sell', self.position_size)
else:
print("信号: 价格在网格内,持仓观望")
def run_strategy_loop(self, strategy='sma', interval=60):
"""运行策略循环"""
print(f"开始运行{strategy}策略,间隔{interval}")
while True:
try:
print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 检查账户余额
self.get_account_balance()
# 执行策略
if strategy == 'sma':
self.simple_moving_average_strategy()
elif strategy == 'rsi':
self.rsi_strategy()
elif strategy == 'grid':
self.grid_trading_strategy()
else:
print("未知策略")
break
print(f"等待{interval}秒后继续...")
time.sleep(interval)
except KeyboardInterrupt:
print("\n策略运行被用户中断")
break
except Exception as e:
print(f"策略运行异常: {e}")
time.sleep(interval)
def main():
"""主函数"""
print("=== 比特币量化交易系统 ===")
# 导入配置
try:
from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG
except ImportError:
print("错误找不到config.py文件请确保配置文件存在")
return
# 检查是否配置了API密钥
if API_KEY == "your_api_key_here":
print("请先在config.py中配置你的OKX API密钥")
print("1. 登录OKX官网")
print("2. 进入API管理页面")
print("3. 创建API Key、Secret Key和Passphrase")
print("4. 将密钥填入config.py文件中的相应位置")
return
# 创建交易器实例
trader = BitcoinQuantTrader(
API_KEY, SECRET_KEY, PASSPHRASE,
sandbox=TRADING_CONFIG["sandbox"]
)
# 显示菜单
while True:
print("\n请选择操作:")
print("1. 查看账户余额")
print("2. 查看当前价格")
print("3. 执行移动平均线策略")
print("4. 执行RSI策略")
print("5. 执行网格交易策略")
print("6. 运行策略循环")
print("7. 买入测试")
print("8. 卖出测试")
print("9. 获取最小交易量")
print("0. 退出")
choice = input("请输入选择 (0-9): ").strip()
if choice == '0':
print("退出程序")
break
elif choice == '1':
trader.get_account_balance()
elif choice == '2':
trader.get_current_price()
elif choice == '3':
trader.simple_moving_average_strategy()
elif choice == '4':
trader.rsi_strategy()
elif choice == '5':
trader.grid_trading_strategy()
elif choice == '6':
strategy = input("选择策略 (sma/rsi/grid): ").strip()
interval = int(input("设置间隔秒数 (默认60): ") or "60")
trader.run_strategy_loop(strategy, interval)
elif choice == '7':
position_size = 0.01
input_size = input("请输入买入数量: ")
if input_size:
try:
position_size = float(input_size)
print(f"买入{position_size}BTC")
trader.place_market_order('buy', position_size)
except ValueError:
print(f"输入无效,默认买入{position_size}BTC")
elif choice == '8':
position_size = 0.01
input_size = input("请输入卖出数量: ")
if input_size:
try:
position_size = float(input_size)
print(f"卖出{position_size}BTC")
trader.place_market_order('sell', position_size)
except ValueError:
print(f"输入无效,默认卖出{position_size}BTC")
elif choice == '9':
trader.get_minimun_order_size()
else:
print("无效选择,请重新输入")
if __name__ == "__main__":
main()