crypto_quant/demo.py

306 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
比特币量化交易系统 - 演示版本
无需API密钥仅用于演示策略逻辑
"""
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
import random
class BitcoinQuantTraderDemo:
def __init__(self):
"""初始化演示版交易器"""
self.symbol = "BTC-USDT"
self.position_size = 0.001 # 每次交易0.001 BTC
self.current_position = 0 # 当前持仓
self.cash = 10000 # 初始资金USDT
self.trades = [] # 交易记录
def generate_mock_data(self, days=30):
"""生成模拟的BTC价格数据"""
print("生成模拟BTC价格数据...")
# 生成时间序列
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
dates = pd.date_range(start=start_date, end=end_date, freq='5min')
# 生成价格数据模拟真实BTC价格波动
np.random.seed(42) # 固定随机种子,确保结果可重现
# 基础价格
base_price = 118000
prices = []
for i in range(len(dates)):
# 添加随机波动
change = np.random.normal(0, 0.002) # 0.2%的标准差
if i == 0:
price = base_price
else:
price = prices[-1] * (1 + change)
prices.append(price)
# 创建DataFrame
df = pd.DataFrame({
'timestamp': dates,
'open': prices,
'high': [p * (1 + abs(np.random.normal(0, 0.001))) for p in prices],
'low': [p * (1 - abs(np.random.normal(0, 0.001))) for p in prices],
'close': prices,
'volume': [np.random.uniform(100, 1000) for _ in prices]
})
# 确保high >= close >= low
df['high'] = df[['open', 'high', 'close']].max(axis=1)
df['low'] = df[['open', 'low', 'close']].min(axis=1)
print(f"生成了 {len(df)} 条价格数据")
print(f"价格范围: ${df['close'].min():.2f} - ${df['close'].max():.2f}")
return df
def calculate_sma(self, df, period=20):
"""计算简单移动平均线"""
return df['close'].rolling(window=period).mean()
def calculate_rsi(self, df, period=14):
"""计算RSI指标"""
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def place_mock_order(self, side, size, price):
"""模拟下单"""
timestamp = datetime.now()
if side == 'buy':
cost = size * price
if cost <= self.cash:
self.cash -= cost
self.current_position += size
print(f"✅ 买入成功: {size} BTC @ ${price:.2f}")
self.trades.append({
'timestamp': timestamp,
'side': side,
'size': size,
'price': price,
'cost': cost
})
return True
else:
print(f"❌ 买入失败: 资金不足 (需要: ${cost:.2f}, 可用: ${self.cash:.2f})")
return False
else: # sell
if self.current_position >= size:
revenue = size * price
self.cash += revenue
self.current_position -= size
print(f"✅ 卖出成功: {size} BTC @ ${price:.2f}")
self.trades.append({
'timestamp': timestamp,
'side': side,
'size': size,
'price': price,
'revenue': revenue
})
return True
else:
print(f"❌ 卖出失败: 持仓不足 (需要: {size} BTC, 持有: {self.current_position} BTC)")
return False
def simple_moving_average_strategy(self, df):
"""简单移动平均线策略"""
print("\n=== 执行移动平均线策略 ===")
if len(df) < 20:
print("数据不足,无法执行策略")
return
# 计算移动平均线
df['sma_short'] = self.calculate_sma(df, 5) # 短期均线
df['sma_long'] = self.calculate_sma(df, 20) # 长期均线
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
print(f"短期均线: {latest['sma_short']:.2f}")
print(f"长期均线: {latest['sma_long']:.2f}")
print(f"当前价格: {latest['close']:.2f}")
# 策略逻辑:短期均线上穿长期均线买入,下穿卖出
if (latest['sma_short'] > latest['sma_long'] and
prev['sma_short'] <= prev['sma_long']):
print("信号: 买入")
self.place_mock_order('buy', self.position_size, latest['close'])
elif (latest['sma_short'] < latest['sma_long'] and
prev['sma_short'] >= prev['sma_long']):
print("信号: 卖出")
self.place_mock_order('sell', self.position_size, latest['close'])
else:
print("信号: 持仓观望")
def rsi_strategy(self, df):
"""RSI策略"""
print("\n=== 执行RSI策略 ===")
if len(df) < 30:
print("数据不足,无法执行策略")
return
# 计算RSI
df['rsi'] = self.calculate_rsi(df, 14)
# 获取最新RSI值
latest = df.iloc[-1]
latest_rsi = latest['rsi']
print(f"当前RSI: {latest_rsi:.2f}")
print(f"当前价格: {latest['close']:.2f}")
# 策略逻辑RSI < 30 超卖买入RSI > 70 超买卖出
if latest_rsi < 30:
print("信号: RSI超卖买入")
self.place_mock_order('buy', self.position_size, latest['close'])
elif latest_rsi > 70:
print("信号: RSI超买卖出")
self.place_mock_order('sell', self.position_size, latest['close'])
else:
print("信号: RSI正常区间持仓观望")
def grid_trading_strategy(self, df, grid_levels=5, grid_range=0.02):
"""网格交易策略"""
print(f"\n=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===")
if len(df) < 10:
print("数据不足,无法执行策略")
return
current_price = df['close'].iloc[-1]
# 计算网格价格
grid_prices = []
for i in range(grid_levels):
price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels)
grid_prices.append(price)
print(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}")
print(f"当前价格: ${current_price:.2f}")
# 找到最近的网格
closest_grid = min(grid_prices, key=lambda x: abs(x - current_price))
grid_index = grid_prices.index(closest_grid)
print(f"最近网格: ${closest_grid:.2f}")
# 简单的网格策略:价格下跌到网格线买入,上涨到网格线卖出
if current_price < closest_grid * 0.995: # 价格下跌超过0.5%
print("信号: 价格下跌,网格买入")
self.place_mock_order('buy', self.position_size, current_price)
elif current_price > closest_grid * 1.005: # 价格上涨超过0.5%
print("信号: 价格上涨,网格卖出")
self.place_mock_order('sell', self.position_size, current_price)
else:
print("信号: 价格在网格内,持仓观望")
def show_portfolio_status(self):
"""显示投资组合状态"""
print("\n" + "="*50)
print("📊 投资组合状态")
print("="*50)
print(f"现金余额: ${self.cash:.2f}")
print(f"BTC持仓: {self.current_position:.6f} BTC")
if self.trades:
# 计算总收益
total_cost = sum(t['cost'] for t in self.trades if 'cost' in t)
total_revenue = sum(t['revenue'] for t in self.trades if 'revenue' in t)
net_profit = total_revenue - total_cost
print(f"总交易次数: {len(self.trades)}")
print(f"总投入: ${total_cost:.2f}")
print(f"总收入: ${total_revenue:.2f}")
print(f"净收益: ${net_profit:.2f}")
if total_cost > 0:
roi = (net_profit / total_cost) * 100
print(f"收益率: {roi:.2f}%")
else:
print("暂无交易记录")
print("="*50)
def run_strategy_backtest(self, strategy='sma', days=30):
"""运行策略回测"""
print(f"🚀 开始运行{strategy}策略回测 ({days}天数据)")
# 生成模拟数据
df = self.generate_mock_data(days)
# 重置投资组合
self.cash = 10000
self.current_position = 0
self.trades = []
# 运行策略
if strategy == 'sma':
self.simple_moving_average_strategy(df)
elif strategy == 'rsi':
self.rsi_strategy(df)
elif strategy == 'grid':
self.grid_trading_strategy(df)
else:
print("未知策略")
return
# 显示结果
self.show_portfolio_status()
# 显示交易记录
if self.trades:
print("\n📋 交易记录:")
for i, trade in enumerate(self.trades, 1):
print(f"{i}. {trade['timestamp'].strftime('%Y-%m-%d %H:%M')} "
f"{trade['side'].upper()} {trade['size']} BTC @ ${trade['price']:.2f}")
def main():
"""主函数"""
print("=== 比特币量化交易系统 - 演示版本 ===")
print("⚠️ 此版本使用模拟数据,仅用于演示策略逻辑")
trader = BitcoinQuantTraderDemo()
while True:
print("\n请选择操作:")
print("1. 运行移动平均线策略回测")
print("2. 运行RSI策略回测")
print("3. 运行网格交易策略回测")
print("4. 查看投资组合状态")
print("0. 退出")
choice = input("请输入选择 (0-4): ").strip()
if choice == '0':
print("退出程序")
break
elif choice == '1':
days = int(input("设置回测天数 (默认30): ") or "30")
trader.run_strategy_backtest('sma', days)
elif choice == '2':
days = int(input("设置回测天数 (默认30): ") or "30")
trader.run_strategy_backtest('rsi', days)
elif choice == '3':
days = int(input("设置回测天数 (默认30): ") or "30")
trader.run_strategy_backtest('grid', days)
elif choice == '4':
trader.show_portfolio_status()
else:
print("无效选择,请重新输入")
if __name__ == "__main__":
main()