Kaynağa Gözat

feat(market_data): 添加WebSocket支持实时获取市场数据

添加WebSocket连接以实时获取Lighter交易所的市场统计数据,替代原有的轮询方式
引入market_stats_cache缓存最新数据,优化数据处理逻辑
修改write_batch_to_questdb方法以支持新数据结构
skyfffire 1 hafta önce
ebeveyn
işleme
244bc4067c
1 değiştirilmiş dosya ile 179 ekleme ve 106 silme
  1 değiştirilmiş dosya: +179 −106
      src/record/market_data_recorder.py

+ 179 - 106
src/record/market_data_recorder.py

@@ -10,6 +10,8 @@
 
 import asyncio
 import aiohttp
+import websockets
+import json
 import time
 import logging
 import os
@@ -30,6 +32,7 @@ logger = logging.getLogger("market_data_recorder")
 # API接口地址
 LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
 LIGHTER_ORDERBOOKS_URL = "https://mainnet.zklighter.elliot.ai/api/v1/orderBooks"
+LIGHTER_WEBSOCKET_URL = "wss://mainnet.zklighter.elliot.ai/ws"
 BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
 
 # 轮询间隔(秒)
@@ -47,6 +50,9 @@ INFLUX_URL = f"http://{QUESTDB_HOST}:{QUESTDB_INFLUX_PORT}/write"
 # 全局变量存储market_id到symbol的映射
 market_id_to_symbol = {}
 
+# 全局变量存储最新的Market Stats数据
+market_stats_cache = {}
+
 
 async def fetch_lighter_orderbooks(session):
     """从Lighter交易所获取orderBooks数据以建立market_id映射"""
@@ -141,90 +147,151 @@ def update_market_id_mapping(orderbooks_data):
         logger.error(f"更新market_id映射时出错: {str(e)}")
 
 
async def handle_market_stats_websocket():
    """
    Maintain the Lighter market-stats WebSocket connection.

    Subscribes to the "market_stats/all" channel and keeps the module-level
    ``market_stats_cache`` updated with the latest mark / last-trade price
    per market_id. Reconnects forever with a 5-second backoff on any
    connection failure. Never returns; intended to run as a background
    asyncio task.
    """
    while True:
        try:
            # Route through the configured proxy when one is set.
            # NOTE(review): the `proxy` kwarg requires websockets >= 14.0 —
            # confirm the deployed library version supports it.
            proxy = PROXY_ADDRESS if PROXY_ADDRESS else None
            
            logger.info("连接到Lighter Market Stats WebSocket...")
            async with websockets.connect(LIGHTER_WEBSOCKET_URL, proxy=proxy) as websocket:
                # (Re-)subscribe on every connection: subscriptions do not
                # survive a reconnect.
                subscribe_message = {
                    "method": "subscribe",
                    "params": ["market_stats/all"]
                }
                await websocket.send(json.dumps(subscribe_message))
                logger.info("已订阅所有市场的Market Stats")
                
                logger.info("Market Stats WebSocket连接成功,开始接收数据...")
                
                async for message in websocket:
                    try:
                        data = json.loads(message)
                        
                        # Only market_stats update frames are of interest;
                        # everything else (acks, pings) is ignored.
                        if data.get("type") == "update/market_stats" and "market_stats" in data:
                            market_stats = data["market_stats"]
                            market_id = market_stats.get("market_id")
                            
                            if market_id is not None:
                                # Keep only the latest snapshot per market;
                                # `timestamp` lets consumers detect staleness
                                # (nothing prunes this cache here).
                                market_stats_cache[market_id] = {
                                    "mark_price": market_stats.get("mark_price"),
                                    "last_trade_price": market_stats.get("last_trade_price"),
                                    "timestamp": time.time()
                                }
                                
                                symbol = market_id_to_symbol.get(market_id, f"UNKNOWN_{market_id}")
                                logger.debug(f"更新Market Stats缓存 - {symbol}(ID:{market_id}): mark_price={market_stats.get('mark_price')}, last_trade_price={market_stats.get('last_trade_price')}")
                    
                    except json.JSONDecodeError as e:
                        logger.error(f"解析WebSocket消息失败: {str(e)}")
                    except Exception as e:
                        # A bad frame must not kill the connection loop.
                        logger.error(f"处理WebSocket消息时出错: {str(e)}")
                        
        except websockets.exceptions.ConnectionClosed:
            logger.warning("Market Stats WebSocket连接断开,5秒后重连...")
            await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"Market Stats WebSocket连接出错: {str(e)},5秒后重连...")
            await asyncio.sleep(5)
+
+
 def write_batch_to_questdb(data_batch):
     """批量将数据写入QuestDB"""
     if not data_batch:
         return True
         
     try:
