Selaa lähdekoodia

feat: 增强错误日志记录并添加lighter-sdk依赖

在多个异常处理中添加traceback信息以增强错误调试能力
将trigger_strategy_update改为异步方法以匹配策略实现
添加lighter-sdk依赖并实现策略账户状态查询
skyfffire 1 viikko sitten
vanhempi
commit
215fe0507e
3 muutettua tiedostoa jossa 53 lisäystä ja 36 poistoa
  1. 2 1
      requirements.txt
  2. 21 12
      src/record/market_data_recorder.py
  3. 30 23
      src/record/strategy.py

+ 2 - 1
requirements.txt

@@ -1,4 +1,5 @@
 ijson
 requests
 aiohttp
-websockets
+websockets
+lighter-sdk

+ 21 - 12
src/record/market_data_recorder.py

@@ -16,6 +16,7 @@ import time
 import logging
 import os
 import requests
+import traceback
 from datetime import datetime
 from strategy import TradingStrategy
 
@@ -85,7 +86,8 @@ async def fetch_lighter_orderbooks(session):
                 logger.error(f"获取Lighter orderBooks数据失败: HTTP {response.status}")
                 return None
     except Exception as e:
-        logger.error(f"获取Lighter orderBooks数据时出错: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"获取Lighter orderBooks数据时出错: {str(e)}\n{error_info}")
         return None
 
 
@@ -101,7 +103,8 @@ async def fetch_binance_premium_index(session):
                 logger.error(f"获取Binance标记价格数据失败: HTTP {response.status}")
                 return None
     except Exception as e:
-        logger.error(f"获取Binance标记价格数据时出错: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"获取Binance标记价格数据时出错: {str(e)}\n{error_info}")
         return None
 
 
@@ -117,7 +120,8 @@ async def fetch_binance_ticker_price(session):
                 logger.error(f"获取Binance最新价格数据失败: HTTP {response.status}")
                 return None
     except Exception as e:
-        logger.error(f"获取Binance最新价格数据时出错: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"获取Binance最新价格数据时出错: {str(e)}\n{error_info}")
         return None
 
 
@@ -136,7 +140,8 @@ def update_market_id_mapping(orderbooks_data):
         
         logger.info(f"更新market_id映射,共 {len(market_id_to_symbol)} 个交易对,{market_id_to_symbol}")
     except Exception as e:
-        logger.error(f"更新market_id映射时出错: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"更新market_id映射时出错: {str(e)}\n{error_info}")
 
 
 async def handle_binance_data_collection():
@@ -170,13 +175,12 @@ async def handle_binance_data_collection():
                             binance_data_cache['latest_prices'][symbol] = float(price)
                 
                 # 触发策略更新
-                trigger_strategy_update()
+                await trigger_strategy_update()
                 
                 # 每300ms请求一次
                 await asyncio.sleep(0.3)
                 
         except Exception as e:
-            import traceback
             error_info = traceback.format_exc()
             logger.error(f"Binance数据收集出错: {str(e)}\n{error_info}")
             await asyncio.sleep(1)  # 出错时等待1秒再重试
@@ -249,14 +253,17 @@ async def handle_market_stats_websocket():
                             logger.debug(f"收到未处理的消息类型: {message_type}")
                     
                     except json.JSONDecodeError as e:
-                        logger.error(f"解析WebSocket消息失败: {str(e)}")
+                        error_info = traceback.format_exc()
+                        logger.error(f"解析WebSocket消息失败: {str(e)}\n{error_info}")
                     except Exception as e:
-                        logger.error(f"处理WebSocket消息时出错: {str(e)}")
+                        error_info = traceback.format_exc()
+                        logger.error(f"处理WebSocket消息时出错: {str(e)}\n{error_info}")
                         
         except websockets.exceptions.ConnectionClosed:
             logger.warning("Market Stats WebSocket连接断开,5秒后重连...")
             await asyncio.sleep(5)
         except Exception as e:
+            error_info = traceback.format_exc()
             logger.error(f"Market Stats WebSocket连接出错: {str(e)},5秒后重连...")
             await asyncio.sleep(5)
 
@@ -300,11 +307,12 @@ def write_batch_to_questdb(data_batch):
             return False
             
     except Exception as e:
-        logger.error(f"批量写入数据到QuestDB时出错: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"批量写入数据到QuestDB时出错: {str(e)}\n{error_info}")
         return False
 
 
-def trigger_strategy_update():
+async def trigger_strategy_update():
     """
     触发策略更新,将最新的市场数据传递给策略
     """
@@ -334,7 +342,7 @@ def trigger_strategy_update():
         }
         
         # 调用策略
-        trading_strategy.do_strategy(market_data)
+        await trading_strategy.do_strategy(market_data)
 
 
 def process_data():
@@ -470,4 +478,5 @@ if __name__ == "__main__":
     except KeyboardInterrupt:
         logger.info("行情数据记录器被用户停止")
     except Exception as e:
-        logger.error(f"发生意外错误: {str(e)}")
+        error_info = traceback.format_exc()
+        logger.error(f"发生意外错误: {str(e)}\n{error_info}")

+ 30 - 23
src/record/strategy.py

@@ -9,12 +9,7 @@ import logging
 from enum import Enum
 from datetime import datetime
 import os
-
-"""
-ind 0
-pub c787afe5ff5d02fb3b0180c86ea4bbf9bfdd1bcab3d96e395d52502532fe05cfc43807e062b16814
-pri f3625c4662ab0b338e405f61b7555e90aeda8fa28dd607588c9e275dc6f326ddcbd9341e18ca2950
-"""
+import lighter
 
 
 # 配置日志
@@ -28,8 +23,8 @@ logger = logging.getLogger("strategy")
 class StrategyState(Enum):
     """策略状态枚举"""
     WAITING_INIT = 1  # 等待初始化
-    IDLE_MONITORING = 2  # 空闲状态监听
-    EXECUTING_TRADE = 3  # 价达成触发交易
+    IDLE_MONITORING = 2  # 空闲状态监听价
+    EXECUTING_TRADE = 3  # 价达成触发交易
     WAITING_CONVERGENCE = 4  # 交易完成等待价差收敛
     CLOSING_POSITION = 5  # 收敛完成进行平仓
     POSITION_CLOSED = 6  # 平仓完成
@@ -44,10 +39,20 @@ class TradingStrategy:
         self.current_position = None    # 当前持仓信息
         self.entry_price_bps = 5        # 入场时的价差
         self.target_symbol = "DOGE"     # 目标交易对
+
+        self.account_index = 281474976643718
+        self.api_client = lighter.ApiClient()
+        self.account_api = lighter.AccountApi(self.api_client)
+        self.signer_client = lighter.SignerClient(  
+            url='https://mainnet.zklighter.elliot.ai',  
+            private_key='0xf3625c4662ab0b338e405f61b7555e90aeda8fa28dd607588c9e275dc6f326ddcbd9341e18ca2950',  
+            account_index=self.account_index,
+            api_key_index=0
+        )
         
         logger.info("策略初始化完成,当前状态: WAITING_INIT")
     
-    def do_strategy(self, market_data):
+    async def do_strategy(self, market_data):
         """
         执行策略逻辑
         
@@ -69,23 +74,23 @@ class TradingStrategy:
         
         # 如果是DOGE交易对,打印实时行情
         if symbol == self.target_symbol:
-            self._print_market_data(market_data)
+            await self._print_market_data(market_data)
         
         # 根据当前状态执行相应逻辑
         if self.state == StrategyState.WAITING_INIT:
-            self._handle_waiting_init()
+            await self._handle_waiting_init()
         elif self.state == StrategyState.IDLE_MONITORING:
-            self._handle_idle_monitoring(market_data)
+            await self._handle_idle_monitoring(market_data)
         elif self.state == StrategyState.EXECUTING_TRADE:
-            self._handle_executing_trade(market_data)
+            await self._handle_executing_trade(market_data)
         elif self.state == StrategyState.WAITING_CONVERGENCE:
-            self._handle_waiting_convergence(market_data)
+            await self._handle_waiting_convergence(market_data)
         elif self.state == StrategyState.CLOSING_POSITION:
-            self._handle_closing_position(market_data)
+            await self._handle_closing_position(market_data)
         elif self.state == StrategyState.POSITION_CLOSED:
-            self._handle_position_closed()
+            await self._handle_position_closed()
     
-    def _print_market_data(self, market_data):
+    async def _print_market_data(self, market_data):
         """打印市场数据"""
         symbol = market_data.get('symbol')
         # binance_mark = market_data.get('binance_mark_price')
@@ -108,34 +113,36 @@ class TradingStrategy:
         price_diff_str = f"{price_diff_bps}bps" if price_diff_bps is not None else "N/A"
         
         logger.info(f"[{symbol}] Binance: 最新价={binance_price} | Lighter: 最新价={lighter_price} | 价差={price_diff_str}")
+        account = await self.account_api.account(by="index", value=f"{self.account_index}")
+        logger.info(f"账户状态: {account}")
     
-    def _handle_waiting_init(self):
+    async def _handle_waiting_init(self):
         """处理等待初始化状态"""
         # 初始化完成后转到空闲监听状态
         self.state = StrategyState.IDLE_MONITORING
         logger.info("状态转换: WAITING_INIT -> IDLE_MONITORING")
     
-    def _handle_idle_monitoring(self, market_data):
+    async def _handle_idle_monitoring(self, market_data):
         """处理空闲监听状态 - 监控价差"""
         # TODO: 实现价差监控逻辑
         pass
     
-    def _handle_executing_trade(self, market_data):
+    async def _handle_executing_trade(self, market_data):
         """处理执行交易状态"""
         # TODO: 实现交易执行逻辑
         pass
     
-    def _handle_waiting_convergence(self, market_data):
+    async def _handle_waiting_convergence(self, market_data):
         """处理等待收敛状态"""
         # TODO: 实现等待价差收敛逻辑
         pass
     
-    def _handle_closing_position(self, market_data):
+    async def _handle_closing_position(self, market_data):
         """处理平仓状态"""
         # TODO: 实现平仓逻辑
         pass
     
-    def _handle_position_closed(self):
+    async def _handle_position_closed(self):
         """处理平仓完成状态"""
         # 平仓完成后回到空闲监听状态
         self.state = StrategyState.IDLE_MONITORING