#!/usr/bin/env python # -*- coding: utf-8 -*- """ 交易策略模块 实现基于Lighter和Binance价差的交易策略 """ import logging from enum import Enum import os import lighter import time # 配置日志 logs_dir = "logs" if not os.path.exists(logs_dir): os.makedirs(logs_dir) logger = logging.getLogger("strategy") logger.setLevel(logging.INFO) # 显式设置logger级别为INFO class StrategyState(Enum): """策略状态枚举""" WAITING_INIT = 1 # 等待初始化 IDLE_MONITORING = 2 # 空闲状态监听价差 EXECUTING_OPEN = 3 # 执行开仓操作 CHECKING_OPEN = 4 # 检查开仓结果 WAITING_CONVERGENCE = 5 # 交易完成等待价差收敛 EXECUTING_CLOSE = 6 # 执行平仓操作 CHECKING_CLOSE = 7 # 检查平仓结果 POSITION_CLOSED = 8 # 平仓完成 class TradingStrategy: """交易策略类""" def __init__(self): """初始化策略""" self.state = StrategyState.WAITING_INIT self.current_position = None # 当前持仓信息 self.entry_price_bps = 5 # 入场时的价差(单位:bps) self.target_symbol = "DOGE" # 目标交易对 self.trade_quantity = 1 # 交易数量(买卖数量) self.account_info = None # 存储账户信息 self.last_account_update_time = 0 # 上次更新账户信息的时间戳 self.last_trade_time = 0 # 上次交易时间戳(开仓或平仓) self.position_side = None # 持仓方向:'long' 或 'short' self.account_index = 318163 self.api_key_index = 0 self.api_client = lighter.ApiClient() self.account_api = lighter.AccountApi(self.api_client) self.transaction_api = lighter.TransactionApi(self.api_client) self.signer_client = lighter.SignerClient( url='https://mainnet.zklighter.elliot.ai', private_key='72aafa0426f7ff2806c68625ca5c88de153e34fcb23567f3b872cd56334d2848fb223466efff9c21', account_index=self.account_index, api_key_index=self.api_key_index ) # Check client connection err = self.signer_client.check_client() if err is not None: logger.error(f"SignerClient CheckClient error: {err}") return logger.info("策略初始化完成,当前状态: WAITING_INIT") async def do_strategy(self, market_data): """ 执行策略逻辑 Args: market_data: 包含市场数据的字典,格式: { 'symbol': str, 'binance_mark_price': float, 'binance_price': float, 'lighter_mark_price': float, 'lighter_price': float, 'timestamp': int, 'orderbook': dict # 市场信息(market_info),不是深度信息 } """ if not market_data: return # 更新账户信息,但至少间隔1秒 current_time = time.time() if current_time - self.last_account_update_time >= 1.0: # 确保至少间隔1秒 try: account_response = await self.account_api.account(by="index", value=f"{self.account_index}") if account_response.code == 200: self.account_info = account_response self.last_account_update_time = current_time # 更新时间戳 # logger.info(f"账户信息更新成功: 可用余额={account_response.accounts[0].available_balance}, 总资产={account_response.accounts[0].total_asset_value}") else: logger.warning(f"账户信息查询失败: code={account_response.code}, message={account_response.message}") except Exception as e: logger.error(f"查询账户信息时出错: {str(e)}") return symbol = market_data.get('symbol') # 如果是DOGE交易对,打印实时行情 if symbol == self.target_symbol: await self._print_market_data(market_data) # 根据当前状态执行相应逻辑 if self.state == StrategyState.WAITING_INIT: await self._handle_waiting_init() elif self.state == StrategyState.IDLE_MONITORING: await self._handle_idle_monitoring(market_data) elif self.state == StrategyState.EXECUTING_OPEN: await self._handle_executing_open(market_data) elif self.state == StrategyState.CHECKING_OPEN: await self._handle_checking_open(market_data) elif self.state == StrategyState.WAITING_CONVERGENCE: await self._handle_waiting_convergence(market_data) elif self.state == StrategyState.EXECUTING_CLOSE: await self._handle_executing_close(market_data) elif self.state == StrategyState.CHECKING_CLOSE: await self._handle_checking_close(market_data) elif self.state == StrategyState.POSITION_CLOSED: await self._handle_position_closed() async def _print_market_data(self, market_data): """打印市场数据""" symbol = market_data.get('symbol') # binance_mark = market_data.get('binance_mark_price') binance_price = market_data.get('binance_price') # lighter_mark = market_data.get('lighter_mark_price') lighter_price = market_data.get('lighter_price') # 计算价差,转换为bps单位 if binance_price and lighter_price: # 确保两个价格都是浮点数 binance_price_float = float(binance_price) if isinstance(binance_price, str) else binance_price lighter_price_float = float(lighter_price) if isinstance(lighter_price, str) else lighter_price # 计算价差并转换为bps (1bps = 0.01%) price_diff_bps = int((lighter_price_float - binance_price_float) / binance_price_float * 10000) if binance_price_float else 0 else: price_diff_bps = None # 格式化输出 price_diff_str = f"{price_diff_bps}bps" if price_diff_bps is not None else "N/A" logger.info(f"[{symbol}] Binance: 最新价={binance_price} | Lighter: 最新价={lighter_price} | 价差={price_diff_str}") async def _handle_waiting_init(self): """处理等待初始化状态""" # 初始化完成后转到空闲监听状态 self.state = StrategyState.IDLE_MONITORING logger.info("状态转换: WAITING_INIT -> IDLE_MONITORING") async def _handle_idle_monitoring(self, market_data): """处理空闲监听状态 - 监控价差""" symbol = market_data.get('symbol') if symbol != self.target_symbol: return binance_price = market_data.get('binance_price') lighter_price = market_data.get('lighter_price') orderbook = market_data.get('orderbook') if not binance_price or not lighter_price or not orderbook: return # 计算价差(单位:bps) binance_price_float = float(binance_price) if isinstance(binance_price, str) else binance_price lighter_price_float = float(lighter_price) if isinstance(lighter_price, str) else lighter_price price_diff_bps = (lighter_price_float - binance_price_float) / binance_price_float * 10000 # 检查是否触发开仓条件 if price_diff_bps > self.entry_price_bps: # 做空:价差过大,lighter价格高于binance,卖出lighter logger.info(f"触发做空条件:价差={price_diff_bps:.2f}bps > {self.entry_price_bps}bps") self.position_side = 'short' self.state = StrategyState.EXECUTING_OPEN logger.info(f"状态转换: IDLE_MONITORING -> EXECUTING_OPEN") elif price_diff_bps < -self.entry_price_bps: # 做多:价差过小(负值),lighter价格低于binance,买入lighter logger.info(f"触发做多条件:价差={price_diff_bps:.2f}bps < -{self.entry_price_bps}bps") self.position_side = 'long' self.state = StrategyState.EXECUTING_OPEN logger.info(f"状态转换: IDLE_MONITORING -> EXECUTING_OPEN") async def _open_position(self, orderbook, binance_price): """开仓""" # 确定开仓方向和价格 if self.position_side == 'long': # 做多:在Lighter买入(使用ask价格) price = binance_price is_ask = False side_desc = '做多' else: # short # 做空:在Lighter卖出(使用bid价格) price = binance_price is_ask = True side_desc = '做空' logger.info(f"开始开仓:方向={side_desc},数量={self.trade_quantity},价格={price}") tx_hash, error = await self.create_order_and_send_tx( orderbook=orderbook, quantity=self.trade_quantity, price=price, is_ask=is_ask, reduce_only=False ) if error: logger.error(f"开仓失败: {error}") logger.info(f"状态转换: EXECUTING_OPEN -> IDLE_MONITORING") self.state = StrategyState.IDLE_MONITORING # 开仓失败,保持在 EXECUTING_OPEN 状态,等待重试 return # 记录开仓时间 self.last_trade_time = time.time() # 转换状态到检查开仓 self.state = StrategyState.CHECKING_OPEN logger.info(f"状态转换: EXECUTING_OPEN -> CHECKING_OPEN,交易哈希={tx_hash}") logger.info(f"等待1秒后检查持仓...") async def _handle_executing_open(self, market_data): """处理执行开仓状态 - 执行开仓操作""" symbol = market_data.get('symbol') if symbol != self.target_symbol: return orderbook = market_data.get('orderbook') if not orderbook: logger.warning("缺少市场信息,无法执行开仓") return binance_price = market_data.get('binance_price') lighter_price = market_data.get('lighter_price') if binance_price is None or lighter_price is None: logger.warning("价格数据不完整,无法执行开仓") return # 执行开仓操作 await self._open_position(orderbook, binance_price) async def _handle_checking_open(self, market_data): """处理检查开仓状态 - 等待1秒后检查持仓""" # 检查是否已经等待了至少1秒 if time.time() - self.last_trade_time < 1.0: return # 检查持仓 symbol = market_data.get('symbol') if symbol != self.target_symbol: return if not self.account_info or not self.account_info.accounts: logger.warning("账户信息不可用,无法检查持仓") return # 查找目标交易对的持仓 position = None for pos in self.account_info.accounts[0].positions: if pos.symbol == self.target_symbol: position = pos break if position and int(position.position) != 0: # 有持仓,转换到等待价差回归状态 self.current_position = position self.state = StrategyState.WAITING_CONVERGENCE logger.info(f"检测到持仓:方向={'做多' if position.sign == 1 else '做空'},数量={position.position}") logger.info(f"状态转换: CHECKING_OPEN -> WAITING_CONVERGENCE") else: # 没有持仓,回到检测状态 logger.warning(f"开仓后未检测到持仓,回到检测状态") self.state = StrategyState.IDLE_MONITORING logger.info(f"状态转换: CHECKING_OPEN -> IDLE_MONITORING") async def _handle_waiting_convergence(self, market_data): """处理等待收敛状态 - 等待价差回归到0轴""" symbol = market_data.get('symbol') if symbol != self.target_symbol: return binance_price = market_data.get('binance_price') lighter_price = market_data.get('lighter_price') orderbook = market_data.get('orderbook') if not binance_price or not lighter_price or not orderbook: return # 计算价差(单位:bps) binance_price_float = float(binance_price) if isinstance(binance_price, str) else binance_price lighter_price_float = float(lighter_price) if isinstance(lighter_price, str) else lighter_price price_diff_bps = (lighter_price_float - binance_price_float) / binance_price_float * 10000 # 检查是否触发平仓条件 should_close = False if self.position_side == 'long': # 做多:价差需要往上回归(从负值回到0或正值) if price_diff_bps >= 0: should_close = True logger.info(f"做多平仓条件触发:价差={price_diff_bps:.2f}bps >= 0") elif self.position_side == 'short': # 做空:价差需要往下回归(从正值回到0或负值) if price_diff_bps <= 0: should_close = True logger.info(f"做空平仓条件触发:价差={price_diff_bps:.2f}bps <= 0") if should_close: # 转换到执行平仓状态 self.state = StrategyState.EXECUTING_CLOSE logger.info(f"状态转换: WAITING_CONVERGENCE -> EXECUTING_CLOSE") async def _close_position(self, orderbook, binance_price, lighter_price): """平仓""" # 确定平仓价格:使用不利方向的价格 if self.position_side == 'short': # 做空平仓(买入):取两者较高的价格 close_price = max(binance_price, lighter_price) is_ask = False # 买入 else: # long # 做多平仓(卖出):取两者较低的价格 close_price = min(binance_price, lighter_price) is_ask = True # 卖出 # 获取实际持仓数量 position_quantity = abs(int(self.current_position.position)) if self.current_position else self.trade_quantity logger.info(f"开始平仓:方向={'做空' if self.position_side == 'short' else '做多'},价格={close_price},数量={position_quantity}") tx_hash, error = await self.create_order_and_send_tx( orderbook=orderbook, quantity=position_quantity, price=close_price, is_ask=is_ask, reduce_only=True ) if error: logger.error(f"平仓失败: {error}") # 平仓失败,保持在执行平仓状态,等待下次重试 logger.info(f"平仓失败,保持在 EXECUTING_CLOSE 状态等待重试") return # 记录平仓时间 self.last_trade_time = time.time() # 转换状态到检查平仓 self.state = StrategyState.CHECKING_CLOSE logger.info(f"状态转换: EXECUTING_CLOSE -> CHECKING_CLOSE,交易哈希={tx_hash}") logger.info(f"等待1秒后检查平仓是否生效...") async def _handle_executing_close(self, market_data): """处理执行平仓状态 - 执行平仓操作""" symbol = market_data.get('symbol') if symbol != self.target_symbol: return binance_price = market_data.get('binance_price') lighter_price = market_data.get('lighter_price') orderbook = market_data.get('orderbook') if not binance_price or not lighter_price or not orderbook: return # 计算价格 binance_price_float = float(binance_price) if isinstance(binance_price, str) else binance_price lighter_price_float = float(lighter_price) if isinstance(lighter_price, str) else lighter_price await self._close_position(orderbook, binance_price_float, lighter_price_float) async def _handle_checking_close(self, market_data): """处理检查平仓状态 - 等待1秒后检查持仓是否为0""" # 检查是否已经等待了至少1秒 if time.time() - self.last_trade_time < 1.0: return # 检查持仓 symbol = market_data.get('symbol') if symbol != self.target_symbol: return if not self.account_info or not self.account_info.accounts: logger.warning("账户信息不可用,无法检查平仓状态") return # 查找目标交易对的持仓 position = None for pos in self.account_info.accounts[0].positions: if pos.symbol == self.target_symbol: position = pos break if not position or int(position.position) == 0: # 平仓成功,回到空闲状态 logger.info(f"平仓成功,当前持仓为0") self.state = StrategyState.IDLE_MONITORING self.position_side = None self.current_position = None logger.info(f"状态转换: CHECKING_CLOSE -> IDLE_MONITORING") else: # 平仓未生效,重新执行平仓 logger.warning(f"平仓未生效,当前持仓={position.position},重新执行平仓") self.state = StrategyState.EXECUTING_CLOSE logger.info(f"状态转换: CHECKING_CLOSE -> EXECUTING_CLOSE") async def _handle_position_closed(self): """处理平仓完成状态""" # 平仓完成后回到空闲监听状态 self.state = StrategyState.IDLE_MONITORING logger.info("状态转换: POSITION_CLOSED -> IDLE_MONITORING") async def create_order_and_send_tx(self, orderbook, quantity, price, is_ask=True, reduce_only=False): """ 创建订单接口 Args: orderbook: 市场信息,等价于之前的doge_market quantity: 下单数量 price: 下单价格 is_ask: 是否为卖单,True为卖,False为买 reduce_only: 是否为只减仓单 Returns: tuple: (tx_info, error) 交易信息和错误信息 """ try: # 计算实际下单数量和价格(根据精度转换) base_amount = int(quantity * (10 ** orderbook.get('supported_size_decimals', 0))) formatted_price = int(price * (10 ** orderbook.get('supported_price_decimals', 6))) # 生成客户端订单ID client_order_index = int(time.time() * 1000) # 记录下单参数 logger.info(f"创建订单 - 市场: {orderbook.get('symbol')}(ID:{orderbook.get('market_id')})") logger.info(f"订单参数 - 数量: {quantity}, 价格: {price}, 方向: {'卖出' if is_ask else '买入'}, 只减仓: {reduce_only}") logger.info(f"格式化参数 - base_amount: {base_amount}, price: {formatted_price}, client_order_index: {client_order_index}") # 签名创建订单 tx_info, error = self.signer_client.sign_create_order( market_index=orderbook.get("market_id"), client_order_index=client_order_index, base_amount=base_amount, price=formatted_price, is_ask=is_ask, order_type=self.signer_client.ORDER_TYPE_MARKET, time_in_force=self.signer_client.ORDER_TIME_IN_FORCE_IMMEDIATE_OR_CANCEL, reduce_only=reduce_only, trigger_price=0, order_expiry=0, # 所有市价单(包括减仓市价单)都必须使用NilOrderExpiry (0) ) if error is not None: logger.error(f"订单签名失败: {error}") return None, error logger.info(f"订单签名成功,准备发送交易: {tx_info}") # 发送交易 tx_response = await self.transaction_api.send_tx(self.signer_client.TX_TYPE_CREATE_ORDER, tx_info) # 检查返回状态码 if tx_response.code != 200: error_msg = f"交易发送失败: code={tx_response.code}, message={tx_response.message}, tx_hash={tx_response.tx_hash}" logger.error(error_msg) raise Exception(error_msg) tx_hash = tx_response.tx_hash logger.info(f"交易发送成功: tx_hash={tx_hash}") return tx_hash, None except Exception as e: logger.error(f"创建订单时发生错误: {str(e)}") return None, str(e) async def main(): strategy = TradingStrategy() # account = await strategy.account_api.account(by="index", value=f"{strategy.account_index}") # [AccountPosition(market_id=3, symbol='DOGE', initial_margin_fraction='10.00', open_order_count=0, pending_order_count=0, position_tied_order_count=0, sign=1, position='1', avg_entry_price='0.194368', position_value='0.194360', unrealized_pnl='-0.000008', realized_pnl='0.000000', liquidation_price='0', total_funding_paid_out=None, margin_mode=0, allocated_margin='0.000000', additional_properties={})] print(account.accounts[0].positions) # doge_market = { # "symbol": "DOGE", # "market_id": 3, # "status": "active", # "taker_fee": "0.0000", # "maker_fee": "0.0000", # "liquidation_fee": "1.0000", # "min_base_amount": "10", # "min_quote_amount": "10.000000", # "order_quote_limit": "", # "supported_size_decimals": 0, # "supported_price_decimals": 6, # "supported_quote_decimals": 6 # } # tx_response, error = await strategy.create_order_and_send_tx(doge_market, 1, 0.1, is_ask=True, reduce_only=True) # if error is not None: # print(f"Error sending first order (first batch): {error}") # return # print(tx_response) if __name__ == '__main__': import asyncio asyncio.run(main())