#!/usr/bin/env python # -*- coding: utf-8 -*- """ 交易策略模块 实现基于Lighter和Binance价差的交易策略 """ import logging from enum import Enum from datetime import datetime import os import lighter # 配置日志 logs_dir = "logs" if not os.path.exists(logs_dir): os.makedirs(logs_dir) logger = logging.getLogger("strategy") class StrategyState(Enum): """策略状态枚举""" WAITING_INIT = 1 # 等待初始化 IDLE_MONITORING = 2 # 空闲状态监听价差 EXECUTING_TRADE = 3 # 价差达成触发交易 WAITING_CONVERGENCE = 4 # 交易完成等待价差收敛 CLOSING_POSITION = 5 # 收敛完成进行平仓 POSITION_CLOSED = 6 # 平仓完成 class TradingStrategy: """交易策略类""" def __init__(self): """初始化策略""" self.state = StrategyState.WAITING_INIT self.current_position = None # 当前持仓信息 self.entry_price_bps = 5 # 入场时的价差 self.target_symbol = "DOGE" # 目标交易对 self.account_index = 281474976643718 self.api_client = lighter.ApiClient() self.account_api = lighter.AccountApi(self.api_client) self.signer_client = lighter.SignerClient( url='https://mainnet.zklighter.elliot.ai', private_key='0xf3625c4662ab0b338e405f61b7555e90aeda8fa28dd607588c9e275dc6f326ddcbd9341e18ca2950', account_index=self.account_index, api_key_index=0 ) logger.info("策略初始化完成,当前状态: WAITING_INIT") async def do_strategy(self, market_data): """ 执行策略逻辑 Args: market_data: 包含市场数据的字典,格式: { 'symbol': str, 'binance_mark_price': float, 'binance_price': float, 'lighter_mark_price': float, 'lighter_price': float, 'timestamp': int } """ if not market_data: return symbol = market_data.get('symbol') # 如果是DOGE交易对,打印实时行情 if symbol == self.target_symbol: await self._print_market_data(market_data) # 根据当前状态执行相应逻辑 if self.state == StrategyState.WAITING_INIT: await self._handle_waiting_init() elif self.state == StrategyState.IDLE_MONITORING: await self._handle_idle_monitoring(market_data) elif self.state == StrategyState.EXECUTING_TRADE: await self._handle_executing_trade(market_data) elif self.state == StrategyState.WAITING_CONVERGENCE: await self._handle_waiting_convergence(market_data) elif self.state == StrategyState.CLOSING_POSITION: await self._handle_closing_position(market_data) elif self.state == StrategyState.POSITION_CLOSED: await self._handle_position_closed() async def _print_market_data(self, market_data): """打印市场数据""" symbol = market_data.get('symbol') # binance_mark = market_data.get('binance_mark_price') binance_price = market_data.get('binance_price') # lighter_mark = market_data.get('lighter_mark_price') lighter_price = market_data.get('lighter_price') # 计算价差,转换为bps单位 if binance_price and lighter_price: # 确保两个价格都是浮点数 binance_price_float = float(binance_price) if isinstance(binance_price, str) else binance_price lighter_price_float = float(lighter_price) if isinstance(lighter_price, str) else lighter_price # 计算价差并转换为bps (1bps = 0.01%) price_diff_bps = int((lighter_price_float - binance_price_float) / binance_price_float * 10000) if binance_price_float else 0 else: price_diff_bps = None # 格式化输出 price_diff_str = f"{price_diff_bps}bps" if price_diff_bps is not None else "N/A" logger.info(f"[{symbol}] Binance: 最新价={binance_price} | Lighter: 最新价={lighter_price} | 价差={price_diff_str}") account = await self.account_api.account(by="index", value=f"{self.account_index}") logger.info(f"账户状态: {account}") async def _handle_waiting_init(self): """处理等待初始化状态""" # 初始化完成后转到空闲监听状态 self.state = StrategyState.IDLE_MONITORING logger.info("状态转换: WAITING_INIT -> IDLE_MONITORING") async def _handle_idle_monitoring(self, market_data): """处理空闲监听状态 - 监控价差""" # TODO: 实现价差监控逻辑 pass async def _handle_executing_trade(self, market_data): """处理执行交易状态""" # TODO: 实现交易执行逻辑 pass async def _handle_waiting_convergence(self, market_data): """处理等待收敛状态""" # TODO: 实现等待价差收敛逻辑 pass async def _handle_closing_position(self, market_data): """处理平仓状态""" # TODO: 实现平仓逻辑 pass async def _handle_position_closed(self): """处理平仓完成状态""" # 平仓完成后回到空闲监听状态 self.state = StrategyState.IDLE_MONITORING logger.info("状态转换: POSITION_CLOSED -> IDLE_MONITORING")