-        # 获取当前的UTC时间戳(纳秒)
-        timestamp_ns = int(time.time_ns())
-        
-        # 构建批量InfluxDB行协议字符串
+        # 构建InfluxDB Line Protocol格式的数据
         lines = []
-        for item in data_batch:
-            symbol = item['symbol']
-            binance_price = item['binance_price']
-            lighter_price = item['lighter_price']
+        for data in data_batch:
+            symbol = data['symbol']
+            binance_price = data['binance_price']
+            lighter_mark_price = data['lighter_mark_price']
+            lighter_last_trade_price = data['lighter_last_trade_price']
             
-            # 构建表名,使用前缀和交易对作为表名
-            table_name = f"{QUESTDB_TABLE_PREFIX}_{symbol}"
-            
-            # 构建InfluxDB行协议字符串
-            # 格式: <table_name> <field_key>=<field_value>,... <timestamp_ns>
-            line_protocol = (
-                f"{table_name} "
-                f"binance_price={binance_price},"
-                f"lighter_price={lighter_price} "
-                f"{timestamp_ns}"
-            )
-            lines.append(line_protocol)
+            # 构建InfluxDB Line Protocol格式
+            # 格式: measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
+            line = f"{QUESTDB_TABLE_PREFIX}_market_data,symbol={symbol} binance_price={binance_price},lighter_mark_price={lighter_mark_price},lighter_last_trade_price={lighter_last_trade_price}"
+            lines.append(line)
         
-        # 将所有行合并为一个批量请求
-        batch_data = '\n'.join(lines)
+        # 将所有行合并为一个字符串,用换行符分隔
+        data_string = '\n'.join(lines)
         
-        # 发送POST请求批量写入数据
-        response = requests.post(INFLUX_URL, data=batch_data.encode('utf-8'), timeout=10)
+        # 发送到QuestDB
+        response = requests.post(
+            INFLUX_URL,
+            data=data_string,
+            headers={'Content-Type': 'text/plain'},
+            timeout=10
+        )
         
-        # 检查响应状态码
-        if response.status_code == 204:
-            logger.info(f"成功批量写入{len(data_batch)}个交易对数据到QuestDB")
+        if response.status_code == 200:
+            logger.info(f"成功批量写入 {len(data_batch)} 条数据到QuestDB")
             return True
         else:
-            logger.error(f"批量写入数据到QuestDB失败: HTTP {response.status_code}, 响应: {response.text}")
+            logger.error(f"写入QuestDB失败,状态码: {response.status_code}, 响应: {response.text}")
             return False
+            
     except Exception as e:
         logger.error(f"批量写入数据到QuestDB时出错: {str(e)}")
         return False
 
 
 def process_data(lighter_data, binance_data):
-    """处理并比较两个交易所的数据"""
-    if not lighter_data or not binance_data:
-        logger.warning("一个或两个交易所的数据缺失")
-        return
-    
-    # 从Lighter数据中提取交易对
-    lighter_symbols = {}
-    for item in lighter_data.get('order_book_stats', []):
-        symbol = item.get('symbol')
-        if symbol:
-            lighter_symbols[symbol] = {
-                'price': item.get('last_trade_price', 0)
-            }
+    """
+    处理从两个交易所获取的数据,并将匹配的交易对数据保存到数据库
+    现在使用market_stats_cache作为Lighter数据源
+    """
+    if not binance_data or not market_stats_cache:
+        logger.warning("缺少必要的数据,跳过本次处理")
+        return 0
     
-    # 从Binance数据中提取交易对并收集匹配的数据
+    # 准备批量写入的数据
     batch_data = []
-    for item in binance_data:
-        binance_symbol = item.get('symbol', '')
-        # 移除USDT后缀以匹配Lighter交易对
+    matching_count = 0
+    
+    # 遍历Binance数据
+    for binance_item in binance_data:
+        binance_symbol = binance_item.get('symbol', '')
         base_symbol = binance_symbol.replace('USDT', '')
+        binance_price = binance_item.get('price')
         
-        if base_symbol in lighter_symbols:
-            binance_price = float(item.get('price', 0))
-            lighter_price = lighter_symbols[base_symbol]['price']
-            
-            symbol_data = {
-                'symbol': base_symbol,
-                'binance_price': binance_price,
-                'lighter_price': lighter_price,
-                'binance_symbol': binance_symbol
-            }
-            batch_data.append(symbol_data)
+        if not binance_price:
+            continue
+        
+        # 在market_stats_cache中查找对应的Lighter数据
+        for market_id, symbol in market_id_to_symbol.items():
+            if symbol == base_symbol and market_id in market_stats_cache:
+                lighter_stats = market_stats_cache[market_id]
+                
+                # 构建数据记录
+                symbol_data = {
+                    'symbol': base_symbol,
+                    'binance_price': float(binance_price),
+                    'lighter_mark_price': lighter_stats.get('mark_price'),
+                    'lighter_last_trade_price': lighter_stats.get('last_trade_price'),
+                    'timestamp': int(time.time() * 1000)  # 毫秒时间戳
+                }
+                
+                batch_data.append(symbol_data)
+                matching_count += 1
+                break
+    
+    # 如果有匹配的数据,批量写入数据库
+    if batch_data:
+        logger.info(f"找到 {matching_count} 个匹配的交易对,准备写入数据库")
+        # write_batch_to_questdb(batch_data)  # 暂时注释掉数据库写入
+    else:
+        logger.warning("没有找到匹配的交易对数据")
     
