Bladeren bron

perf: 优化市场数据记录器性能并减少日志输出

减少轮询间隔至1秒以提高数据时效性
移除冗余日志输出以提升性能
添加每小时匹配交易对统计功能
skyfffire 1 week geleden
bovenliggende
commit
18abd5ed7d
1 gewijzigde bestanden met toevoegingen van 45 en 14 verwijderingen
  1. 45 14
      src/record/market_data_recorder.py

+ 45 - 14
src/record/market_data_recorder.py

@@ -32,7 +32,7 @@ LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
 BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
 
 # 轮询间隔(秒)
-POLLING_INTERVAL = 10
+POLLING_INTERVAL = 1
 
 # 获取代理地址(如果存在)
 PROXY_ADDRESS = os.environ.get('proxy_address', '')
@@ -52,7 +52,7 @@ async def fetch_lighter_data(session):
         async with session.get(LIGHTER_API_URL, proxy=proxy) as response:
             if response.status == 200:
                 data = await response.json()
-                logger.info(f"成功获取Lighter数据,包含 {len(data.get('order_book_stats', []))} 个交易对")
+                # logger.info(f"成功获取Lighter数据,包含 {len(data.get('order_book_stats', []))} 个交易对")
                 return data
             else:
                 logger.error(f"获取Lighter数据失败: HTTP {response.status}")
@@ -70,7 +70,7 @@ async def fetch_binance_data(session):
         async with session.get(BINANCE_API_URL, proxy=proxy) as response:
             if response.status == 200:
                 data = await response.json()
-                logger.info(f"成功获取Binance数据,包含 {len(data)} 个交易对")
+                # logger.info(f"成功获取Binance数据,包含 {len(data)} 个交易对")
                 return data
             else:
                 logger.error(f"获取Binance数据失败: HTTP {response.status}")
@@ -83,8 +83,8 @@ async def fetch_binance_data(session):
 async def fetch_all_data():
     """同时从两个交易所获取数据"""
     # 如果设置了代理,则使用代理
-    if PROXY_ADDRESS:
-        logger.info(f"使用代理: {PROXY_ADDRESS}")
+    # if PROXY_ADDRESS:
+    #     logger.info(f"使用代理: {PROXY_ADDRESS}")
     
     # 创建会话时设置代理
     connector = None
@@ -182,34 +182,65 @@ def process_data(lighter_data, binance_data):
                 price_diff_pct,
                 lighter_symbols[base_symbol]['daily_volume']
             )
-    
-    # 打印匹配的交易对
-    logger.info(f"在两个交易所中找到 {len(matching_symbols)} 个匹配的交易对")
-    for symbol_data in matching_symbols:
-        logger.info(f"交易对: {symbol_data['symbol']} | "
-                   f"Binance价格: {symbol_data['binance_price']} | "
-                   f"Lighter价格: {symbol_data['lighter_price']} | "
-                   f"价格差异百分比: {symbol_data['price_diff_pct']:.2f}%")
 
 
 async def main():
     """运行数据收集循环的主函数"""
     logger.info("正在启动行情数据记录器")
     
+    # 添加每小时打印匹配交易对数量的变量
+    last_hourly_report_time = time.time()
+    hourly_matches_count = []
+    
     while True:
         start_time = time.time()
         
         # 从两个交易所获取数据
         lighter_data, binance_data = await fetch_all_data()
         
+        # 处理数据并获取匹配交易对数量
+        matching_count = 0
+        if lighter_data and binance_data:
+            # 从Lighter数据中提取交易对
+            lighter_symbols = {}
+            for item in lighter_data.get('order_book_stats', []):
+                symbol = item.get('symbol')
+                if symbol:
+                    lighter_symbols[symbol] = True
+            
+            # 计算匹配的交易对数量
+            for item in binance_data:
+                binance_symbol = item.get('symbol', '')
+                base_symbol = binance_symbol.replace('USDT', '')
+                if base_symbol in lighter_symbols:
+                    matching_count += 1
+            
+            # 记录本次匹配数量
+            hourly_matches_count.append(matching_count)
+        
         # 处理数据
         process_data(lighter_data, binance_data)
         
+        # 检查是否需要打印每小时报告
+        current_time = time.time()
+        if current_time - last_hourly_report_time >= 60:  # 3600秒 = 1小时
+            if hourly_matches_count:
+                avg_matches = sum(hourly_matches_count) / len(hourly_matches_count)
+                min_matches = min(hourly_matches_count)
+                max_matches = max(hourly_matches_count)
+                logger.info(f"过去一小时内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
+            else:
+                logger.info("过去一小时内没有匹配到交易对")
+            
+            # 重置计数器和时间
+            last_hourly_report_time = current_time
+            hourly_matches_count = []
+        
         # 计算休眠时间
         elapsed_time = time.time() - start_time
         sleep_time = max(0, POLLING_INTERVAL - elapsed_time)
         
-        logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
+        # logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
         await asyncio.sleep(sleep_time)