#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Trading strategy module.

Implements a trading strategy driven by the price spread between Lighter
and Binance.
"""

import json
import logging
import os
import time
from datetime import datetime
from enum import Enum

import lighter

# Logging setup: make sure the log directory exists. ``exist_ok=True`` avoids
# the check-then-create race when several processes start at the same time.
logs_dir = "logs"
os.makedirs(logs_dir, exist_ok=True)

logger = logging.getLogger("strategy")

# Endpoint handed to the Lighter signer client.
LIGHTER_URL = 'https://mainnet.zklighter.elliot.ai'

# Name of the environment variable holding the Lighter API private key.
# SECURITY: the key used to be hard-coded in this file. Any key that was ever
# committed must be treated as compromised and rotated.
PRIVATE_KEY_ENV_VAR = "LIGHTER_PRIVATE_KEY"


class StrategyState(Enum):
    """States of the strategy state machine."""

    WAITING_INIT = 1         # waiting for initialisation
    IDLE_MONITORING = 2      # idle, watching the spread
    EXECUTING_TRADE = 3      # spread reached the trigger, placing the trade
    WAITING_CONVERGENCE = 4  # trade done, waiting for the spread to converge
    CLOSING_POSITION = 5     # spread converged, closing the position
    POSITION_CLOSED = 6      # position closed


class TradingStrategy:
    """Spread-trading strategy between Lighter and Binance.

    The strategy is a small state machine (see ``StrategyState``) advanced by
    one step on every call to ``do_strategy``.
    """

    def __init__(self, private_key=None):
        """Initialise the strategy and its Lighter API clients.

        Args:
            private_key: Lighter API private key. When omitted, it is read
                from the environment variable named by
                ``PRIVATE_KEY_ENV_VAR``.

        Raises:
            ValueError: if no private key is supplied and the environment
                variable is unset or empty.
        """
        self.state = StrategyState.WAITING_INIT
        self.current_position = None  # current position information
        self.entry_price_bps = 5      # spread at entry, in bps
        self.target_symbol = "DOGE"   # symbol this strategy trades
        self.account_info = None      # last successful account query response
        self.account_index = 318163
        self.api_key_index = 0

        # Resolve the credential before opening any client so that a missing
        # key does not leave a dangling HTTP session behind.
        if private_key is None:
            private_key = os.environ.get(PRIVATE_KEY_ENV_VAR)
        if not private_key:
            raise ValueError(
                f"Lighter private key not provided; set {PRIVATE_KEY_ENV_VAR} "
                "or pass private_key explicitly"
            )

        self.api_client = lighter.ApiClient()
        self.account_api = lighter.AccountApi(self.api_client)
        self.transaction_api = lighter.TransactionApi(self.api_client)
        self.signer_client = lighter.SignerClient(
            url=LIGHTER_URL,
            private_key=private_key,
            account_index=self.account_index,
            api_key_index=self.api_key_index,
        )

        # Check client connection. A failure is only logged (best effort, as
        # before); the instance is still returned to the caller.
        err = self.signer_client.check_client()
        if err is not None:
            logger.error(f"SignerClient CheckClient error: {err}")
            return

        logger.info("策略初始化完成,当前状态: WAITING_INIT")

    async def close(self):
        """Close the HTTP sessions held by the signer and API clients."""
        try:
            await self.signer_client.close()
        finally:
            await self.api_client.close()

    async def do_strategy(self, market_data):
        """Run one step of the strategy.

        Args:
            market_data: dict of market data, of the form::

                {
                    'symbol': str,
                    'binance_mark_price': float,
                    'binance_price': float,
                    'lighter_mark_price': float,
                    'lighter_price': float,
                    'timestamp': int
                }

            A falsy value makes the call a no-op.
        """
        if not market_data:
            return

        # Refresh the account information on every step.
        try:
            account_response = await self.account_api.account(
                by="index", value=str(self.account_index)
            )
            if account_response.code == 200:
                self.account_info = account_response
            else:
                logger.warning(
                    f"账户信息查询失败: code={account_response.code}, "
                    f"message={account_response.message}"
                )
        except Exception as e:
            # Boundary handler: a failed refresh skips this step entirely.
            logger.error(f"查询账户信息时出错: {str(e)}")
            return

        symbol = market_data.get('symbol')

        # Log the live quote when the update is for the target symbol.
        if symbol == self.target_symbol:
            await self._print_market_data(market_data)

        # Dispatch on the current state.
        if self.state == StrategyState.WAITING_INIT:
            await self._handle_waiting_init()
        elif self.state == StrategyState.IDLE_MONITORING:
            await self._handle_idle_monitoring(market_data)
        elif self.state == StrategyState.EXECUTING_TRADE:
            await self._handle_executing_trade(market_data)
        elif self.state == StrategyState.WAITING_CONVERGENCE:
            await self._handle_waiting_convergence(market_data)
        elif self.state == StrategyState.CLOSING_POSITION:
            await self._handle_closing_position(market_data)
        elif self.state == StrategyState.POSITION_CLOSED:
            await self._handle_position_closed()

    @staticmethod
    def _spread_bps(binance_price, lighter_price):
        """Return the Lighter-vs-Binance spread in whole bps.

        Returns ``None`` when either price is missing or falsy, and ``0``
        when the Binance price converts to zero. The value is rounded to the
        nearest integer rather than truncated, so that float artefacts such
        as 4.999999 are reported as 5 and not 4.
        """
        if not (binance_price and lighter_price):
            return None
        binance_value = float(binance_price)
        lighter_value = float(lighter_price)
        if not binance_value:
            return 0
        # 1 bps = 0.01 %
        return round((lighter_value - binance_value) / binance_value * 10000)

    async def _print_market_data(self, market_data):
        """Log the latest Binance/Lighter prices and their spread in bps."""
        symbol = market_data.get('symbol')
        binance_price = market_data.get('binance_price')
        lighter_price = market_data.get('lighter_price')

        price_diff_bps = self._spread_bps(binance_price, lighter_price)
        price_diff_str = f"{price_diff_bps}bps" if price_diff_bps is not None else "N/A"

        logger.info(f"[{symbol}] Binance: 最新价={binance_price} | Lighter: 最新价={lighter_price} | 价差={price_diff_str}")

    async def _handle_waiting_init(self):
        """Handle WAITING_INIT: move on to idle monitoring."""
        self.state = StrategyState.IDLE_MONITORING
        logger.info("状态转换: WAITING_INIT -> IDLE_MONITORING")

    async def _handle_idle_monitoring(self, market_data):
        """Handle IDLE_MONITORING: watch the spread."""
        # TODO: implement spread monitoring
        pass

    async def _handle_executing_trade(self, market_data):
        """Handle EXECUTING_TRADE."""
        # TODO: implement trade execution
        pass

    async def _handle_waiting_convergence(self, market_data):
        """Handle WAITING_CONVERGENCE."""
        # TODO: implement waiting for spread convergence
        pass

    async def _handle_closing_position(self, market_data):
        """Handle CLOSING_POSITION."""
        # TODO: implement position closing
        pass

    async def _handle_position_closed(self):
        """Handle POSITION_CLOSED: go back to idle monitoring."""
        self.state = StrategyState.IDLE_MONITORING
        logger.info("状态转换: POSITION_CLOSED -> IDLE_MONITORING")


async def main():
    """Manual smoke test: print positions, then send a reduce-only market sell.

    WARNING: this signs and submits a real order against ``LIGHTER_URL``.
    """
    strategy = TradingStrategy()
    try:
        account = await strategy.account_api.account(
            by="index", value=str(strategy.account_index)
        )
        # Example output:
        # [AccountPosition(market_id=3, symbol='DOGE', sign=1, position='1',
        #  avg_entry_price='0.194368', position_value='0.194360', ...)]
        print(account.accounts[0].positions)

        doge_market = {
            "symbol": "DOGE",
            "market_id": 3,
            "status": "active",
            "taker_fee": "0.0000",
            "maker_fee": "0.0000",
            "liquidation_fee": "1.0000",
            "min_base_amount": "10",
            "min_quote_amount": "10.000000",
            "order_quote_limit": "",
            "supported_size_decimals": 0,
            "supported_price_decimals": 6,
            "supported_quote_decimals": 6,
        }

        next_nonce = await strategy.transaction_api.next_nonce(
            account_index=strategy.account_index,
            api_key_index=strategy.api_key_index,
        )
        nonce_value = next_nonce.nonce

        # Convert to the exchange's fixed-point integers. round() instead of
        # int(): a float product such as 0.29 * 100 evaluates to 28.999...,
        # which int() would truncate to 28.
        base_amount = round(1 * (10 ** doge_market['supported_size_decimals']))
        avg_execution_price = round(0.190000 * (10 ** doge_market['supported_price_decimals']))

        # Print every order parameter.
        client_order_index = int(time.time() * 1000)
        print("=== 创建订单参数 ===")
        print(f"market_index: {doge_market['market_id']}")
        print(f"client_order_index: {client_order_index}")
        print(f"base_amount: {base_amount}")
        print(f"price: {avg_execution_price}")
        print("is_ask: True")
        print(f"order_type: {strategy.signer_client.ORDER_TYPE_MARKET}")
        print(f"time_in_force: {strategy.signer_client.ORDER_TIME_IN_FORCE_IMMEDIATE_OR_CANCEL}")
        print("reduce_only: True")
        print("trigger_price: 0")
        print(f"nonce: {nonce_value}")
        print("order_expiry: 0")
        print("==================")

        tx_info, error = strategy.signer_client.sign_create_order(
            market_index=doge_market["market_id"],
            client_order_index=client_order_index,
            base_amount=base_amount,
            price=avg_execution_price,
            is_ask=True,
            order_type=strategy.signer_client.ORDER_TYPE_MARKET,
            time_in_force=strategy.signer_client.ORDER_TIME_IN_FORCE_IMMEDIATE_OR_CANCEL,
            reduce_only=True,
            trigger_price=0,
            nonce=nonce_value,
            # All market orders (reduce-only ones included) must use
            # NilOrderExpiry (0).
            order_expiry=0,
        )
        if error is not None:
            print(f"Error signing first order (first batch): {error}")
            return
        print(tx_info)

        tx_hash = await strategy.transaction_api.send_tx(
            strategy.signer_client.TX_TYPE_CREATE_ORDER, tx_info
        )
        print(tx_hash)
    finally:
        # Always release the HTTP sessions, including on the early return above.
        await strategy.close()


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())