Ver código fonte

feat: 添加行情数据记录脚本并更新依赖

添加新的市场数据记录脚本,支持从Lighter和Binance交易所获取并比较行情数据
更新requirements.txt添加aiohttp依赖
skyfffire 1 semana atrás
pai
commit
8539d52f67
2 arquivos alterados com 166 adições e 1 exclusões
  1. 2 1
      requirements.txt
  2. 164 0
      src/record/market_data_recorder.py

+ 2 - 1
requirements.txt

@@ -1,2 +1,3 @@
 ijson
-requests
+requests
+aiohttp

+ 164 - 0
src/record/market_data_recorder.py

@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+行情数据记录脚本
+该脚本同时记录Lighter和Binance交易所的行情数据。
+支持通过环境变量proxy_address设置代理。
+"""
+
+import asyncio
+import aiohttp
+import time
+import logging
+import os
+from datetime import datetime
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(),
+        logging.FileHandler(f"market_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
+    ]
+)
+logger = logging.getLogger("market_data_recorder")
+
+# API接口地址
+LIGHTER_API_URL = "https://mainnet.zklighter.elliot.ai/api/v1/exchangeStats"
+BINANCE_API_URL = "https://fapi.binance.com/fapi/v2/ticker/price"
+
+# 轮询间隔(秒)
+POLLING_INTERVAL = 10
+
+# 获取代理地址(如果存在)
+PROXY_ADDRESS = os.environ.get('proxy_address', '')
+
+
+async def fetch_lighter_data(session):
+    """从Lighter交易所获取行情数据"""
+    try:
+        # 设置代理参数
+        proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
+        async with session.get(LIGHTER_API_URL, proxy=proxy) as response:
+            if response.status == 200:
+                data = await response.json()
+                logger.info(f"成功获取Lighter数据,包含 {len(data.get('order_book_stats', []))} 个交易对")
+                return data
+            else:
+                logger.error(f"获取Lighter数据失败: HTTP {response.status}")
+                return None
+    except Exception as e:
+        logger.error(f"获取Lighter数据时出错: {str(e)}")
+        return None
+
+
+async def fetch_binance_data(session):
+    """从Binance交易所获取行情数据"""
+    try:
+        # 设置代理参数
+        proxy = 'http://' + PROXY_ADDRESS if PROXY_ADDRESS else None
+        async with session.get(BINANCE_API_URL, proxy=proxy) as response:
+            if response.status == 200:
+                data = await response.json()
+                logger.info(f"成功获取Binance数据,包含 {len(data)} 个交易对")
+                return data
+            else:
+                logger.error(f"获取Binance数据失败: HTTP {response.status}")
+                return None
+    except Exception as e:
+        logger.error(f"获取Binance数据时出错: {str(e)}")
+        return None
+
+
+async def fetch_all_data():
+    """同时从两个交易所获取数据"""
+    # 如果设置了代理,则使用代理
+    if PROXY_ADDRESS:
+        logger.info(f"使用代理: {PROXY_ADDRESS}")
+    
+    # 创建会话时设置代理
+    connector = None
+    if PROXY_ADDRESS:
+        connector = aiohttp.TCPConnector(ssl=False)
+        
+    async with aiohttp.ClientSession(connector=connector) as session:
+        lighter_task = asyncio.create_task(fetch_lighter_data(session))
+        binance_task = asyncio.create_task(fetch_binance_data(session))
+        
+        # 等待两个任务完成
+        lighter_data, binance_data = await asyncio.gather(lighter_task, binance_task)
+        
+        return lighter_data, binance_data
+
+
+def process_data(lighter_data, binance_data):
+    """处理并比较两个交易所的数据"""
+    if not lighter_data or not binance_data:
+        logger.warning("一个或两个交易所的数据缺失")
+        return
+    
+    # 从Lighter数据中提取交易对
+    lighter_symbols = {}
+    for item in lighter_data.get('order_book_stats', []):
+        symbol = item.get('symbol')
+        if symbol:
+            lighter_symbols[symbol] = {
+                'price': item.get('last_trade_price', 0),
+                'daily_volume': item.get('daily_base_token_volume', 0)
+            }
+    
+    # 从Binance数据中提取交易对并比较
+    matching_symbols = []
+    for item in binance_data:
+        binance_symbol = item.get('symbol', '')
+        # 移除USDT后缀以匹配Lighter交易对
+        base_symbol = binance_symbol.replace('USDT', '')
+        
+        if base_symbol in lighter_symbols:
+            matching_symbols.append({
+                'symbol': base_symbol,
+                'binance_price': float(item.get('price', 0)),
+                'lighter_price': lighter_symbols[base_symbol]['price'],
+                'binance_symbol': binance_symbol,
+                'lighter_volume': lighter_symbols[base_symbol]['daily_volume']
+            })
+    
+    # 打印匹配的交易对
+    logger.info(f"在两个交易所中找到 {len(matching_symbols)} 个匹配的交易对")
+    for symbol_data in matching_symbols:
+        logger.info(f"交易对: {symbol_data['symbol']} | "
+                   f"Binance价格: {symbol_data['binance_price']} | "
+                   f"Lighter价格: {symbol_data['lighter_price']} | "
+                   f"价格差异百分比: {((symbol_data['binance_price'] - symbol_data['lighter_price']) / symbol_data['binance_price'] * 100):.2f}%")
+
+
+async def main():
+    """运行数据收集循环的主函数"""
+    logger.info("正在启动行情数据记录器")
+    
+    while True:
+        start_time = time.time()
+        
+        # 从两个交易所获取数据
+        lighter_data, binance_data = await fetch_all_data()
+        
+        # 处理数据
+        process_data(lighter_data, binance_data)
+        
+        # 计算休眠时间
+        elapsed_time = time.time() - start_time
+        sleep_time = max(0, POLLING_INTERVAL - elapsed_time)
+        
+        logger.info(f"完成数据收集周期,耗时 {elapsed_time:.2f}秒,休眠 {sleep_time:.2f}秒")
+        await asyncio.sleep(sleep_time)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        logger.info("行情数据记录器被用户停止")
+    except Exception as e:
+        logger.error(f"发生意外错误: {str(e)}")