소스 검색

feat(策略): 添加基于Lighter和Binance价差的交易策略实现

实现交易策略模块,包含状态机和价差监控逻辑
在market_data_recorder中集成策略触发机制
skyfffire 1 주 전
부모
커밋
6629c1d61a
2개의 변경된 파일198개의 추가작업 그리고 18개의 파일을 삭제
  1. 67 18
      src/record/market_data_recorder.py
  2. 131 0
      src/record/strategy.py

+ 67 - 18
src/record/market_data_recorder.py

@@ -17,6 +17,7 @@ import logging
 import os
 import requests
 from datetime import datetime
+from strategy import TradingStrategy
 
 # 配置日志
 # 创建logs目录(如果不存在)
@@ -65,6 +66,9 @@ binance_data_cache = {
     'latest_prices': {}     # 存储最新价格数据 {symbol: price}
 }
 
+# 全局策略实例
+trading_strategy = None
+
 
 async def fetch_lighter_orderbooks(session):
     """从Lighter交易所获取orderBooks数据以建立market_id映射"""
@@ -117,6 +121,24 @@ async def fetch_binance_ticker_price(session):
         return None
 
 
+def update_market_id_mapping(orderbooks_data):
+    """更新market_id到symbol的映射"""
+    global market_id_to_symbol
+    if not orderbooks_data:
+        return
+    
+    try:
+        for orderbook in orderbooks_data:
+            market_id = orderbook.get('market_id')
+            symbol = orderbook.get('symbol')
+            if market_id is not None and symbol:
+                market_id_to_symbol[market_id] = symbol
+        
+        logger.info(f"更新market_id映射,共 {len(market_id_to_symbol)} 个交易对,{market_id_to_symbol}")
+    except Exception as e:
+        logger.error(f"更新market_id映射时出错: {str(e)}")
+
+
 async def handle_binance_data_collection():
     """处理Binance数据收集的主循环,每300ms请求一次"""
     logger.info("开始Binance数据收集任务")
@@ -147,6 +169,9 @@ async def handle_binance_data_collection():
                         if symbol and price:
                             binance_data_cache['latest_prices'][symbol] = float(price)
                 
+                # 触发策略更新
+                trigger_strategy_update()
+                
                 # 每300ms请求一次
                 await asyncio.sleep(0.3)
                 
@@ -155,24 +180,6 @@ async def handle_binance_data_collection():
             await asyncio.sleep(1)  # 出错时等待1秒再重试
 
 
-def update_market_id_mapping(orderbooks_data):
-    """更新market_id到symbol的映射"""
-    global market_id_to_symbol
-    if not orderbooks_data:
-        return
-    
-    try:
-        for orderbook in orderbooks_data:
-            market_id = orderbook.get('market_id')
-            symbol = orderbook.get('symbol')
-            if market_id is not None and symbol:
-                market_id_to_symbol[market_id] = symbol
-        
-        logger.info(f"更新market_id映射,共 {len(market_id_to_symbol)} 个交易对,{market_id_to_symbol}")
-    except Exception as e:
-        logger.error(f"更新market_id映射时出错: {str(e)}")
-
-
 async def handle_market_stats_websocket():
     """
     处理Lighter Market Stats WebSocket连接
@@ -227,6 +234,9 @@ async def handle_market_stats_websocket():
                                 
                                 symbol = market_id_to_symbol.get(market_id, f"UNKNOWN_{market_id}")
                                 logger.debug(f"更新Market Stats缓存 - {symbol}(ID:{market_id}): mark_price={market_data.get('mark_price')}, last_trade_price={market_data.get('last_trade_price')}")
+                            
+                            # # 触发策略更新
+                            # trigger_strategy_update()
                         
                         # 处理订阅确认消息
                         elif message_type == "subscribed/market_stats":
@@ -292,6 +302,39 @@ def write_batch_to_questdb(data_batch):
         return False
 
 
+def trigger_strategy_update():
+    """
+    触发策略更新,将最新的市场数据传递给策略
+    """
+    global trading_strategy
+    
+    if trading_strategy is None:
+        return
+    
+    # 遍历所有匹配的交易对,调用策略
+    for market_id, symbol in market_id_to_symbol.items():
+        if market_id not in market_stats_cache:
+            continue
+        
+        binance_symbol = f"{symbol}USDT"
+        if (binance_symbol not in binance_data_cache['mark_prices'] or 
+            binance_symbol not in binance_data_cache['latest_prices']):
+            continue
+        
+        lighter_stats = market_stats_cache[market_id]
+        market_data = {
+            'symbol': symbol,
+            'binance_mark_price': binance_data_cache['mark_prices'][binance_symbol],
+            'binance_price': binance_data_cache['latest_prices'][binance_symbol],
+            'lighter_mark_price': lighter_stats.get('mark_price'),
+            'lighter_price': lighter_stats.get('last_trade_price'),
+            'timestamp': int(time.time() * 1000)
+        }
+        
+        # 调用策略
+        trading_strategy.do_strategy(market_data)
+
+
 def process_data():
     """
     处理从两个交易所获取的数据,并将匹配的交易对数据保存到数据库
@@ -349,8 +392,14 @@ def process_data():
 
 async def main():
     """运行数据收集循环的主函数"""
