Forráskód Böngészése

ws延迟监听器。

skyffire 6 hónapja
szülő
commit
147266584e
3 módosított fájl, 94 hozzáadás és 21 törlés
  1. 3 2
      readme.md
  2. 6 6
      src/data_manager.rs
  3. 85 13
      src/ws_manager.rs

+ 3 - 2
readme.md

@@ -2,8 +2,9 @@
 
 - [x] ~~public接口对接、测试~~
 - [x] ~~整体架构~~
-- [x] ~~2370多个币对的同时订阅信息未测试~~
-- [x] ~~ws的ping、pong链接健壮性未测试~~
+- [x] ~~2370多个币对的同时订阅信息测试~~
+- [x] ~~ws的ping、pong链接健壮性测试~~
+- [x] ~~ws延迟监听测试~~
 - [ ] k线信息的处理逻辑
 - [ ] 深度信息的处理逻辑
 - [ ] private接口未对接、测试

+ 6 - 6
src/data_manager.rs

@@ -15,20 +15,20 @@ impl DataManager {
         let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
         let asks_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
         let bids_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
-        
-        DataManager { 
+
+        DataManager {
             exchange_info_map,
             klines_map,
             asks_map,
             bids_map,
         }
     }
-    
-    pub async fn process_klines_map(_symbol: String, _depth: Value) -> Result<()> {
+
+    pub async fn process_klines_map(&mut self, _kline: Value) -> Result<()> {
         Ok(())
     }
-    
-    pub async fn process_depth_data(_symbol: String, _depth: Value) -> Result<()> {
+
+    pub async fn process_depth_data(&mut self, _depth: Value) -> Result<()> {
         Ok(())
     }
 }

+ 85 - 13
src/ws_manager.rs

@@ -1,8 +1,10 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use tokio::spawn;
 use tokio::sync::Mutex;
 use anyhow::Result;
+use chrono::Utc;
 use tokio_tungstenite::tungstenite::Message;
 use tracing::{error, info};
 use crate::data_manager::DataManager;
@@ -13,15 +15,23 @@ pub struct WsManager {
     pub symbols: Vec<String>,
     pub data_manager_am: Arc<Mutex<DataManager>>,
     pub running: Arc<AtomicBool>,
+    pub delay_total: Arc<Mutex<u64>>,
+    pub delay_count: Arc<Mutex<u64>>,
 }
 
 impl WsManager {
     pub fn new(symbols: Vec<String>, data_manager_am: Arc<Mutex<DataManager>>, running: Arc<AtomicBool>) -> WsManager {
-        WsManager {
-            symbols: symbols[0..1].to_owned(),
+        let mut wm = WsManager {
+            symbols,
             data_manager_am,
-            running
-        }
+            running,
+            delay_total: Arc::new(Mutex::new(0)),
+            delay_count: Arc::new(Mutex::new(0)),
+        };
+
+        wm.show_delay_infos().expect("初始化延迟监听失败");
+
+        wm
     }
 
     pub async fn subscribe_all(&mut self) -> Result<()> {
@@ -31,17 +41,48 @@ impl WsManager {
         // 计算总共需要多少批次
         let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
 
-        let fun = move |response: Response| {
-            if response.code != 200 {
-                error!("{}", serde_json::to_string_pretty(&response.data).unwrap());
-            }
+        for i in 0..num_batches {
+            let dm = self.data_manager_am.clone();
+            let dt = self.delay_total.clone();
+            let dc = self.delay_count.clone();
 
-            // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+            // 定义需要处理数据的fun
+            let fun = move |response: Response| {
+                if response.code != 200 {
+                    error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
 
-            async move {}
-        };
+                    panic!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
+                }
+
+                // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+                let dm_clone = Arc::clone(&dm);
+                let dt_clone = Arc::clone(&dt);
+                let dc_clone = Arc::clone(&dc);
+                async move {
+                    let now = Utc::now().timestamp_millis();
+                    let timestamp = response.data["timestamp"].as_u64().unwrap();
+
+                    // 计算本次请求的延迟
+                    let delay: u64 = (now as u64).checked_sub(timestamp).unwrap_or(0); // 计算延迟,确保不会出现负数
+
+                    // 更新总延迟和计数
+                    {
+                        let mut dt_guard = dt_clone.lock().await; // 锁定 dt 以进行修改
+                        *dt_guard = dt_guard.checked_add(delay).unwrap_or(*dt_guard); // 累加延迟,处理溢出
+                        // 释放 dt 的锁会自动发生在这里
+                    }
+
+                    {
+                        let mut dc_guard = dc_clone.lock().await; // 锁定 dc 以进行修改
+                        *dc_guard = dc_guard.checked_add(1).unwrap_or(*dc_guard); // 增加计数,处理溢出
+                        // 释放 dc 的锁会自动发生在这里
+                    }
+
+                    let mut dm_guard = dm_clone.lock().await;
+                    dm_guard.process_depth_data(response.data).await.unwrap();
+                }
+            };
 
-        for i in 0..num_batches {
             // 计算当前批次的起始和结束索引
             let start_index = i * BATCH_SIZE;
             let end_index = (start_index + BATCH_SIZE).min(self.symbols.len());
@@ -60,7 +101,7 @@ impl WsManager {
                 let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
                 ws.set_subscribe(vec![
                     MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
-                    // MexcSpotWsSubscribeType::PuFuturesDepth
+                    MexcSpotWsSubscribeType::PuFuturesDepth
                 ]);
 
                 ws.set_symbols(current_batch_symbols);
@@ -74,4 +115,35 @@ impl WsManager {
 
         Ok(())
     }
+
+    pub fn show_delay_infos(&mut self) -> Result<()> {
+        let dt = self.delay_total.clone();
+        let dc = self.delay_count.clone();
+
+        spawn(async move {
+            // 使用 tokio::time::interval 创建一个周期性定时器
+            let mut interval = tokio::time::interval(Duration::from_secs(60)); // 每隔 60 秒触发一次
+
+            loop {
+                interval.tick().await; // 等待下一个周期
+
+                // 获取 dt 和 dc 的当前值
+                let dt_guard = dt.lock().await;
+                let dc_guard = dc.lock().await;
+
+                let total_delay = *dt_guard;
+                let delay_count = *dc_guard;
+
+                // 计算平均延迟
+                if delay_count > 0 {
+                    let average_delay = total_delay as f64 / delay_count as f64;
+                    info!("平均延迟: {:.2} 毫秒 (基于 {} 次测量)", average_delay, delay_count);
+                } else {
+                    info!("尚未生成延迟数据。");
+                }
+            }
+        });
+
+        Ok(())
+    }
 }