-    # 批量写入所有匹配的数据到QuestDB
-    # if batch_data:
-    #     write_batch_to_questdb(batch_data)
+    return matching_count
 
 
 async def main():
@@ -245,56 +312,62 @@ async def main():
         orderbooks_data = await fetch_lighter_orderbooks(session)
         update_market_id_mapping(orderbooks_data)
     
-    while True:
-        start_time = time.time()
-        
-        # 从两个交易所获取数据
-        lighter_data, binance_data = await fetch_all_data()
-        
-        # 处理数据并获取匹配交易对数量
-        matching_count = 0
-        if lighter_data and binance_data:
-            # 从Lighter数据中提取交易对
-            lighter_symbols = {}
-            for item in lighter_data.get('order_book_stats', []):
-                symbol = item.get('symbol')
-                if symbol:
-                    lighter_symbols[symbol] = True
+    # 启动Market Stats WebSocket任务
+    websocket_task = asyncio.create_task(handle_market_stats_websocket())
+    logger.info("已启动Market Stats WebSocket任务")
+    
+    try:
+        while True:
+            start_time = time.time()
             
-            # 计算匹配的交易对数量
-            for item in binance_data:
-                binance_symbol = item.get('symbol', '')
-                base_symbol = binance_symbol.replace('USDT', '')
-                if base_symbol in lighter_symbols:
-                    matching_count += 1
+            # 从Binance获取数据(Lighter数据现在从WebSocket获取)
+            _, binance_data = await fetch_all_data()
             
-            # 记录本次匹配数量
-            hourly_matches_count.append(matching_count)
-        
-        # 处理数据
-        process_data(lighter_data, binance_data)
-        
-        # 检查是否需要打印每小时报告
-        current_time = time.time()
-        if current_time - last_hourly_report_time >= 3600:  # 3600秒 = 1小时
-            if hourly_matches_count:
-                avg_matches = sum(hourly_matches_count) / len(hourly_matches_count)
-                min_matches = min(hourly_matches_count)
-                max_matches = max(hourly_matches_count)
-                logger.info(f"过去一小时内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
-            else:
-                logger.info("过去一小时内没有匹配到交易对")
+            # 处理数据并获取匹配交易对数量
+            matching_count = 0
+            if binance_data and market_stats_cache:
+                # 计算匹配的交易对数量
+                for item in binance_data:
+                    binance_symbol = item.get('symbol', '')
+                    base_symbol = binance_symbol.replace('USDT', '')
+                    # 检查是否在Market Stats缓存中有对应的数据
+                    for market_id, symbol in market_id_to_symbol.items():
+                        if symbol == base_symbol and market_id in market_stats_cache:
+                            matching_count += 1
+                            break
+                
+                # 记录本次匹配数量
+                hourly_matches_count.append(matching_count)
             
-            # 重置计数器和时间
-            last_hourly_report_time = current_time
-            hourly_matches_count = []
-        
-        # 计算休眠时间
-        elapsed_time = time.time() - start_time
-        sleep_time = max(0, POLLING_INTERVAL - elapsed_time)
-        
-        # logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
-        await asyncio.sleep(sleep_time)
+            # 处理数据(使用Market Stats缓存中的数据)
+            process_data(None, binance_data)
+            
+            # 检查是否需要打印每小时报告
+            current_time = time.time()
+            if current_time - last_hourly_report_time >= 3600:  # 3600秒 = 1小时
+                if hourly_matches_count:
+                    avg_matches = sum(hourly_matches_count) / len(hourly_matches_count)
+                    min_matches = min(hourly_matches_count)
+                    max_matches = max(hourly_matches_count)
+                    logger.info(f"过去一小时内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
+                else:
+                    logger.info("过去一小时内没有匹配到交易对")
+                
+                # 重置计数器和时间
+                last_hourly_report_time = current_time
+                hourly_matches_count = []
+            
+            # 计算休眠时间
+            elapsed_time = time.time() - start_time
+            sleep_time = max(0, POLLING_INTERVAL - elapsed_time)
+            
+            # logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
+            await asyncio.sleep(sleep_time)
+    
+    except Exception as e:
+        logger.error(f"主循环出错: {str(e)}")
+        websocket_task.cancel()
+        raise
 
 
 if __name__ == "__main__":