Kaynağa Gözat

改成同步测试。

skyfffire 1 yıl önce
ebeveyn
işleme
57edd590f1
1 değiştirilmiş dosya ile 13 ekleme ve 15 silme
  1. 13 15
      exchanges/tests/binance_swap_test.rs

+ 13 - 15
exchanges/tests/binance_swap_test.rs

@@ -1,8 +1,8 @@
+use std::cmp::max;
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
-use futures_util::StreamExt;
 use tokio::sync::Mutex;
 use tracing::{info, trace};
 
@@ -44,21 +44,19 @@ async fn ws_custom_subscribe() {
     let _bool_v1_clone = Arc::clone(&bool_v1);
     let _tr = tokio::spawn(async move {
         trace!("线程-数据读取-开启");
+        let mut max_delay = 0i64;
         loop {
-            if let Some(data) = read_rx.next().await {
-                tokio::spawn(async move {
-                    let mut trace_stack = TraceStack::default();
-                    trace_stack.on_before_unlock_quant();
-                    trace_stack.on_after_network(data.time);
-
-                    if data.time != 0 {
-                        info!("bian>{}", trace_stack.to_string());
-                    }
-                });
-
-                // 从通道中接收并丢弃所有的消息,直到通道为空
-                while let Ok(Some(_)) = read_rx.try_next() {
-                    // 消息被忽略
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(data)) = read_rx.try_next() {
+                // 消息被忽略
+                let mut trace_stack = TraceStack::default();
+                trace_stack.on_before_unlock_quant();
+                trace_stack.on_after_network(data.time);
+
+                if data.time != 0 {
+                    let delay = trace_stack.before_unlock_quant - trace_stack.after_network;
+                    max_delay = max(max_delay, delay);
+                    info!("{}us, max={}us", delay, max_delay);
                 }
             }
         }