浏览代码

feat(market_data): 添加Binance标记价格采集并优化数据处理流程

- 新增Binance标记价格和最新价格API端点
- 实现Binance数据缓存和100ms高频采集任务
- 重构数据处理逻辑使用缓存数据替代实时请求
- 在数据库写入中添加标记价格字段
- 将统计报告间隔从1小时改为10分钟
skyfffire 1 周之前
父节点
当前提交
7a7365cdb2
共有 1 个文件被更改,包括 126 次插入52 次删除
  1. 126 52
      src/record/market_data_recorder.py

+ 126 - 52
src/record/market_data_recorder.py

@@ -39,6 +39,8 @@ LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
 LIGHTER_ORDERBOOKS_URL = "https://mainnet.zklighter.elliot.ai/api/v1/orderBooks"
 LIGHTER_WEBSOCKET_URL = "wss://mainnet.zklighter.elliot.ai/stream"
 BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
+BINANCE_PREMIUM_INDEX_URL = "https://fapi.binance.com/fapi/v1/premiumIndex"
+BINANCE_TICKER_PRICE_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
 
 # 轮询间隔(秒)
 POLLING_INTERVAL = 1
@@ -58,6 +60,12 @@ market_id_to_symbol = {}
 # 全局变量存储最新的Market Stats数据
 market_stats_cache = {}
 
+# 全局变量存储Binance数据缓存
+binance_data_cache = {
+    'mark_prices': {},      # 存储标记价格数据 {symbol: mark_price}
+    'latest_prices': {}     # 存储最新价格数据 {symbol: price}
+}
+
 
 async def fetch_lighter_orderbooks(session):
     """从Lighter交易所获取orderBooks数据以建立market_id映射"""
@@ -114,6 +122,76 @@ async def fetch_binance_data(session):
         return None
 
 
+async def fetch_binance_premium_index(session):
+    """从Binance获取标记价格数据"""
+    try:
+        proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
+        async with session.get(BINANCE_PREMIUM_INDEX_URL, proxy=proxy) as response:
+            if response.status == 200:
+                data = await response.json()
+                return data
+            else:
+                logger.error(f"获取Binance标记价格数据失败: HTTP {response.status}")
+                return None
+    except Exception as e:
+        logger.error(f"获取Binance标记价格数据时出错: {str(e)}")
+        return None
+
+
+async def fetch_binance_ticker_price(session):
+    """从Binance获取最新价格数据"""
+    try:
+        proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
+        async with session.get(BINANCE_TICKER_PRICE_URL, proxy=proxy) as response:
+            if response.status == 200:
+                data = await response.json()
+                return data
+            else:
+                logger.error(f"获取Binance最新价格数据失败: HTTP {response.status}")
+                return None
+    except Exception as e:
+        logger.error(f"获取Binance最新价格数据时出错: {str(e)}")
+        return None
+
+
+async def handle_binance_data_collection():
+    """处理Binance数据收集的主循环,每100ms请求一次"""
+    logger.info("开始Binance数据收集任务")
+    
+    while True:
+        try:
+            async with aiohttp.ClientSession() as session:
+                # 同时发起两个请求
+                premium_task = asyncio.create_task(fetch_binance_premium_index(session))
+                price_task = asyncio.create_task(fetch_binance_ticker_price(session))
+                
+                # 等待两个请求完成
+                premium_data, price_data = await asyncio.gather(premium_task, price_task, return_exceptions=True)
+                
+                # 处理标记价格数据
+                if isinstance(premium_data, list) and premium_data:
+                    for item in premium_data:
+                        symbol = item.get('symbol')
+                        mark_price = item.get('markPrice')
+                        if symbol and mark_price:
+                            binance_data_cache['mark_prices'][symbol] = float(mark_price)
+                
+                # 处理最新价格数据
+                if isinstance(price_data, list) and price_data:
+                    for item in price_data:
+                        symbol = item.get('symbol')
+                        price = item.get('price')
+                        if symbol and price:
+                            binance_data_cache['latest_prices'][symbol] = float(price)
+                
+                # 每100ms请求一次
+                await asyncio.sleep(0.1)
+                
+        except Exception as e:
+            logger.error(f"Binance数据收集出错: {str(e)}")
+            await asyncio.sleep(1)  # 出错时等待1秒再重试
+
+
 async def fetch_all_data():
     """同时从两个交易所获取数据"""
     # 如果设置了代理,则使用代理
@@ -220,13 +298,14 @@ def write_batch_to_questdb(data_batch):
         lines = []
         for data in data_batch:
             symbol = data['symbol']
+            binance_mark_price = data['binance_mark_price']
             binance_price = data['binance_price']
             lighter_mark_price = data['lighter_mark_price']
             lighter_price = data['lighter_price']
             
             # 构建InfluxDB Line Protocol格式
             # 格式: measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
-            line = f"{QUESTDB_TABLE_PREFIX}_market_data,symbol={symbol} binance_price={binance_price},lighter_mark_price={lighter_mark_price},lighter_price={lighter_price}"
+            line = f"{QUESTDB_TABLE_PREFIX}_market_data,symbol={symbol} binance_mark_price={binance_mark_price},binance_price={binance_price},lighter_mark_price={lighter_mark_price},lighter_price={lighter_price}"
             lines.append(line)
         
         # 将所有行合并为一个字符串,用换行符分隔
@@ -252,12 +331,12 @@ def write_batch_to_questdb(data_batch):
         return False
 
 
-def process_data(binance_data):
+def process_data():
     """
     处理从两个交易所获取的数据,并将匹配的交易对数据保存到数据库
-    现在使用market_stats_cache作为Lighter数据源
+    现在使用market_stats_cache作为Lighter数据源,binance_data_cache作为Binance数据源
     """
