Explorar el Código

feat(市场数据记录器): 更新WebSocket连接和数据处理逻辑

- 修改WebSocket URL和订阅消息格式以匹配新的API规范
- 添加websockets依赖到requirements.txt
- 改进market_stats数据处理逻辑,支持新的数据结构
- 创建logs目录确保日志文件存储位置存在
- 更新代理配置格式以正确处理HTTP代理
- 简化process_data函数参数,直接使用缓存数据
skyfffire hace 1 semana
padre
commit
dd8b9becf1
Se han modificado 2 ficheros con 24 adiciones y 15 borrados
  1. 2 1
      requirements.txt
  2. 22 14
      src/record/market_data_recorder.py

+ 2 - 1
requirements.txt

@@ -1,3 +1,4 @@
 ijson
 requests
-aiohttp
+aiohttp
+websockets

+ 22 - 14
src/record/market_data_recorder.py

@@ -19,12 +19,17 @@ import requests
 from datetime import datetime
 
 # 配置日志
+# 创建logs目录(如果不存在)
+logs_dir = "logs"
+if not os.path.exists(logs_dir):
+    os.makedirs(logs_dir)
+
 logging.basicConfig(
     level=logging.INFO,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
     handlers=[
         logging.StreamHandler(),
-        logging.FileHandler(f"market_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
+        logging.FileHandler(f"logs/market_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
     ]
 )
 logger = logging.getLogger("market_data_recorder")
@@ -32,7 +37,7 @@ logger = logging.getLogger("market_data_recorder")
 # API接口地址
 LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
 LIGHTER_ORDERBOOKS_URL = "https://mainnet.zklighter.elliot.ai/api/v1/orderBooks"
-LIGHTER_WEBSOCKET_URL = "wss://mainnet.zklighter.elliot.ai/ws"
+LIGHTER_WEBSOCKET_URL = "wss://mainnet.zklighter.elliot.ai/stream"
 BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
 
 # 轮询间隔(秒)
@@ -61,7 +66,8 @@ async def fetch_lighter_orderbooks(session):
         proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
         async with session.get(LIGHTER_ORDERBOOKS_URL, proxy=proxy) as response:
             if response.status == 200:
-                data = await response.json()
+                response_json = await response.json()
+                data = response_json['order_books']
                 logger.info(f"成功获取Lighter orderBooks数据,包含 {len(data)} 个交易对")
                 return data
             else:
@@ -155,14 +161,14 @@ async def handle_market_stats_websocket():
     while True:
         try:
             # 构建代理参数
-            proxy = PROXY_ADDRESS if PROXY_ADDRESS else None
+            proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
             
             logger.info("连接到Lighter Market Stats WebSocket...")
             async with websockets.connect(LIGHTER_WEBSOCKET_URL, proxy=proxy) as websocket:
                 # 订阅所有市场的market_stats
                 subscribe_message = {
-                    "method": "subscribe",
-                    "params": ["market_stats/all"]
+                    "type": "subscribe",
+                    "channel": "market_stats/all"
                 }
                 await websocket.send(json.dumps(subscribe_message))
                 logger.info("已订阅所有市场的Market Stats")
@@ -175,19 +181,21 @@ async def handle_market_stats_websocket():
                         
                         # 处理market_stats数据
                         if data.get("type") == "update/market_stats" and "market_stats" in data:
-                            market_stats = data["market_stats"]
-                            market_id = market_stats.get("market_id")
+                            market_stats_data = data["market_stats"]
                             
-                            if market_id is not None:
+                            # market_stats是一个字典,键是market_id字符串,值是市场数据
+                            for market_id_str, market_data in market_stats_data.items():
+                                market_id = int(market_id_str)  # 将字符串转换为整数
+                                
                                 # 更新缓存,只保留最新数据
                                 market_stats_cache[market_id] = {
-                                    "mark_price": market_stats.get("mark_price"),
-                                    "last_trade_price": market_stats.get("last_trade_price"),
+                                    "mark_price": market_data.get("mark_price"),
+                                    "last_trade_price": market_data.get("last_trade_price"),
                                     "timestamp": time.time()
                                 }
                                 
                                 symbol = market_id_to_symbol.get(market_id, f"UNKNOWN_{market_id}")
-                                logger.debug(f"更新Market Stats缓存 - {symbol}(ID:{market_id}): mark_price={market_stats.get('mark_price')}, last_trade_price={market_stats.get('last_trade_price')}")
+                                logger.debug(f"更新Market Stats缓存 - {symbol}(ID:{market_id}): mark_price={market_data.get('mark_price')}, last_trade_price={market_data.get('last_trade_price')}")
                     
                     except json.JSONDecodeError as e:
                         logger.error(f"解析WebSocket消息失败: {str(e)}")
@@ -244,7 +252,7 @@ def write_batch_to_questdb(data_batch):
         return False
 
 
-def process_data(lighter_data, binance_data):
+def process_data(binance_data):
     """
     处理从两个交易所获取的数据,并将匹配的交易对数据保存到数据库
     现在使用market_stats_cache作为Lighter数据源
@@ -340,7 +348,7 @@ async def main():
                 hourly_matches_count.append(matching_count)
             
             # 处理数据(使用Market Stats缓存中的数据)
-            process_data(None, binance_data)
+            process_data(binance_data)
             
             # 检查是否需要打印每小时报告
             current_time = time.time()