Parcourir la source

feat(数据记录): 添加将行情数据存储到QuestDB的功能

新增QuestDB数据库支持,将Lighter和Binance交易所的行情数据通过InfluxDB行协议写入QuestDB。包含表前缀配置、数据写入函数和错误处理逻辑,同时计算并存储价格差异百分比。
skyfffire il y a 1 semaine
Parent
commit
ad5d9aecd7
1 fichiers modifiés avec 64 ajouts et 6 suppressions
  1. 64 6
      src/record/market_data_recorder.py

+ 64 - 6
src/record/market_data_recorder.py

@@ -5,6 +5,7 @@
 行情数据记录脚本
 该脚本同时记录Lighter和Binance交易所的行情数据。
 支持通过环境变量proxy_address设置代理。
+数据将存储到QuestDB数据库中。
 """
 
 import asyncio
@@ -12,6 +13,7 @@ import aiohttp
 import time
 import logging
 import os
+import requests
 from datetime import datetime
 
 # 配置日志
@@ -35,6 +37,12 @@ POLLING_INTERVAL = 10
 # 获取代理地址(如果存在)
 PROXY_ADDRESS = os.environ.get('proxy_address', '')
 
+# QuestDB配置
+QUESTDB_HOST = "127.0.0.1"
+QUESTDB_INFLUX_PORT = 9000
+QUESTDB_TABLE_PREFIX = "lighter_binance"  # 表前缀,用于隔离其他项目的表
+INFLUX_URL = f"http://{QUESTDB_HOST}:{QUESTDB_INFLUX_PORT}/write"
+
 
 async def fetch_lighter_data(session):
     """从Lighter交易所获取行情数据"""
@@ -93,6 +101,41 @@ async def fetch_all_data():
         return lighter_data, binance_data
 
 
+def write_to_questdb(symbol, binance_price, lighter_price, price_diff_pct, lighter_volume):
+    """将数据写入QuestDB"""
+    try:
+        # 获取当前的UTC时间戳(纳秒)
+        timestamp_ns = int(time.time_ns())
+        
+        # 构建表名,使用前缀和交易对作为表名
+        table_name = f"{QUESTDB_TABLE_PREFIX}_{symbol}"
+        
+        # 构建InfluxDB行协议字符串
+        # 格式: <table_name> <field_key>=<field_value>,... <timestamp_ns>
+        line_protocol = (
+            f"{table_name} "
+            f"binance_price={binance_price},"
+            f"lighter_price={lighter_price},"
+            f"price_diff_pct={price_diff_pct},"
+            f"lighter_volume={lighter_volume} "
+            f"{timestamp_ns}"
+        )
+        
+        # 发送POST请求写入数据
+        response = requests.post(INFLUX_URL, data=line_protocol.encode('utf-8'), timeout=5)
+        
+        # 检查响应状态码
+        if response.status_code == 204:
+            logger.debug(f"成功将{symbol}数据写入QuestDB")
+            return True
+        else:
+            logger.error(f"写入{symbol}数据到QuestDB失败: HTTP {response.status_code}, 响应: {response.text}")
+            return False
+    except Exception as e:
+        logger.error(f"写入{symbol}数据到QuestDB时出错: {str(e)}")
+        return False
+
+
 def process_data(lighter_data, binance_data):
     """处理并比较两个交易所的数据"""
     if not lighter_data or not binance_data:
@@ -117,13 +160,28 @@ def process_data(lighter_data, binance_data):
         base_symbol = binance_symbol.replace('USDT', '')
         
         if base_symbol in lighter_symbols:
-            matching_symbols.append({
+            binance_price = float(item.get('price', 0))
+            lighter_price = lighter_symbols[base_symbol]['price']
+            price_diff_pct = ((binance_price - lighter_price) / binance_price * 100) if binance_price > 0 else 0
+            
+            symbol_data = {
                 'symbol': base_symbol,
-                'binance_price': float(item.get('price', 0)),
-                'lighter_price': lighter_symbols[base_symbol]['price'],
+                'binance_price': binance_price,
+                'lighter_price': lighter_price,
                 'binance_symbol': binance_symbol,
-                'lighter_volume': lighter_symbols[base_symbol]['daily_volume']
-            })
+                'lighter_volume': lighter_symbols[base_symbol]['daily_volume'],
+                'price_diff_pct': price_diff_pct
+            }
+            matching_symbols.append(symbol_data)
+            
+            # 将数据写入QuestDB
+            write_to_questdb(
+                base_symbol, 
+                binance_price, 
+                lighter_price, 
+                price_diff_pct,
+                lighter_symbols[base_symbol]['daily_volume']
+            )
     
     # 打印匹配的交易对
     logger.info(f"在两个交易所中找到 {len(matching_symbols)} 个匹配的交易对")
@@ -131,7 +189,7 @@ def process_data(lighter_data, binance_data):
         logger.info(f"交易对: {symbol_data['symbol']} | "
                    f"Binance价格: {symbol_data['binance_price']} | "
                    f"Lighter价格: {symbol_data['lighter_price']} | "
-                   f"价格差异百分比: {((symbol_data['binance_price'] - symbol_data['lighter_price']) / symbol_data['binance_price'] * 100):.2f}%")
+                   f"价格差异百分比: {symbol_data['price_diff_pct']:.2f}%")
 
 
 async def main():