-    if not binance_data or not market_stats_cache:
+    if not market_stats_cache or not binance_data_cache['mark_prices'] or not binance_data_cache['latest_prices']:
         logger.warning("缺少必要的数据,跳过本次处理")
         return 0
     
@@ -265,37 +344,42 @@ def process_data(binance_data):
     batch_data = []
     matching_count = 0
     
-    # 遍历Binance数据
-    for binance_item in binance_data:
-        binance_symbol = binance_item.get('symbol', '')
-        base_symbol = binance_symbol.replace('USDT', '')
-        binance_price = binance_item.get('price')
+    # 遍历market_id_to_symbol映射,查找匹配的交易对
+    for market_id, symbol in market_id_to_symbol.items():
+        # 检查Lighter数据是否存在
+        if market_id not in market_stats_cache:
+            continue
+            
+        # 构造Binance交易对名称
+        binance_symbol = f"{symbol}USDT"
         
-        if not binance_price:
+        # 检查Binance数据是否存在
+        if (binance_symbol not in binance_data_cache['mark_prices'] or 
+            binance_symbol not in binance_data_cache['latest_prices']):
             continue
         
-        # 在market_stats_cache中查找对应的Lighter数据
-        for market_id, symbol in market_id_to_symbol.items():
-            if symbol == base_symbol and market_id in market_stats_cache:
-                lighter_stats = market_stats_cache[market_id]
-                
-                # 构建数据记录
-                symbol_data = {
-                    'symbol': base_symbol,
-                    'binance_price': float(binance_price),
-                    'lighter_mark_price': lighter_stats.get('mark_price'),
-                    'lighter_price': lighter_stats.get('last_trade_price'),
-                    'timestamp': int(time.time() * 1000)  # 毫秒时间戳
-                }
-                
-                batch_data.append(symbol_data)
-                matching_count += 1
-                break
+        # 获取数据
+        lighter_stats = market_stats_cache[market_id]
+        binance_mark_price = binance_data_cache['mark_prices'][binance_symbol]
+        binance_latest_price = binance_data_cache['latest_prices'][binance_symbol]
+        
+        # 构建数据记录
+        symbol_data = {
+            'symbol': symbol,
+            'binance_mark_price': binance_mark_price,
+            'binance_price': binance_latest_price,
+            'lighter_mark_price': lighter_stats.get('mark_price'),
+            'lighter_price': lighter_stats.get('last_trade_price'),
+            'timestamp': int(time.time() * 1000)  # 毫秒时间戳
+        }
+        
+        batch_data.append(symbol_data)
+        matching_count += 1
     
     # 如果有匹配的数据,批量写入数据库
     if batch_data:
-        logger.info(f"找到 {matching_count} 个匹配的交易对,准备写入数据库")
-        # write_batch_to_questdb(batch_data)  # 暂时注释掉数据库写入
+        # logger.info(f"找到 {matching_count} 个匹配的交易对,准备写入数据库")
+        write_batch_to_questdb(batch_data)
     else:
         logger.warning("没有找到匹配的交易对数据")
     
@@ -324,42 +408,31 @@ async def main():
     websocket_task = asyncio.create_task(handle_market_stats_websocket())
     logger.info("已启动Market Stats WebSocket任务")
     
+    # 启动Binance数据收集任务
+    binance_task = asyncio.create_task(handle_binance_data_collection())
+    logger.info("已启动Binance数据收集任务")
+    
     try:
         while True:
             start_time = time.time()
             
-            # 从Binance获取数据(Lighter数据现在从WebSocket获取
-            _, binance_data = await fetch_all_data()
+            # 处理数据(现在使用缓存中的数据
+            matching_count = process_data()
             
-            # 处理数据并获取匹配交易对数量
-            matching_count = 0
-            if binance_data and market_stats_cache:
-                # 计算匹配的交易对数量
-                for item in binance_data:
-                    binance_symbol = item.get('symbol', '')
-                    base_symbol = binance_symbol.replace('USDT', '')
-                    # 检查是否在Market Stats缓存中有对应的数据
-                    for market_id, symbol in market_id_to_symbol.items():
-                        if symbol == base_symbol and market_id in market_stats_cache:
-                            matching_count += 1
-                            break
-                
-                # 记录本次匹配数量
+            # 记录本次匹配数量
+            if matching_count > 0:
                 hourly_matches_count.append(matching_count)
             
-            # 处理数据(使用Market Stats缓存中的数据)
-            process_data(binance_data)
-            
-            # 检查是否需要打印每小时报告
+            # 检查是否需要打印每十分钟报告
             current_time = time.time()
-            if current_time - last_hourly_report_time >= 3600:  # 3600秒 = 1小时
+            if current_time - last_hourly_report_time >= 600:  # 600秒 = 10分钟
                 if hourly_matches_count:
                     avg_matches = sum(hourly_matches_count) / len(hourly_matches_count)
                     min_matches = min(hourly_matches_count)
                     max_matches = max(hourly_matches_count)
-                    logger.info(f"过去一小时内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
+                    logger.info(f"过去十分钟内匹配交易对统计: 平均 {avg_matches:.1f} 个, 最少 {min_matches} 个, 最多 {max_matches} 个")
                 else:
-                    logger.info("过去一小时内没有匹配到交易对")
+                    logger.info("过去十分钟内没有匹配到交易对")
                 
                 # 重置计数器和时间
                 last_hourly_report_time = current_time
@@ -375,6 +448,7 @@ async def main():
     except Exception as e:
         logger.error(f"主循环出错: {str(e)}")
         websocket_task.cancel()
+        binance_task.cancel()
         raise