瀏覽代碼

同步改异步,先保证功能可以用。

skyfffire 1 年之前
父節點
當前提交
ceed9b31dc
共有 2 個文件被更改,包括 9 次插入19 次删除
  1. 7 10
      strategy/src/binance_usdt_swap.rs
  2. 2 9
      strategy/src/gate_swap.rs

+ 7 - 10
strategy/src/binance_usdt_swap.rs

@@ -1,7 +1,7 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use std::time::Duration;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use exchanges::response_base::ResponseData;
@@ -41,16 +41,13 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
             let mut min_sell = Decimal::ZERO;
 
             loop {
-                while let Ok(Some(data)) = read_rx.try_next() {
-                    on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
-
-                    // 从通道中接收并丢弃所有的消息,直到通道为空
-                    while let Ok(Some(_)) = read_rx.try_next() {
-                        // 消息被忽略
-                    }
+                while let Some(data) = read_rx.next().await {
+                    on_data(bot_arc_clone.clone(),
+                            &mut update_flag_u,
+                            &mut max_buy,
+                            &mut min_sell,
+                            data).await;
                 }
-
-                tokio::time::sleep(Duration::from_nanos(1)).await;
             }
         });
 

+ 2 - 9
strategy/src/gate_swap.rs

@@ -1,7 +1,7 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use std::time::Duration;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
 use serde_json::Value;
 use tokio::spawn;
@@ -80,7 +80,7 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
         let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
 
         loop {
-            while let Ok(Some(data)) = read_rx.try_next() {
+            while let Some(data) = read_rx.next().await {
                 on_data(bot_arc_clone.clone(),
                         &mut update_flag_u,
                         multiplier,
@@ -88,14 +88,7 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
                         &mut max_buy,
                         &mut min_sell,
                         data).await;
-
-                // 从通道中接收并丢弃所有的消息,直到通道为空
-                while let Ok(Some(_)) = read_rx.try_next() {
-                    // 消息被忽略
-                }
             }
-
-            tokio::time::sleep(Duration::from_nanos(1)).await;
         }
     });
 }