skyffire преди 1 година
родител
ревизия
ed27f21bb7
променени са 1 файла, в които са добавени 48 реда и са изтрити 0 реда
  1. 48 0
      strategy/src/bybit_usdt_swap.rs

+ 48 - 0
strategy/src/bybit_usdt_swap.rs

@@ -76,6 +76,54 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
                                    symbols: Vec<String>,
                                    is_colo: bool,
                                    exchange_params: BTreeMap<String, String>) {
+    let name_c = name.clone();
+    let symbols_c = symbols.clone();
+    let is_shutdown_arc_c = is_shutdown_arc.clone();
+    let core_arc_c = core_arc.clone();
+    spawn(async move {
+        //创建读写通道
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+        let mut ws = BybitSwapWs::new_label(name_c, is_colo, None, BybitSwapWsType::Public);
+        ws.set_subscribe(vec![
+            BybitSwapSubscribeType::PuTrade,
+        ]);
+
+        // 读取数据
+        let core_arc_clone = Arc::clone(&core_arc_c);
+        let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
+        let multiplier = rest.get_self_market().multiplier;
+        let mut records = rest.get_record("1".to_string()).await.unwrap();
+        for record in records.iter_mut() {
+            let core_arc_clone = core_arc_c.clone();
+
+            on_record(core_arc_clone, record).await
+        }
+
+        let depth_asks = Arc::new(Mutex::new(Vec::new()));
+        let depth_bids = Arc::new(Mutex::new(Vec::new()));
+
+        let fun = move |data: ResponseData| {
+            // 在 async 块之前克隆 Arc
+            let core_arc_cc = core_arc_clone.clone();
+            let mul = multiplier.clone();
+
+            let depth_asks = Arc::clone(&depth_asks);
+            let depth_bids = Arc::clone(&depth_bids);
+
+            async move {
+                let mut depth_asks = depth_asks.lock().await;
+                let mut depth_bids = depth_bids.lock().await;
+                // 使用克隆后的 Arc,避免 move 语义
+                on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
+            }
+        };
+
+        // 链接
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        ws.set_symbols(symbols_c);
+        ws.ws_connect_async(is_shutdown_arc_c, fun, &write_tx_am, write_rx).await.expect("链接失败");
+    });
+
     spawn(async move {
         // 交易交易所需要启动私有ws
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();