Kaynağa Gözat

perf(record): 优化市场数据记录器性能,改用批量写入方式

将单条写入改为批量写入QuestDB,减少网络请求次数
提升数据写入效率,降低系统负载
skyfffire 1 hafta önce
ebeveyn
işleme
c7cbb1e554
1 değiştirilmiş dosya ile 42 ekleme ve 31 silme
  1. 42 31
      src/record/market_data_recorder.py

+ 42 - 31
src/record/market_data_recorder.py

@@ -101,38 +101,54 @@ async def fetch_all_data():
         return lighter_data, binance_data
 
 
-def write_to_questdb(symbol, binance_price, lighter_price, price_diff_pct, lighter_volume):
-    """将数据写入QuestDB"""
+def write_batch_to_questdb(data_batch):
+    """批量将数据写入QuestDB"""
+    if not data_batch:
+        return True
+        
     try:
         # 获取当前的UTC时间戳(纳秒)
         timestamp_ns = int(time.time_ns())
         
-        # 构建表名,使用前缀和交易对作为表名
-        table_name = f"{QUESTDB_TABLE_PREFIX}_{symbol}"
+        # 构建批量InfluxDB行协议字符串
+        lines = []
+        for item in data_batch:
+            symbol = item['symbol']
+            binance_price = item['binance_price']
+            lighter_price = item['lighter_price']
+            price_diff_pct = item['price_diff_pct']
+            lighter_volume = item['lighter_volume']
+            
+            # 构建表名,使用前缀和交易对作为表名
+            table_name = f"{QUESTDB_TABLE_PREFIX}_{symbol}"
+            
+            # 构建InfluxDB行协议字符串
+            # 格式: <table_name> <field_key>=<field_value>,... <timestamp_ns>
+            line_protocol = (
+                f"{table_name} "
+                f"binance_price={binance_price},"
+                f"lighter_price={lighter_price},"
+                f"price_diff_pct={price_diff_pct},"
+                f"lighter_volume={lighter_volume} "
+                f"{timestamp_ns}"
+            )
+            lines.append(line_protocol)
         
-        # 构建InfluxDB行协议字符串
-        # 格式: <table_name> <field_key>=<field_value>,... <timestamp_ns>
-        line_protocol = (
-            f"{table_name} "
-            f"binance_price={binance_price},"
-            f"lighter_price={lighter_price},"
-            f"price_diff_pct={price_diff_pct},"
-            f"lighter_volume={lighter_volume} "
-            f"{timestamp_ns}"
-        )
+        # 将所有行合并为一个批量请求
+        batch_data = '\n'.join(lines)
         
-        # 发送POST请求写入数据
-        response = requests.post(INFLUX_URL, data=line_protocol.encode('utf-8'), timeout=5)
+        # 发送POST请求批量写入数据
+        response = requests.post(INFLUX_URL, data=batch_data.encode('utf-8'), timeout=10)
         
         # 检查响应状态码
         if response.status_code == 204:
-            logger.debug(f"成功将{symbol}数据写入QuestDB")
+            logger.info(f"成功批量写入{len(data_batch)}个交易对数据到QuestDB")
             return True
         else:
-            logger.error(f"写入{symbol}数据到QuestDB失败: HTTP {response.status_code}, 响应: {response.text}")
+            logger.error(f"批量写入数据到QuestDB失败: HTTP {response.status_code}, 响应: {response.text}")
             return False
     except Exception as e:
-        logger.error(f"写入{symbol}数据到QuestDB时出错: {str(e)}")
+        logger.error(f"批量写入数据到QuestDB时出错: {str(e)}")
         return False
 
 
@@ -152,8 +168,8 @@ def process_data(lighter_data, binance_data):
                 'daily_volume': item.get('daily_base_token_volume', 0)
             }
     
-    # 从Binance数据中提取交易对并比较
-    matching_symbols = []
+    # 从Binance数据中提取交易对并收集匹配的数据
+    batch_data = []
     for item in binance_data:
         binance_symbol = item.get('symbol', '')
         # 移除USDT后缀以匹配Lighter交易对
@@ -172,16 +188,11 @@ def process_data(lighter_data, binance_data):
                 'lighter_volume': lighter_symbols[base_symbol]['daily_volume'],
                 'price_diff_pct': price_diff_pct
             }
-            matching_symbols.append(symbol_data)
-            
-            # 将数据写入QuestDB
-            write_to_questdb(
-                base_symbol, 
-                binance_price, 
-                lighter_price, 
-                price_diff_pct,
-                lighter_symbols[base_symbol]['daily_volume']
-            )
+            batch_data.append(symbol_data)
+    
+    # 批量写入所有匹配的数据到QuestDB
+    if batch_data:
+        write_batch_to_questdb(batch_data)
 
 
 async def main():