Pārlūkot izejas kodu

优化CPU占用:使用持久session、减少数据处理频率、优化字典查询

skyfffire 1 dienu atpakaļ
vecāks
revīzija
4c9f343071
1 mainīts fails ar 70 papildinājumiem un 61 dzēšanām
  1. 70 61
      src/record/market_data_recorder.py

+ 70 - 61
src/record/market_data_recorder.py

@@ -77,6 +77,9 @@ binance_data_cache = {
 # 全局策略实例
 trading_strategy = None
 
+# 全局aiohttp session,避免重复创建
+binance_session = None
+
 
 async def fetch_lighter_orderbooks(session):
     """从Lighter交易所获取orderBooks数据以建立market_id映射"""
@@ -134,35 +137,39 @@ def update_market_id_mapping(orderbooks_data):
     global market_id_to_orderbook
     if not orderbooks_data:
         return
-    
+
     try:
         for orderbook in orderbooks_data:
             market_id = orderbook.get('market_id')
             symbol = orderbook.get('symbol')
             if market_id is not None and symbol:
                 market_id_to_orderbook[market_id] = orderbook
-        
-        # 只打印映射数量和symbol信息,避免日志过长
-        symbols_info = {market_id: orderbook.get('symbol') for market_id, orderbook in market_id_to_orderbook.items()}
-        logger.info(f"更新market_id映射,共 {len(market_id_to_orderbook)} 个交易对,{symbols_info}")
+
+        # 只打印映射数量,避免日志过长和CPU占用
+        logger.info(f"更新market_id映射,共 {len(market_id_to_orderbook)} 个交易对")
     except Exception as e:
         logger.error(f"更新market_id映射时出错: {str(e)}")
 
 
 async def handle_binance_data_collection():
     """处理Binance数据收集的主循环,每300ms请求一次"""
+    global binance_session
     logger.info("开始Binance数据收集任务")
-    
-    while True:
-        try:
-            async with aiohttp.ClientSession() as session:
+
+    # 创建一个持久的session,避免重复创建
+    connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
+    binance_session = aiohttp.ClientSession(connector=connector)
+
+    try:
+        while True:
+            try:
                 # 同时发起两个请求
-                premium_task = asyncio.create_task(fetch_binance_premium_index(session))
-                price_task = asyncio.create_task(fetch_binance_ticker_price(session))
-                
+                premium_task = asyncio.create_task(fetch_binance_premium_index(binance_session))
+                price_task = asyncio.create_task(fetch_binance_ticker_price(binance_session))
+
                 # 等待两个请求完成
                 premium_data, price_data = await asyncio.gather(premium_task, price_task, return_exceptions=True)
-                
+
                 # 处理标记价格数据
                 if isinstance(premium_data, list) and premium_data:
                     for item in premium_data:
@@ -170,7 +177,7 @@ async def handle_binance_data_collection():
                         mark_price = item.get('markPrice')
                         if symbol and mark_price:
                             binance_data_cache['mark_prices'][symbol] = float(mark_price)
-                
+
                 # 处理最新价格数据
                 if isinstance(price_data, list) and price_data:
                     for item in price_data:
@@ -178,18 +185,21 @@ async def handle_binance_data_collection():
                         price = item.get('price')
                         if symbol and price:
                             binance_data_cache['latest_prices'][symbol] = float(price)
-                
+
                 # 触发策略更新
                 # await trigger_strategy_update()
-                
+
                 # 每300ms请求一次
                 await asyncio.sleep(0.3)
-                
-        except Exception as e:
-            import traceback
-            error_info = traceback.format_exc()
-            logger.error(f"Binance数据收集出错: {str(e)}\n{error_info}")
-            await asyncio.sleep(1)  # 出错时等待1秒再重试
+
+            except Exception as e:
+                import traceback
+                error_info = traceback.format_exc()
+                logger.error(f"Binance数据收集出错: {str(e)}\n{error_info}")
+                await asyncio.sleep(1)  # 出错时等待1秒再重试
+    finally:
+        if binance_session:
+            await binance_session.close()
 
 
 async def handle_market_stats_websocket():
@@ -355,54 +365,52 @@ def process_data():
     现在使用market_stats_cache作为Lighter数据源,binance_data_cache作为Binance数据源
     """
     if not market_stats_cache or not binance_data_cache['mark_prices'] or not binance_data_cache['latest_prices']:
-        logger.warning("缺少必要的数据,跳过本次处理")
         return 0
-    
+
     # 准备批量写入的数据
     batch_data = []
     matching_count = 0
-    
+
+    # 缓存binance数据的引用,避免重复字典查询
+    mark_prices = binance_data_cache['mark_prices']
+    latest_prices = binance_data_cache['latest_prices']
+    current_timestamp = int(time.time() * 1000)
+
     # 遍历market_id_to_orderbook映射,查找匹配的交易对
     for market_id, orderbook in market_id_to_orderbook.items():
         # 检查Lighter数据是否存在
         if market_id not in market_stats_cache:
             continue
-            
+
         # 获取交易对符号
         symbol = orderbook.get('symbol')
         # 构造Binance交易对名称
         binance_symbol = f"{symbol}USDT"
-        
-        # 检查Binance数据是否存在
-        if (binance_symbol not in binance_data_cache['mark_prices'] or 
-            binance_symbol not in binance_data_cache['latest_prices']):
+
+        # 检查Binance数据是否存在(使用in操作符更快)
+        if binance_symbol not in mark_prices or binance_symbol not in latest_prices:
             continue
-        
+
         # 获取数据
         lighter_stats = market_stats_cache[market_id]
-        binance_mark_price = binance_data_cache['mark_prices'][binance_symbol]
-        binance_latest_price = binance_data_cache['latest_prices'][binance_symbol]
-        
+
         # 构建数据记录
         symbol_data = {
             'symbol': symbol,
-            'binance_mark_price': binance_mark_price,
-            'binance_price': binance_latest_price,
+            'binance_mark_price': mark_prices[binance_symbol],
+            'binance_price': latest_prices[binance_symbol],
             'lighter_mark_price': lighter_stats.get('mark_price'),
             'lighter_price': lighter_stats.get('last_trade_price'),
-            'timestamp': int(time.time() * 1000)  # 毫秒时间戳
+            'timestamp': current_timestamp
         }
-        
+
         batch_data.append(symbol_data)
         matching_count += 1
-    
+
     # 如果有匹配的数据,批量写入数据库
     if batch_data:
-        # logger.info(f"找到 {matching_count} 个匹配的交易对,准备写入数据库")
         write_batch_to_questdb(batch_data)
-    else:
-        logger.warning("没有找到匹配的交易对数据")
-    
+
     return matching_count
 
 
@@ -439,18 +447,23 @@ async def main():
     logger.info("已启动Binance数据收集任务")
     
     try:
+        process_interval = 5  # 每5秒处理一次数据,而不是每秒
+        last_process_time = time.time()
+
         while True:
-            start_time = time.time()
-            
-            # 处理数据(现在使用缓存中的数据)
-            matching_count = process_data()
-            
-            # 记录本次匹配数量
-            if matching_count > 0:
-                hourly_matches_count.append(matching_count)
-            
-            # 检查是否需要打印每十分钟报告
             current_time = time.time()
+
+            # 每5秒处理一次数据,减少CPU占用
+            if current_time - last_process_time >= process_interval:
+                matching_count = process_data()
+
+                # 记录本次匹配数量
+                if matching_count > 0:
+                    hourly_matches_count.append(matching_count)
+
+                last_process_time = current_time
+
+            # 检查是否需要打印每十分钟报告
             if current_time - last_hourly_report_time >= 600:  # 600秒 = 10分钟
                 if hourly_matches_count:
                     avg_matches = sum(hourly_matches_count) / len(hourly_matches_count)
@@ -459,17 +472,13 @@ async def main():
                     logger.info(f"过去十分钟内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
                 else:
                     logger.info("过去十分钟内没有匹配到交易对")
-                
+
                 # 重置计数器和时间
                 last_hourly_report_time = current_time
                 hourly_matches_count = []
-            
-            # 计算休眠时间
-            elapsed_time = time.time() - start_time
-            sleep_time = max(0, POLLING_INTERVAL - elapsed_time)
-            
-            # logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
-            await asyncio.sleep(sleep_time)
+
+            # 使用较长的sleep时间,减少循环频率
+            await asyncio.sleep(0.5)  # 每500ms检查一次,而不是每秒
     
     except Exception as e:
         logger.error(f"主循环出错: {str(e)}")