From ebf90534aef13883a3ad9b9dfda84f2e16227125 Mon Sep 17 00:00:00 2001 From: blade <8019068@qq.com> Date: Tue, 22 Jul 2025 17:59:18 +0800 Subject: [PATCH] support both of Perpetual Swap and Swap Trading --- .gitignore | 2 + __pycache__/config.cpython-312.pyc | Bin 734 -> 0 bytes biz_main.py | 89 +++++++ config.py | 3 +- core/base.py | 296 +++++++++++++++++++++ core/strategy.py | 209 +++++++++++++++ play.py | 404 ++++------------------------- 7 files changed, 649 insertions(+), 354 deletions(-) create mode 100644 .gitignore delete mode 100644 __pycache__/config.cpython-312.pyc create mode 100644 biz_main.py create mode 100644 core/base.py create mode 100644 core/strategy.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a943f5a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/core/__pycache__ +/__pycache__/*.pyc diff --git a/__pycache__/config.cpython-312.pyc b/__pycache__/config.cpython-312.pyc deleted file mode 100644 index ab71eeaa4bf310a5e35e6c99f755de17b089a098..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 734 zcmYL`Pj3=I7{(VUe`LGRQc5ivqtT?{fU-afYZ@yp+e%E46g)^8Hw*8utX+0CJ4;hA zCYbmgyz~q7;urDeX)`y|6E~XPj0b1i5+`||`8{uD@;;gQDF{)Ht&7Vq{a}dWEXN@XcqC`zSl5kVnambQ9X5X&9t7C|bDCHhzrH}mD3RFK47QOwDCd8=63ugYS+ zQYgvNR&~tpB2()OU*~fQjuY*4|%M0a#-DnuV68YZd<9ehTY$Us#{AKQ*%AzQ~l<6g(W zrZ#UxZKBV4(PMmyY=RA-vAOHuq1JaOuh}{%20BFsq0t*-tEPtN;y^u9<_V}k9Bi=> zgeX?mp>an&L5gMLp3XvaovHU3)W12H0)tF|E)&OA9i)vE-E@$Bsv0zQV(2FN@A+GA z=mXuUXM!|bY1EXr@&_7i%6mt0>y{gpX0!3;sM3^ataVhW*6Ih!-n;sK?SS&F+M#@B zF1TwpnESHRJE* diff --git a/biz_main.py b/biz_main.py new file mode 100644 index 0000000..1376f46 --- /dev/null +++ b/biz_main.py @@ -0,0 +1,89 @@ +import logging +from time import sleep +from core.base import QuantTrader +from core.strategy import QuantStrategy +from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') + + +class BizMain: + def __init__(self): + api_key = API_KEY + secret_key = SECRET_KEY + passphrase = PASSPHRASE + sandbox = TRADING_CONFIG.get("sandbox", True) + symbol = TRADING_CONFIG.get("symbol", "BTC-USDT") + position_size = TRADING_CONFIG.get("position_size", 0.001) + self.trader = QuantTrader(api_key, secret_key, passphrase, sandbox, symbol, position_size) + # self.strategy = QuantStrategy(api_key, secret_key, passphrase, sandbox, symbol, position_size) + + def start_job(self): + """ + 1. 合约开空单流程 + 1.1. 获取当前价格 + 1.2 设置tdmode为cross,张数为1,每张货币单位为0.01,杠杆为1,缓冲比例为0.3 + 1.2. 计算保证金 + 1.4. 开空单 + + 2. 现货卖出虚拟货币流程 + 2.1 获取当前虚拟货币数量 + 2.2 卖出0.01单位虚拟货币 + + 3. 合约平空单流程 + 3.1 设置tdmode为cross,张数为1 + 3.2 平空单 + """ + td_mode = "cross" + quantity = 1 + try: + # 1. 合约开空单流程 + logging.info("[1] 合约开空单流程:") + # price = self.trader.get_current_price(self.trader.symbol_swap) + # logging.info(f"当前合约价格: {price}") + slot = 0.01 + leverage = 1 + buffer_ratio = 0.3 + # margin, entry_price = self.trader.calculate_margin(quantity, leverage, slot, buffer_ratio) + # logging.info(f"所需保证金: {margin}, 开仓价格: {entry_price}") + order_id, entry_price = self.trader.place_short_order(td_mode, quantity, leverage, slot, buffer_ratio) + if order_id: + logging.info(f"开空单成功,订单ID: {order_id}, 开仓价格: {entry_price}") + else: + logging.error("开空单失败") + except Exception as e: + logging.error(f"合约开空单流程异常: {e}") + sleep(1) + try: + # 2. 现货卖出比特币流程 + logging.info(f"[2] 现货卖出{self.trader.symbol_prefix}流程:") + balance = self.trader.get_account_balance() + btc_balance = balance.get(self.trader.symbol_prefix, 0) + logging.info(f"当前{self.trader.symbol_prefix}余额: {btc_balance}") + sell_amount = 0.01 + if btc_balance >= sell_amount: + order_id = self.trader.place_market_order('sell', sell_amount) + if order_id: + logging.info(f"现货卖出{sell_amount}{self.trader.symbol_prefix}成功,订单ID: {order_id}") + else: + logging.error(f"现货卖出{self.trader.symbol_prefix}失败") + else: + logging.error(f"{self.trader.symbol_prefix}余额不足,无法卖出{sell_amount}{self.trader.symbol_prefix}") + except Exception as e: + logging.error(f"现货卖出{self.trader.symbol_prefix}流程异常: {e}") + sleep(1) + try: + # 3. 合约平空单流程 + logging.info("[3] 合约平空单流程:") + result = self.trader.close_short_order(td_mode, quantity) + if result: + logging.info("平空单成功") + else: + logging.error("平空单失败") + except Exception as e: + logging.error(f"合约平空单流程异常: {e}") + + +if __name__ == "__main__": + biz_main = BizMain() + biz_main.start_job() \ No newline at end of file diff --git a/config.py b/config.py index c84f45e..b429055 100644 --- a/config.py +++ b/config.py @@ -18,6 +18,7 @@ TRADING_CONFIG = { # 策略参数 "sma_short_period": 5, # 短期移动平均线周期 "sma_long_period": 20, # 长期移动平均线周期 + "rsi_period": 14, # RSI计算周期 "rsi_oversold": 30, # RSI超卖阈值 "rsi_overbought": 70, # RSI超买阈值 @@ -34,7 +35,7 @@ TRADING_CONFIG = { # 时间间隔配置 TIME_CONFIG = { - "strategy_interval": 60, # 策略执行间隔(秒) + "strategy_interval": 30, # 策略执行间隔(秒) "kline_interval": "5m", # K线数据间隔 "kline_limit": 100, # K线数据条数 } \ No newline at end of file diff --git a/core/base.py b/core/base.py new file mode 100644 index 0000000..faf0fe6 --- /dev/null +++ b/core/base.py @@ -0,0 +1,296 @@ +import okx.Account as Account +import okx.Trade as Trade +import okx.MarketData as Market +import okx.PublicData as Public +import pandas as pd +import logging +from typing import Optional + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') + +class QuantTrader: + def __init__(self, + api_key: str, + secret_key: str, + passphrase: str, + sandbox: bool = True, + symbol: str = "BTC-USDT", + position_size: float = 0.001): + """ + 初始化货币量化交易器 + Args: + api_key: OKX API Key + secret_key: OKX Secret Key + passphrase: OKX API Passphrase + sandbox: 是否使用沙盒环境(建议先用沙盒测试) + """ + self.api_key = api_key + self.secret_key = secret_key + self.passphrase = passphrase + flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境 + self.account_api = Account.AccountAPI( + api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, + flag=flag + ) + self.trade_api = Trade.TradeAPI( + api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, + flag=flag + ) + self.market_api = Market.MarketAPI( + api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, + flag=flag + ) + self.public_api = Public.PublicAPI( + api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, + flag=flag + ) + self.symbol = symbol + self.symbol_swap = f'{symbol}-SWAP' + self.symbol_prefix = symbol.split('-')[0] + self.position_size = position_size + + def get_account_balance(self) -> float: + """获取账户余额""" + try: + result = {} + search_result = self.account_api.get_account_balance() + if search_result.get('code') == '0': + balances = search_result.get('data', []) + for balance in balances: + details = balance.get('details', []) + for detail in details: + if detail.get('ccy') == 'USDT': + logging.info(f"USDT余额: {detail.get('availBal')}") + result['USDT'] = float(detail.get('availBal', 0)) + if detail.get('ccy') == self.symbol_prefix: + logging.info(f"{self.symbol_prefix}余额: {detail.get('availBal')}") + result[self.symbol_prefix] = float(detail.get('availBal', 0)) + if detail.get('ccy') == self.symbol_swap: + logging.info(f"{self.symbol_swap}余额: {detail.get('availBal')}") + result[self.symbol_swap] = float(detail.get('availBal', 0)) + return result + else: + logging.error(f"获取余额失败: {search_result}") + return {} + except Exception as e: + logging.error(f"获取余额异常: {e}") + return {} + + def get_current_price(self, symbol: str = None) -> Optional[float]: + """获取当前货币价格""" + try: + if symbol is None: + symbol = self.symbol + symbol_prefix = self.symbol_prefix + else: + symbol_prefix = symbol.split('-')[0] + result = self.market_api.get_ticker(instId=symbol) + if result.get('code') == '0': + data = result.get('data', []) + if data and 'last' in data[0]: + price = float(data[0]['last']) + logging.info(f"当前{symbol_prefix}价格: ${price:,.2f}") + return price + else: + logging.error(f"ticker数据格式异常: {data}") + return None + else: + logging.error(f"获取价格失败: {result}") + return None + except Exception as e: + logging.error(f"获取价格异常: {e}") + return None + + def get_kline_data(self, bar: str = '1m', limit: int = 100) -> Optional[pd.DataFrame]: + """获取K线数据""" + try: + result = self.market_api.get_candlesticks( + instId=self.symbol, + bar=bar, + limit=str(limit) + ) + if result.get('code') == '0': + data = result.get('data', []) + if not data: + logging.warning("K线数据为空") + return None + df = pd.DataFrame(data, columns=[ + 'timestamp', 'open', 'high', 'low', 'close', + 'volume', 'volCcy', "volCCyQuote", "confirm" + ]) + for col in ['open', 'high', 'low', 'close', 'volume']: + df[col] = pd.to_numeric(df[col], errors='coerce') + df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms', errors='coerce') + return df + else: + logging.error(f"获取K线数据失败: {result}") + return None + except Exception as e: + logging.error(f"获取K线数据异常: {e}") + return None + + def place_market_order(self, side: str, size: float) -> Optional[str]: + """下市价单""" + balance = self.get_account_balance() + usdt_balance = balance.get('USDT') + symbol_balance = balance.get(self.symbol_prefix) + if side == 'sell': + try: + if symbol_balance < size: + logging.error(f"{self.symbol_prefix}余额不足,目前余额: {symbol_balance}") + return None + result = self.trade_api.place_order( + instId=self.symbol, + tdMode='cash', + side=side, + ordType='market', + sz=str(size) + ) + if result.get('code') == '0': + logging.info(f"下单成功: {side} {size} {self.symbol_prefix}") + return result['data'][0]['ordId'] + else: + logging.error(f"下单失败: {result}") + return None + except Exception as e: + logging.error(f"下单异常: {e}") + return None + elif side == 'buy': + try: + instrument_result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol) + instrument_data = instrument_result.get("data", []) + if not instrument_data: + logging.error(f"未获取到合约信息: {instrument_result}") + return None + min_sz = float(instrument_data[0].get("minSz", 0)) + if size < min_sz: + size = min_sz + ticker = self.market_api.get_ticker(instId=self.symbol) + last_price = float(ticker["data"][0]["last"]) + usdt_amount = float(last_price * size) + if usdt_balance < usdt_amount: + logging.error(f"USDT余额不足,目前余额: {usdt_balance}") + return None + result = self.trade_api.place_order( + instId=self.symbol, + tdMode="cash", + side=side, + ordType="market", + sz=str(usdt_amount) + ) + if result.get('code') == '0': + logging.info(f"下单成功: {side} {usdt_amount} USDT") + return result['data'][0]['ordId'] + else: + logging.error(f"下单失败: {result}") + return None + except Exception as e: + logging.error(f"下单异常: {e}") + return None + else: + logging.error(f"不支持的下单方向: {side}") + return None + + # 设置杠杆倍数 + def set_leverage(self, leverage: int = 1, mgn_mode: str = "cross", ccy: str = "USDT", posSide: str = "short"): + result = self.account_api.set_leverage( + lever=str(leverage), + mgnMode=mgn_mode, + instId=self.symbol_swap, + ccy=ccy, + posSide=posSide + ) + if result["code"] == "0": + logging.info(f"设置杠杆倍数 {leverage}x 成功") + else: + logging.error(f"设置杠杆失败: {result['msg']}") + return result["code"] == "0" + + # 计算保证金需求 + def calculate_margin(self, quantity: int = 10, leverage: int = 1, slot: float = 0.01, buffer_ratio: float = 0.3): + price = self.get_current_price(self.symbol_swap) + if not price: + return None + contract_value = quantity * slot * price # 每张 0.01 BTC + initial_margin = contract_value / leverage + recommended_margin = initial_margin * (1 + buffer_ratio) + logging.info(f"开仓{self.symbol_swap}价格: {price:.2f} USDT") + logging.info(f"合约总价值: {contract_value:.2f} USDT") + logging.info(f"初始保证金: {initial_margin:.2f} USDT") + logging.info(f"推荐保证金 (含 {buffer_ratio*100}% 缓冲): {recommended_margin:.2f} USDT") + return recommended_margin, price + + # 开空头仓位(卖出空单) + def place_short_order(self, td_mode: str = "cross", quantity: int = 10, leverage: int = 1, slot: float = 0.01, buffer_ratio: float = 0.3): + """开空头仓位(卖出空单)""" + # 计算所需保证金和开仓价格 + margin_data = self.calculate_margin(quantity, leverage, slot, buffer_ratio) + if not margin_data: + logging.error("无法计算保证金,终止下单") + return None, None + required_margin, entry_price = margin_data + + # 检查余额 + balance = self.get_account_balance() + avail_bal = balance.get('USDT') + if avail_bal is None or avail_bal < required_margin: + logging.error(f"保证金不足,需至少 {required_margin:.2f} USDT,当前余额: {avail_bal}") + return None, None + + # 设置杠杆 + if not self.set_leverage(leverage, mgn_mode=td_mode, ccy="USDT", posSide="short"): + return None, None + + # 下单 + order_data = { + "instId": self.symbol_swap, + "tdMode": td_mode, + "ccy": "USDT", + "side": "sell", + "posSide": "short", + "ordType": "market", + "sz": str(quantity), + } + result = self.trade_api.place_order(**order_data) + if result.get("code") == "0": + logging.info(f"开空单成功,订单ID: {result['data'][0]['ordId']}") + return result["data"][0]["ordId"], entry_price + else: + logging.error(f"开空单失败: {result.get('msg', result)}") + return None, None + + # 平空单(买入平仓) + def close_short_order(self, td_mode: str = "cross", quantity: float = 10) -> bool: + """平空单(买入平仓)""" + order_data = { + "instId": self.symbol_swap, + "tdMode": td_mode, + "ccy": "USDT", + "side": "buy", + "posSide": "short", + "ordType": "market", + "sz": str(quantity), + } + result = self.trade_api.place_order(**order_data) + if result.get("code") == "0": + logging.info(f"平空单成功,订单ID: {result['data'][0]['ordId']}") + return True + else: + logging.error(f"平空单失败: {result.get('msg', result)}") + return False + + def get_minimun_order_size(self) -> None: + """获取最小订单数量""" + try: + result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol) + if result.get("code") == "0": + instrument = result.get("data", [{}])[0] + min_sz = float(instrument.get("minSz", 0)) + lot_sz = float(instrument.get("lotSz", 0)) + logging.info(f"最小交易量 (minSz): {min_sz} {self.symbol_prefix}") + logging.info(f"交易量精度 (lotSz): {lot_sz} {self.symbol_prefix}") + else: + logging.error(f"错误: {result.get('msg', result)}") + except Exception as e: + logging.error(f"异常: {str(e)}") + diff --git a/core/strategy.py b/core/strategy.py new file mode 100644 index 0000000..547cd71 --- /dev/null +++ b/core/strategy.py @@ -0,0 +1,209 @@ +import time +from datetime import datetime +import logging +from typing import Optional +import pandas as pd +from core.base import QuantTrader + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') + +class QuantStrategy: + def __init__(self, + api_key: str, + secret_key: str, + passphrase: str, + sandbox: bool = True, + symbol: str = "BTC-USDT", + position_size: float = 0.001): + """ + 初始化量化策略 + """ + self.quant_trader = QuantTrader( + api_key, + secret_key, + passphrase, + sandbox, + symbol, + position_size) + + def calculate_sma(self, df: pd.DataFrame, period: int = 20) -> pd.Series: + """ + 计算简单移动平均线 + """ + if 'close' not in df: + logging.error("DataFrame缺少'close'列,无法计算SMA") + return pd.Series([float('nan')] * len(df)) + if len(df) < period: + logging.warning(f"数据长度不足{period},SMA结果将包含NaN") + return df['close'].rolling(window=period).mean() + + def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series: + """ + 计算RSI指标 + """ + if 'close' not in df: + logging.error("DataFrame缺少'close'列,无法计算RSI") + return pd.Series([float('nan')] * len(df)) + if len(df) < period: + logging.warning(f"数据长度不足{period},RSI结果将包含NaN") + delta = df['close'].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() + loss = loss.replace(0, 1e-10) # 防止除零 + rs = gain / loss + rsi = 100 - (100 / (1 + rs)) + return rsi + + def simple_moving_average_strategy(self, sma_short_period: int = 5, sma_long_period: int = 20) -> None: + """ + 简单移动平均线策略 + """ + logging.info("=== 执行移动平均线策略 ===") + try: + df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, sma_long_period+2)) + except Exception as e: + logging.error(f"获取K线数据失败: {e}") + return + if df is None or len(df) < max(sma_short_period, sma_long_period, 2): + logging.warning("数据不足,无法执行策略") + return + df['sma_short'] = self.calculate_sma(df, sma_short_period) + df['sma_long'] = self.calculate_sma(df, sma_long_period) + if len(df) < 2: + logging.warning("数据不足2条,无法判断金叉死叉") + return + latest = df.iloc[-1] + prev = df.iloc[-2] + if pd.isna(latest['sma_short']) or pd.isna(latest['sma_long']) or pd.isna(prev['sma_short']) or pd.isna(prev['sma_long']): + logging.warning("均线数据存在NaN,跳过本次信号判断") + return + logging.info(f"短期均线: {latest['sma_short']:.2f}") + logging.info(f"长期均线: {latest['sma_long']:.2f}") + logging.info(f"当前价格: {latest['close']:.2f}") + if (latest['sma_short'] > latest['sma_long'] and prev['sma_short'] <= prev['sma_long']): + logging.info("信号: 买入") + self.quant_trader.place_market_order('buy', self.quant_trader.position_size) + elif (latest['sma_short'] < latest['sma_long'] and prev['sma_short'] >= prev['sma_long']): + logging.info("信号: 卖出") + self.quant_trader.place_market_order('sell', self.quant_trader.position_size) + else: + logging.info("信号: 持仓观望") + + def rsi_strategy(self, period: int = 14, oversold: int = 30, overbought: int = 70) -> None: + """ + RSI策略 + """ + logging.info("=== 执行RSI策略 ===") + try: + df = self.quant_trader.get_kline_data(bar='5m', limit=max(50, period+2)) + except Exception as e: + logging.error(f"获取K线数据失败: {e}") + return + if df is None or len(df) < period: + logging.warning("数据不足,无法执行策略") + return + df['rsi'] = self.calculate_rsi(df, period) + latest_rsi = df['rsi'].iloc[-1] + if pd.isna(latest_rsi): + logging.warning("最新RSI为NaN,跳过本次信号判断") + return + logging.info(f"当前RSI: {latest_rsi:.2f}") + if latest_rsi < oversold: + logging.info("信号: RSI超卖,买入") + self.quant_trader.place_market_order('buy', self.quant_trader.position_size) + elif latest_rsi > overbought: + logging.info("信号: RSI超买,卖出") + self.quant_trader.place_market_order('sell', self.quant_trader.position_size) + else: + logging.info("信号: RSI正常区间,持仓观望") + + def grid_trading_strategy(self, grid_levels: int = 5, grid_range: float = 0.02) -> None: + """ + 网格交易策略 + """ + if grid_levels <= 0: + logging.error("网格数必须大于0") + return + if grid_range <= 0: + logging.error("网格范围必须大于0") + return + logging.info(f"=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===") + try: + current_price = self.quant_trader.get_current_price() + except Exception as e: + logging.error(f"获取当前价格失败: {e}") + return + if current_price is None: + logging.warning("当前价格获取失败") + return + grid_prices = [] + for i in range(grid_levels): + price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels) + grid_prices.append(price) + logging.info(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}") + try: + df = self.quant_trader.get_kline_data(bar='1m', limit=10) + except Exception as e: + logging.error(f"获取K线数据失败: {e}") + return + if df is None or len(df) == 0 or 'close' not in df: + logging.warning("K线数据无效,无法执行网格策略") + return + latest_price = df['close'].iloc[-1] + if pd.isna(latest_price): + logging.warning("最新价格为NaN,跳过本次信号判断") + return + closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price)) + logging.info(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}") + if latest_price < closest_grid * 0.995: + logging.info("信号: 价格下跌,网格买入") + self.quant_trader.place_market_order('buy', self.quant_trader.position_size) + elif latest_price > closest_grid * 1.005: + logging.info("信号: 价格上涨,网格卖出") + self.quant_trader.place_market_order('sell', self.quant_trader.position_size) + else: + logging.info("信号: 价格在网格内,持仓观望") + + def run_strategy_loop(self, + strategy: str = 'sma', + interval: int = 60, + trading_config: dict = {}) -> None: + """ + 运行策略循环 + """ + if interval <= 0: + logging.error("循环间隔必须大于0秒") + return + logging.info(f"开始运行{strategy}策略,间隔{interval}秒") + while True: + try: + logging.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + try: + self.quant_trader.get_account_balance() + except Exception as e: + logging.error(f"获取账户余额失败: {e}") + if strategy == 'sma': + sma_short_period = trading_config.get("sma_short_period", 5) + sma_long_period = trading_config.get("sma_long_period", 20) + self.simple_moving_average_strategy(sma_short_period, sma_long_period) + elif strategy == 'rsi': + period = trading_config.get("rsi_period", 14) + oversold = trading_config.get("rsi_oversold", 30) + overbought = trading_config.get("rsi_overbought", 70) + self.rsi_strategy(period, oversold, overbought) + elif strategy == 'grid': + grid_levels = trading_config.get("grid_levels", 5) + grid_range = trading_config.get("grid_range", 0.02) + self.grid_trading_strategy(grid_levels, grid_range) + else: + logging.error("未知策略") + break + logging.info(f"等待{interval}秒后继续...") + time.sleep(interval) + except KeyboardInterrupt: + logging.info("策略运行被用户中断") + break + except Exception as e: + logging.error(f"策略运行异常: {e}") + time.sleep(interval) + \ No newline at end of file diff --git a/play.py b/play.py index 7298c52..6c3268e 100644 --- a/play.py +++ b/play.py @@ -1,400 +1,98 @@ -import okx.Account as Account -import okx.Trade as Trade -import okx.MarketData as Market -import okx.PublicData as Public -import pandas as pd -import numpy as np -import time -from datetime import datetime -import json +import logging +from core.base import QuantTrader +from core.strategy import QuantStrategy -class BitcoinQuantTrader: - def __init__(self, api_key, secret_key, passphrase, sandbox=True): - """ - 初始化比特币量化交易器 - - Args: - api_key: OKX API Key - secret_key: OKX Secret Key - passphrase: OKX API Passphrase - sandbox: 是否使用沙盒环境(建议先用沙盒测试) - """ - self.api_key = api_key - self.secret_key = secret_key - self.passphrase = passphrase - - # 初始化API客户端 - flag = "1" if sandbox else "0" # 0:实盘环境 1:沙盒环境 - - self.account_api = Account.AccountAPI( - api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag - ) - self.trade_api = Trade.TradeAPI( - api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag - ) - self.market_api = Market.MarketAPI( - api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag - ) - self.public_api = Public.PublicAPI( - api_key=api_key, api_secret_key=secret_key, passphrase=passphrase, - flag=flag - ) - - self.symbol = "BTC-USDT" - self.position_size = 0.001 # 每次交易0.001 BTC - - def get_account_balance(self): - """获取账户余额""" - try: - result = self.account_api.get_account_balance() - if result['code'] == '0': - balances = result['data'] - for balance in balances: - details = balance['details'] - for detail in details: - if detail['ccy'] == 'USDT': - print(f"USDT余额: {detail['availBal']}") - return float(detail['availBal']) - elif detail['ccy'] == 'BTC': - print(f"BTC余额: {detail['availBal']}") - # return float(detail['availBal']) - else: - print(f"获取余额失败: {result}") - return 0 - except Exception as e: - print(f"获取余额异常: {e}") - return 0 - - def get_current_price(self): - """获取当前BTC价格""" - try: - result = self.market_api.get_ticker(instId=self.symbol) - if result['code'] == '0': - price = float(result['data'][0]['last']) - print(f"当前BTC价格: ${price:,.2f}") - return price - else: - print(f"获取价格失败: {result}") - return None - except Exception as e: - print(f"获取价格异常: {e}") - return None - - def get_kline_data(self, bar='1m', limit=100): - """获取K线数据""" - try: - result = self.market_api.get_candlesticks( - instId=self.symbol, - bar=bar, - limit=str(limit) - ) - if result['code'] == '0': - # 转换为DataFrame - df = pd.DataFrame(result['data'], columns=[ - 'timestamp', 'open', 'high', 'low', 'close', - 'volume', 'volCcy', "volCCyQuote", "confirm" - ]) - # 转换数据类型 - for col in ['open', 'high', 'low', 'close', 'volume']: - df[col] = pd.to_numeric(df[col]) - df['timestamp'] = pd.to_datetime(df['timestamp'].astype(int), unit='ms') - return df - else: - print(f"获取K线数据失败: {result}") - return None - except Exception as e: - print(f"获取K线数据异常: {e}") - return None - - def calculate_sma(self, df, period=20): - """计算简单移动平均线""" - return df['close'].rolling(window=period).mean() - - def calculate_rsi(self, df, period=14): - """计算RSI指标""" - delta = df['close'].diff() - gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() - loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() - rs = gain / loss - rsi = 100 - (100 / (1 + rs)) - return rsi - - def place_market_order(self, side, size): - """下市价单""" - if side == 'sell': - try: - result = self.trade_api.place_order( - instId=self.symbol, - tdMode='cash', - side=side, - ordType='market', - sz=str(size) - ) - if result['code'] == '0': - print(f"下单成功: {side} {size} BTC") - return result['data'][0]['ordId'] - else: - print(f"下单失败: {result}") - return None - except Exception as e: - print(f"下单异常: {e}") - return None - elif side == 'buy': - instrument = self.public_api.get_instruments(instType="SPOT", instId=self.symbol)["data"][0] - min_sz = float(instrument["minSz"]) # 最小交易量 - if size < min_sz: - size = min_sz - ticker = self.market_api.get_ticker(instId=self.symbol) - last_price = float(ticker["data"][0]["last"]) # 最新价格 +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') - # 买入数量是USDT,将BTC转换为USDT - usdt_amount = float(last_price * size) - - try: - result = self.trade_api.place_order( - instId=self.symbol, - tdMode="cash", - side=side, - ordType="market", - sz=str(usdt_amount) - ) - print("下单结果:", result) - except Exception as e: - print("错误:", str(e)) - - def get_minimun_order_size(self): - """获取最小订单数量""" - try: - result = self.public_api.get_instruments(instType="SPOT", instId=self.symbol) - if result["code"] == "0": - instrument = result["data"][0] - min_sz = float(instrument["minSz"]) # 最小交易量(BTC) - lot_sz = float(instrument["lotSz"]) # 交易量精度 - print(f"最小交易量 (minSz): {min_sz} BTC") - print(f"交易量精度 (lotSz): {lot_sz} BTC") - else: - print(f"错误: {result['msg']}") - except Exception as e: - print(f"异常: {str(e)}") - - def simple_moving_average_strategy(self): - """简单移动平均线策略""" - print("\n=== 执行移动平均线策略 ===") - - # 获取K线数据 - df = self.get_kline_data(bar='5m', limit=50) - if df is None or len(df) < 20: - print("数据不足,无法执行策略") - return - - # 计算移动平均线 - df['sma_short'] = self.calculate_sma(df, 5) # 短期均线 - df['sma_long'] = self.calculate_sma(df, 20) # 长期均线 - - # 获取最新数据 - latest = df.iloc[-1] - prev = df.iloc[-2] - - print(f"短期均线: {latest['sma_short']:.2f}") - print(f"长期均线: {latest['sma_long']:.2f}") - print(f"当前价格: {latest['close']:.2f}") - - # 策略逻辑:短期均线上穿长期均线买入,下穿卖出 - if (latest['sma_short'] > latest['sma_long'] and - prev['sma_short'] <= prev['sma_long']): - print("信号: 买入") - self.place_market_order('buy', self.position_size) - elif (latest['sma_short'] < latest['sma_long'] and - prev['sma_short'] >= prev['sma_long']): - print("信号: 卖出") - self.place_market_order('sell', self.position_size) - else: - print("信号: 持仓观望") - - def rsi_strategy(self): - """RSI策略""" - print("\n=== 执行RSI策略 ===") - - # 获取K线数据 - df = self.get_kline_data(bar='5m', limit=50) - if df is None or len(df) < 30: - print("数据不足,无法执行策略") - return - - # 计算RSI - df['rsi'] = self.calculate_rsi(df, 14) - - # 获取最新RSI值 - latest_rsi = df['rsi'].iloc[-1] - print(f"当前RSI: {latest_rsi:.2f}") - - # 策略逻辑:RSI < 30 超卖买入,RSI > 70 超买卖出 - if latest_rsi < 30: - print("信号: RSI超卖,买入") - self.place_market_order('buy', self.position_size) - elif latest_rsi > 70: - print("信号: RSI超买,卖出") - self.place_market_order('sell', self.position_size) - else: - print("信号: RSI正常区间,持仓观望") - - def grid_trading_strategy(self, grid_levels=5, grid_range=0.02): - """网格交易策略""" - print(f"\n=== 执行网格交易策略 (网格数: {grid_levels}, 范围: {grid_range*100}%) ===") - - current_price = self.get_current_price() - if current_price is None: - return - - # 计算网格价格 - grid_prices = [] - for i in range(grid_levels): - price = current_price * (1 + grid_range * (i - grid_levels//2) / grid_levels) - grid_prices.append(price) - - print(f"网格价格: {[f'${p:.2f}' for p in grid_prices]}") - - # 获取K线数据判断当前价格在哪个网格 - df = self.get_kline_data(bar='1m', limit=10) - if df is None: - return - - latest_price = df['close'].iloc[-1] - - # 找到最近的网格 - closest_grid = min(grid_prices, key=lambda x: abs(x - latest_price)) - grid_index = grid_prices.index(closest_grid) - - print(f"当前价格: ${latest_price:.2f}, 最近网格: ${closest_grid:.2f}") - - # 简单的网格策略:价格下跌到网格线买入,上涨到网格线卖出 - if latest_price < closest_grid * 0.995: # 价格下跌超过0.5% - print("信号: 价格下跌,网格买入") - self.place_market_order('buy', self.position_size) - elif latest_price > closest_grid * 1.005: # 价格上涨超过0.5% - print("信号: 价格上涨,网格卖出") - self.place_market_order('sell', self.position_size) - else: - print("信号: 价格在网格内,持仓观望") - - def run_strategy_loop(self, strategy='sma', interval=60): - """运行策略循环""" - print(f"开始运行{strategy}策略,间隔{interval}秒") - - while True: - try: - print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - - # 检查账户余额 - self.get_account_balance() - - # 执行策略 - if strategy == 'sma': - self.simple_moving_average_strategy() - elif strategy == 'rsi': - self.rsi_strategy() - elif strategy == 'grid': - self.grid_trading_strategy() - else: - print("未知策略") - break - - print(f"等待{interval}秒后继续...") - time.sleep(interval) - - except KeyboardInterrupt: - print("\n策略运行被用户中断") - break - except Exception as e: - print(f"策略运行异常: {e}") - time.sleep(interval) - -def main(): +def main() -> None: """主函数""" - print("=== 比特币量化交易系统 ===") - + logging.info("=== 比特币量化交易系统 ===") # 导入配置 try: from config import API_KEY, SECRET_KEY, PASSPHRASE, TRADING_CONFIG, TIME_CONFIG except ImportError: - print("错误:找不到config.py文件,请确保配置文件存在") + logging.error("找不到config.py文件,请确保配置文件存在") return - # 检查是否配置了API密钥 if API_KEY == "your_api_key_here": - print("请先在config.py中配置你的OKX API密钥!") - print("1. 登录OKX官网") - print("2. 进入API管理页面") - print("3. 创建API Key、Secret Key和Passphrase") - print("4. 将密钥填入config.py文件中的相应位置") + logging.error("请先在config.py中配置你的OKX API密钥!\n1. 登录OKX官网\n2. 进入API管理页面\n3. 创建API Key、Secret Key和Passphrase\n4. 将密钥填入config.py文件中的相应位置") return - # 创建交易器实例 - trader = BitcoinQuantTrader( + sandbox = TRADING_CONFIG.get("sandbox", True) + symbol = TRADING_CONFIG.get("symbol", "BTC-USDT") + position_size = TRADING_CONFIG.get("position_size", 0.001) + trader = QuantTrader( API_KEY, SECRET_KEY, PASSPHRASE, - sandbox=TRADING_CONFIG["sandbox"] + sandbox=sandbox, + symbol=symbol, + position_size=position_size + ) + strategy = QuantStrategy( + API_KEY, SECRET_KEY, PASSPHRASE, + sandbox=sandbox, + symbol=symbol, + position_size=position_size ) - # 显示菜单 while True: - print("\n请选择操作:") - print("1. 查看账户余额") - print("2. 查看当前价格") - print("3. 执行移动平均线策略") - print("4. 执行RSI策略") - print("5. 执行网格交易策略") - print("6. 运行策略循环") - print("7. 买入测试") - print("8. 卖出测试") - print("9. 获取最小交易量") - print("0. 退出") - + logging.info("\n请选择操作:\n1. 查看账户余额\n2. 查看当前价格\n3. 执行移动平均线策略\n4. 执行RSI策略\n5. 执行网格交易策略\n6. 运行策略循环\n7. 买入测试\n8. 卖出测试\n9. 获取最小交易量\n0. 退出") choice = input("请输入选择 (0-9): ").strip() - if choice == '0': - print("退出程序") + logging.info("退出程序") break elif choice == '1': trader.get_account_balance() elif choice == '2': trader.get_current_price() elif choice == '3': - trader.simple_moving_average_strategy() + sma_short_period = TRADING_CONFIG.get("sma_short_period", 5) + sma_long_period = TRADING_CONFIG.get("sma_long_period", 20) + strategy.simple_moving_average_strategy(sma_short_period, sma_long_period) elif choice == '4': - trader.rsi_strategy() + period = TRADING_CONFIG.get("rsi_period", 14) + rsi_oversold = TRADING_CONFIG.get("rsi_oversold", 30) + rsi_overbought = TRADING_CONFIG.get("rsi_overbought", 70) + strategy.rsi_strategy(period, rsi_oversold, rsi_overbought) elif choice == '5': - trader.grid_trading_strategy() + grid_levels = TRADING_CONFIG.get("grid_levels", 5) + grid_range = TRADING_CONFIG.get("grid_range", 0.02) + strategy.grid_trading_strategy(grid_levels, grid_range) elif choice == '6': - strategy = input("选择策略 (sma/rsi/grid): ").strip() - interval = int(input("设置间隔秒数 (默认60): ") or "60") - trader.run_strategy_loop(strategy, interval) + strategy_name = input("选择策略 (sma/rsi/grid): ").strip() + interval = TIME_CONFIG.get("strategy_interval", 30) + strategy.run_strategy_loop(strategy_name, interval, TRADING_CONFIG) elif choice == '7': - position_size = 0.01 + defalt_position_size = 0.01 input_size = input("请输入买入数量: ") if input_size: try: position_size = float(input_size) - print(f"买入{position_size}BTC") + logging.info(f"买入{position_size}BTC") trader.place_market_order('buy', position_size) except ValueError: - print(f"输入无效,默认买入{position_size}BTC") + logging.warning(f"输入无效,默认买入{defalt_position_size}BTC") + trader.place_market_order('buy', defalt_position_size) + else: + logging.info(f"默认买入{defalt_position_size}BTC") + trader.place_market_order('buy', defalt_position_size) elif choice == '8': - position_size = 0.01 + defalt_position_size = 0.01 input_size = input("请输入卖出数量: ") if input_size: try: position_size = float(input_size) - print(f"卖出{position_size}BTC") + logging.info(f"卖出{position_size}BTC") trader.place_market_order('sell', position_size) except ValueError: - print(f"输入无效,默认卖出{position_size}BTC") + logging.warning(f"输入无效,默认卖出{defalt_position_size}BTC") + trader.place_market_order('sell', defalt_position_size) + else: + logging.info(f"默认卖出{defalt_position_size}BTC") + trader.place_market_order('sell', defalt_position_size) elif choice == '9': trader.get_minimun_order_size() else: - print("无效选择,请重新输入") + logging.warning("无效选择,请重新输入") if __name__ == "__main__": main()