Explorar el Código

feat(record): 添加Lighter交易所orderBooks数据获取和market_id映射功能

添加新的API接口地址和功能函数用于获取Lighter交易所的orderBooks数据
新增全局变量存储market_id到symbol的映射关系
实现初始化market_id映射的逻辑
skyfffire hace 1 semana
padre
commit
0150981056
Se han modificado 1 ficheros con 52 adiciones y 2 borrados
  1. 52 2
      src/record/market_data_recorder.py

+ 52 - 2
src/record/market_data_recorder.py

@@ -29,6 +29,7 @@ logger = logging.getLogger("market_data_recorder")
 
 # API接口地址
 LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
+LIGHTER_ORDERBOOKS_URL = "https://mainnet.zklighter.elliot.ai/api/v1/orderBooks"
 BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
 
 # 轮询间隔(秒)
@@ -43,6 +44,27 @@ QUESTDB_INFLUX_PORT = 9000
 QUESTDB_TABLE_PREFIX = "lighter_binance"  # 表前缀,用于隔离其他项目的表
 INFLUX_URL = f"http://{QUESTDB_HOST}:{QUESTDB_INFLUX_PORT}/write"
 
+# 全局变量存储market_id到symbol的映射
+market_id_to_symbol = {}
+
+
+async def fetch_lighter_orderbooks(session):
+    """从Lighter交易所获取orderBooks数据以建立market_id映射"""
+    try:
+        # 设置代理参数
+        proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
+        async with session.get(LIGHTER_ORDERBOOKS_URL, proxy=proxy) as response:
+            if response.status == 200:
+                data = await response.json()
+                logger.info(f"成功获取Lighter orderBooks数据,包含 {len(data)} 个交易对")
+                return data
+            else:
+                logger.error(f"获取Lighter orderBooks数据失败: HTTP {response.status}")
+                return None
+    except Exception as e:
+        logger.error(f"获取Lighter orderBooks数据时出错: {str(e)}")
+        return None
+
 
 async def fetch_lighter_data(session):
     """从Lighter交易所获取行情数据"""
@@ -101,6 +123,24 @@ async def fetch_all_data():
         return lighter_data, binance_data
 
 
+def update_market_id_mapping(orderbooks_data):
+    """更新market_id到symbol的映射"""
+    global market_id_to_symbol
+    if not orderbooks_data:
+        return
+    
+    try:
+        for orderbook in orderbooks_data:
+            market_id = orderbook.get('market_id')
+            symbol = orderbook.get('symbol')
+            if market_id is not None and symbol:
+                market_id_to_symbol[market_id] = symbol
+        
+        logger.info(f"更新market_id映射,共 {len(market_id_to_symbol)} 个交易对,{market_id_to_symbol}")
+    except Exception as e:
+        logger.error(f"更新market_id映射时出错: {str(e)}")
+
+
 def write_batch_to_questdb(data_batch):
     """批量将数据写入QuestDB"""
     if not data_batch:
@@ -183,8 +223,8 @@ def process_data(lighter_data, binance_data):
             batch_data.append(symbol_data)
     
     # 批量写入所有匹配的数据到QuestDB
-    if batch_data:
-        write_batch_to_questdb(batch_data)
+    # if batch_data:
+    #     write_batch_to_questdb(batch_data)
 
 
 async def main():
@@ -195,6 +235,16 @@ async def main():
     last_hourly_report_time = time.time()
     hourly_matches_count = []
     
+    # 初始化market_id映射
+    logger.info("正在初始化market_id映射...")
+    connector = None
+    if PROXY_ADDRESS:
+        connector = aiohttp.TCPConnector(ssl=False)
+        
+    async with aiohttp.ClientSession(connector=connector) as session:
+        orderbooks_data = await fetch_lighter_orderbooks(session)
+        update_market_id_mapping(orderbooks_data)
+    
     while True:
         start_time = time.time()