+    global trading_strategy
+    
     logger.info("正在启动行情数据记录器")
     
+    # 初始化策略
+    trading_strategy = TradingStrategy()
+    logger.info("交易策略已初始化")
+    
     # 添加每小时打印匹配交易对数量的变量
     last_hourly_report_time = time.time()
     hourly_matches_count = []

+ 131 - 0
src/record/strategy.py

@@ -0,0 +1,131 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+交易策略模块
+实现基于Lighter和Binance价差的交易策略
+"""
+
+import logging
+from enum import Enum
+from datetime import datetime
+import os
+
+# 配置日志
+logs_dir = "logs"
+if not os.path.exists(logs_dir):
+    os.makedirs(logs_dir)
+
+logger = logging.getLogger("strategy")
+
+
+class StrategyState(Enum):
+    """策略状态枚举"""
+    WAITING_INIT = 1  # 等待初始化
+    IDLE_MONITORING = 2  # 空闲状态监听差价
+    EXECUTING_TRADE = 3  # 差价达成触发交易
+    WAITING_CONVERGENCE = 4  # 交易完成等待价差收敛
+    CLOSING_POSITION = 5  # 收敛完成进行平仓
+    POSITION_CLOSED = 6  # 平仓完成
+
+
+class TradingStrategy:
+    """交易策略类"""
+    
+    def __init__(self):
+        """初始化策略"""
+        self.state = StrategyState.WAITING_INIT
+        self.current_position = None  # 当前持仓信息
+        self.entry_price_diff = None  # 入场时的价差
+        self.target_symbol = "DOGE"  # 目标交易对
+        
+        logger.info("策略初始化完成,当前状态: WAITING_INIT")
+    
+    def do_strategy(self, market_data):
+        """
+        执行策略逻辑
+        
+        Args:
+            market_data: 包含市场数据的字典,格式:
+            {
+                'symbol': str,
+                'binance_mark_price': float,
+                'binance_price': float,
+                'lighter_mark_price': float,
+                'lighter_price': float,
+                'timestamp': int
+            }
+        """
+        if not market_data:
+            return
+        
+        symbol = market_data.get('symbol')
+        
+        # 如果是DOGE交易对,打印实时行情
+        if symbol == self.target_symbol:
+            self._print_market_data(market_data)
+        
+        # 根据当前状态执行相应逻辑
+        if self.state == StrategyState.WAITING_INIT:
+            self._handle_waiting_init()
+        elif self.state == StrategyState.IDLE_MONITORING:
+            self._handle_idle_monitoring(market_data)
+        elif self.state == StrategyState.EXECUTING_TRADE:
+            self._handle_executing_trade(market_data)
+        elif self.state == StrategyState.WAITING_CONVERGENCE:
+            self._handle_waiting_convergence(market_data)
+        elif self.state == StrategyState.CLOSING_POSITION:
+            self._handle_closing_position(market_data)
+        elif self.state == StrategyState.POSITION_CLOSED:
+            self._handle_position_closed()
+    
+    def _print_market_data(self, market_data):
+        """打印市场数据"""
+        symbol = market_data.get('symbol')
+        binance_mark = market_data.get('binance_mark_price')
+        binance_price = market_data.get('binance_price')
+        lighter_mark = market_data.get('lighter_mark_price')
+        lighter_price = market_data.get('lighter_price')
+        
+        # 计算价差
+        if binance_price and lighter_price:
+            price_diff = binance_price - lighter_price
+            price_diff_pct = (price_diff / binance_price) * 100 if binance_price else 0
+        else:
+            price_diff = None
+            price_diff_pct = None
+        
+        logger.info(f"[{symbol}] Binance: 标记价={binance_mark}, 最新价={binance_price} | "
+                   f"Lighter: 标记价={lighter_mark}, 最新价={lighter_price} | "
+                   f"价差={price_diff:.6f if price_diff else 'N/A'} ({price_diff_pct:.4f}% if price_diff_pct else 'N/A')")
+    
+    def _handle_waiting_init(self):
+        """处理等待初始化状态"""
+        # 初始化完成后转到空闲监听状态
+        self.state = StrategyState.IDLE_MONITORING
+        logger.info("状态转换: WAITING_INIT -> IDLE_MONITORING")
+    
+    def _handle_idle_monitoring(self, market_data):
+        """处理空闲监听状态 - 监控价差"""
+        # TODO: 实现价差监控逻辑
+        pass
+    
+    def _handle_executing_trade(self, market_data):
+        """处理执行交易状态"""
+        # TODO: 实现交易执行逻辑
+        pass
+    
+    def _handle_waiting_convergence(self, market_data):
+        """处理等待收敛状态"""
+        # TODO: 实现等待价差收敛逻辑
+        pass
+    
+    def _handle_closing_position(self, market_data):
+        """处理平仓状态"""
+        # TODO: 实现平仓逻辑
+        pass
+    
+    def _handle_position_closed(self):
+        """处理平仓完成状态"""
+        # 平仓完成后回到空闲监听状态
+        self.state = StrategyState.IDLE_MONITORING
+        logger.info("状态转换: POSITION_CLOSED -> IDLE_MONITORING")