فهرست منبع

对接前准备。

skyffire 1 سال پیش
والد
کامیت
a4c1b02145
2فایلهای تغییر یافته به همراه9 افزوده شده و 49 حذف شده
  1. 3 3
      strategy/src/binance_usdt_swap.rs
  2. 6 46
      strategy/src/bybit_usdt_swap.rs

+ 3 - 3
strategy/src/binance_usdt_swap.rs

@@ -25,7 +25,8 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
         let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::Public).await;
         ws.set_subscribe(vec![
-            BinanceSwapSubscribeType::PuBookTicker
+            BinanceSwapSubscribeType::PuBookTicker,
+            BinanceSwapSubscribeType::PuAggTrade
         ]);
 
         // 读取数据
@@ -67,8 +68,7 @@ pub(crate) async fn binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             BinanceSwapSubscribeType::PrPosition,
             BinanceSwapSubscribeType::PrAccount,
-            BinanceSwapSubscribeType::PrBalance,
-            BinanceSwapSubscribeType::PuAggTrade
+            BinanceSwapSubscribeType::PrBalance
         ]);
 
         // 读取数据

+ 6 - 46
strategy/src/bybit_usdt_swap.rs

@@ -27,8 +27,9 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
         ws.set_subscribe(vec![
+            BybitSwapSubscribeType::PuTrade,
             BybitSwapSubscribeType::PuOrderBook1,
-            BybitSwapSubscribeType::PuKline("1".to_string()),
+            // BybitSwapSubscribeType::PuKline("1".to_string()),
             // BybitSwapSubscribeType::PuTickers
         ]);
 
@@ -76,54 +77,14 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
                                    symbols: Vec<String>,
                                    is_colo: bool,
                                    exchange_params: BTreeMap<String, String>) {
+    // 参考
     let name_c = name.clone();
     let symbols_c = symbols.clone();
     let is_shutdown_arc_c = is_shutdown_arc.clone();
     let core_arc_c = core_arc.clone();
-    spawn(async move {
-        //创建读写通道
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
-        let mut ws = BybitSwapWs::new_label(name_c, is_colo, None, BybitSwapWsType::Public);
-        ws.set_subscribe(vec![
-            BybitSwapSubscribeType::PuTrade,
-        ]);
-
-        // 读取数据
-        let core_arc_clone = Arc::clone(&core_arc_c);
-        let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
-        let multiplier = rest.get_self_market().multiplier;
-        let mut records = rest.get_record("1".to_string()).await.unwrap();
-        for record in records.iter_mut() {
-            let core_arc_clone = core_arc_c.clone();
-
-            on_record(core_arc_clone, record).await
-        }
-
-        let depth_asks = Arc::new(Mutex::new(Vec::new()));
-        let depth_bids = Arc::new(Mutex::new(Vec::new()));
-
-        let fun = move |data: ResponseData| {
-            // 在 async 块之前克隆 Arc
-            let core_arc_cc = core_arc_clone.clone();
-            let mul = multiplier.clone();
-
-            let depth_asks = Arc::clone(&depth_asks);
-            let depth_bids = Arc::clone(&depth_bids);
-
-            async move {
-                let mut depth_asks = depth_asks.lock().await;
-                let mut depth_bids = depth_bids.lock().await;
-                // 使用克隆后的 Arc,避免 move 语义
-                on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
-            }
-        };
-
-        // 链接
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        ws.set_symbols(symbols_c);
-        ws.ws_connect_async(is_shutdown_arc_c, fun, &write_tx_am, write_rx).await.expect("链接失败");
-    });
+    reference_bybit_swap_run(is_shutdown_arc_c, core_arc_c, name_c, symbols_c, is_colo).await;
 
+    // 交易
     spawn(async move {
         // 交易交易所需要启动私有ws
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
@@ -132,8 +93,7 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PrPosition,
             BybitSwapSubscribeType::PrOrder,
-            BybitSwapSubscribeType::PrWallet,
-            BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PrWallet
         ]);
 
         let core_arc_clone_private = core_arc.clone();