瀏覽代碼

feat(market_data_recorder): 添加WebSocket消息类型处理逻辑

处理ping/pong心跳消息、连接确认和订阅确认消息
添加对未知消息类型的日志记录
skyfffire 1 周之前
父節點
當前提交
3996974fde
共有 1 個文件被更改,包括 20 次插入1 次删除
  1. 20 1
      src/record/market_data_recorder.py

+ 20 - 1
src/record/market_data_recorder.py

@@ -256,9 +256,20 @@ async def handle_market_stats_websocket():
                 async for message in websocket:
                     try:
                         data = json.loads(message)
+                        message_type = data.get("type")
+                        
+                        # 处理ping消息,回复pong保持连接
+                        if message_type == "ping":
+                            pong_message = {"type": "pong"}
+                            await websocket.send(json.dumps(pong_message))
+                            logger.debug("收到ping消息,已回复pong")
+                        
+                        # 处理连接确认消息
+                        elif message_type == "connected":
+                            logger.info("WebSocket连接已确认")
                         
                         # 处理market_stats数据
-                        if data.get("type") == "update/market_stats" and "market_stats" in data:
+                        elif message_type == "update/market_stats" and "market_stats" in data:
                             market_stats_data = data["market_stats"]
                             
                             # market_stats是一个字典,键是market_id字符串,值是市场数据
@@ -274,6 +285,14 @@ async def handle_market_stats_websocket():
                                 
                                 symbol = market_id_to_symbol.get(market_id, f"UNKNOWN_{market_id}")
                                 logger.debug(f"更新Market Stats缓存 - {symbol}(ID:{market_id}): mark_price={market_data.get('mark_price')}, last_trade_price={market_data.get('last_trade_price')}")
+                        
+                        # 处理订阅确认消息
+                        elif message_type == "subscribed/market_stats":
+                            logger.info("Market Stats订阅确认")
+                        
+                        # 处理其他未知消息类型
+                        else:
+                            logger.debug(f"收到未处理的消息类型: {message_type}")
                     
                     except json.JSONDecodeError as e:
                         logger.error(f"解析WebSocket消息失败: {str(e)}")