Prechádzať zdrojové kódy

新版原子操作的延迟统计逻辑

skyffire 6 mesiacov pred
rodič
commit
26622e5d4a
3 zmenil súbory, kde vykonal 94 pridanie a 310 odobranie
  1. 1 1
      readme.md
  2. 29 265
      src/data_manager.rs
  3. 64 44
      src/ws_manager.rs

+ 1 - 1
readme.md

@@ -5,7 +5,7 @@
 - [x] ~~2370多个币对的同时订阅信息测试~~
 - [x] ~~ws的ping、pong链接健壮性测试~~
 - [x] ~~ws延迟监听测试~~
-- [ ] 现在ws延迟越来越高,应该是处理性能到极限了
+- [x] ~~现在ws延迟很高,需要重新整理架构~~
 - [ ] k线信息的处理逻辑
 - [ ] 深度信息的处理逻辑
 - [ ] private接口未对接、测试

+ 29 - 265
src/data_manager.rs

@@ -1,17 +1,18 @@
 use std::collections::HashMap;
-use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
 use rust_decimal::Decimal;
 use serde_json::Value;
 use anyhow::Result;
-use tokio::sync::Mutex;
+use tracing::warn;
 
 pub struct DataManager {
     pub exchange_info_map: HashMap<String, Value>,
     pub klines_map: HashMap<String, Vec<Value>>,
     pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
     pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
-    pub delay_total: Arc<Mutex<u64>>,
-    pub delay_count: Arc<Mutex<u64>>,
+
+    pub delay_total: AtomicU64,
+    pub delay_count: AtomicU64,
 }
 
 impl DataManager {
@@ -25,11 +26,33 @@ impl DataManager {
             klines_map,
             asks_map,
             bids_map,
-            delay_total: Arc::new(Mutex::new(0)),
-            delay_count: Arc::new(Mutex::new(0)),
+            delay_total: AtomicU64::new(0),
+            delay_count: AtomicU64::new(0),
         }
     }
 
+    pub fn record_latency(&self, received_at: i64, origin_timestamp: u64) {
+        if let Some(delay) = (received_at as u64).checked_sub(origin_timestamp) {
+            self.delay_total.fetch_add(delay, Ordering::Relaxed);       // 原子加
+            self.delay_count.fetch_add(1, Ordering::Relaxed);       // 原子加
+        } else {
+            warn!("时间戳计算出现问题: received_at={}, origin_timestamp={}", received_at, origin_timestamp);
+        }
+    }
+
+    // 获取当前的统计数据
+    pub fn get_delay_stats(&self) -> (u64, u64) {
+        let total = self.delay_total.load(Ordering::Relaxed);
+        let count = self.delay_count.load(Ordering::Relaxed);
+        (total, count)
+    }
+
+    // 重置统计数据 -> 这个是关键!
+    pub fn reset_delay_stats(&self) {
+        self.delay_total.store(0, Ordering::Relaxed); // 原子写
+        self.delay_count.store(0, Ordering::Relaxed); // 原子写
+    }
+
     pub async fn process_klines_map(&mut self, _kline: Value) -> Result<()> {
         Ok(())
     }
@@ -38,262 +61,3 @@ impl DataManager {
         Ok(())
     }
 }
-
-// use std::collections::BTreeMap;
-// use std::sync::Arc;
-// use std::sync::atomic::{AtomicBool};
-// use rust_decimal::Decimal;
-// use tokio::{spawn};
-// use tokio::sync::Mutex;
-// use tracing::{error};
-// use tokio_tungstenite::tungstenite::Message;
-// use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
-// use exchanges::response_base::ResponseData;
-// use global::trace_stack::TraceStack;
-// use standard::exchange::ExchangeEnum::BybitSwap;
-// use standard::exchange_struct_handler::ExchangeStructHandler;
-// use standard::{Depth, OrderBook};
-// use crate::core::Core;
-// use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
-// use crate::model::OrderInfo;
-//
-// // 参考 Bybit 合约 启动
-// pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
-//                                              core_arc: Arc<Mutex<Core>>,
-//                                              name: String,
-//                                              symbols: Vec<String>,
-//                                              is_colo: bool,
-//                                              ref_index: usize
-// ) {
-//     spawn(async move {
-//         //创建读写通道
-//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
-//         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
-//         ws.set_subscribe(vec![
-//             BybitSwapSubscribeType::PuTrade,
-//             BybitSwapSubscribeType::PuOrderBook1,
-//             // BybitSwapSubscribeType::PuKline("1".to_string()),
-//             // BybitSwapSubscribeType::PuTickers
-//         ]);
-//
-//         // 读取数据
-//         let core_arc_clone = Arc::clone(&core_arc);
-//         let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
-//         let multiplier = rest.get_self_market().multiplier;
-//         let mut records = rest.get_record("1".to_string()).await.unwrap();
-//         for record in records.iter_mut() {
-//             let core_arc_clone = core_arc.clone();
-//
-//             on_record(core_arc_clone, record).await
-//         }
-//
-//         let depth_asks = Arc::new(Mutex::new(Vec::new()));
-//         let depth_bids = Arc::new(Mutex::new(Vec::new()));
-//
-//         let fun = move |data: ResponseData| {
-//             // 在 async 块之前克隆 Arc
-//             let core_arc_cc = core_arc_clone.clone();
-//             let mul = multiplier.clone();
-//
-//             let depth_asks = Arc::clone(&depth_asks);
-//             let depth_bids = Arc::clone(&depth_bids);
-//
-//             async move {
-//                 let mut depth_asks = depth_asks.lock().await;
-//                 let mut depth_bids = depth_bids.lock().await;
-//                 // 使用克隆后的 Arc,避免 move 语义
-//                 on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await
-//             }
-//         };
-//
-//         // 链接
-//         let write_tx_am = Arc::new(Mutex::new(write_tx));
-//         ws.set_symbols(symbols);
-//         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
-//     });
-// }
-//
-// // 交易 bybit 合约 启动
-// pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
-//                                    core_arc: Arc<Mutex<Core>>,
-//                                    name: String,
-//                                    symbols: Vec<String>,
-//                                    is_colo: bool,
-//                                    exchange_params: BTreeMap<String, String>) {
-//     // 参考
-//     reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
-//
-//     // 交易
-//     spawn(async move {
-//         // 交易交易所需要启动私有ws
-//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-//         let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
-//         let mut ws = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
-//         ws.set_subscribe(vec![
-//             BybitSwapSubscribeType::PrPosition,
-//             BybitSwapSubscribeType::PrOrder,
-//             BybitSwapSubscribeType::PrWallet
-//         ]);
-//
-//         let core_arc_clone_private = core_arc.clone();
-//         let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
-//         let run_symbol = symbols.clone()[0].clone();
-//
-//         // 挂起私有ws
-//         let fun = move |data: ResponseData| {
-//             // 在 async 块之前克隆 Arc
-//             let core_arc_cc = core_arc_clone_private.clone();
-//             let mul = multiplier.clone();
-//             let rs = run_symbol.clone();
-//
-//             async move {
-//                 // 使用克隆后的 Arc,避免 move 语义
-//                 on_private_data(core_arc_cc, &mul, &rs, &data).await;
-//             }
-//         };
-//
-//         // 链接
-//         let write_tx_am = Arc::new(Mutex::new(write_tx));
-//         ws.set_symbols(symbols);
-//         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
-//     });
-// }
-//
-// async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, ref_index: usize) {
-//     let mut trace_stack = TraceStack::new(response.time, response.ins);
-//     trace_stack.on_after_span_line();
-//
-//     match response.channel.as_str() {
-//         "orderbook" => {
-//             trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
-//
-//             let mut is_update = false;
-//             if response.data_type == "delta"  {
-//                 is_update = true;
-//             }
-//             let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul);
-//             // 是增量更新
-//             if is_update {
-//                 if depth.asks.len() != 0 {
-//                     depth_asks.clear();
-//                     depth_asks.append(&mut depth.asks);
-//                 }
-//
-//                 if depth.bids.len() != 0 {
-//                     depth_bids.clear();
-//                     depth_bids.append(&mut depth.bids);
-//                 }
-//
-//                 let result_depth = Depth {
-//                     time: depth.time,
-//                     symbol: depth.symbol,
-//                     asks: depth_asks.clone(),
-//                     bids: depth_bids.clone(),
-//                 };
-//
-//                 trace_stack.on_after_format();
-//                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await;
-//                 // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
-//             }
-//             // 全量
-//             else {
-//                 trace_stack.on_after_format();
-//                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await;
-//                 // on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
-//
-//                 depth_asks.clear();
-//                 depth_asks.append(&mut depth.asks);
-//                 depth_bids.clear();
-//                 depth_bids.append(&mut depth.bids);
-//             }
-//         }
-//         "trade" => {
-//             trace_stack.set_source("bybit_usdt_swap.trade".to_string());
-//
-//             let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
-//             trace_stack.on_after_format();
-//
-//             for trade in trades.iter_mut() {
-//                 on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await;
-//                 // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
-//             }
-//         }
-//         "tickers" => {
-//             trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
-//             let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
-//             trace_stack.on_after_format();
-//
-//             on_ticker(core_arc, &mut trace_stack, &ticker).await;
-//         },
-//         // k线数据
-//         "kline" => {
-//             let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
-//
-//             if records.is_empty() {
-//                 return;
-//             }
-//
-//             for record in records.iter_mut() {
-//                 let core_arc_clone = core_arc.clone();
-//
-//                 on_record(core_arc_clone, record).await
-//             }
-//         },
-//         _ => {
-//             error!("未知推送类型");
-//             error!(?response);
-//         }
-//     }
-// }
-//
-// async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
-//     let mut trace_stack = TraceStack::new(response.time, response.ins);
-//     trace_stack.on_after_span_line();
-//
-//     match response.channel.as_str() {
-//         "wallet" => {
-//             let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_equity(account).await;
-//         }
-//         "order" => {
-//             let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val);
-//             trace_stack.on_after_format();
-//
-//             let mut order_infos:Vec<OrderInfo> = Vec::new();
-//             for mut order in orders.order {
-//                 if order.status == "NULL" {
-//                     error!("bybit_usdt_swap 未识别的订单状态:{:?}", response);
-//
-//                     continue;
-//                 }
-//
-//                 // if order.deal_amount != Decimal::ZERO {
-//                 //     info!("bybit order 消息原文:{:?}", response);
-//                 // }
-//
-//                 let order_info = OrderInfo::parse_order_to_order_info(&mut order);
-//                 order_infos.push(order_info);
-//             }
-//
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_order(order_infos, trace_stack).await;
-//         }
-//         "position" => {
-//             let positions = ExchangeStructHandler::position_handle(BybitSwap, response, ct_val);
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_position(positions).await;
-//         }
-//         _ => {
-//             error!("未知推送类型");
-//             error!(?response);
-//         }
-//     }
-// }
-//
-// fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
-//     BybitSwapLogin {
-//         api_key: exchange_params.get("access_key").unwrap().clone(),
-//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
-//     }
-// }

+ 64 - 44
src/ws_manager.rs

@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::spawn;
 use tokio::sync::Mutex;
@@ -22,14 +22,14 @@ pub struct WsManager {
 
 impl WsManager {
     pub fn new(symbols: Vec<String>, filtered_map: HashMap<String, Value>, running: Arc<AtomicBool>) -> WsManager {
-        let mut wm = WsManager {
+        let wm = WsManager {
             symbols,
             filtered_map,
             running,
             managers: Arc::new(Mutex::new(vec![])),
         };
 
-        wm.show_delay_infos().expect("初始化延迟监听失败");
+        wm.start_delay_info_reporter().expect("初始化延迟监听失败");
 
         wm
     }
@@ -51,8 +51,6 @@ impl WsManager {
 
             // 定义需要处理数据的fun
             let dm = data_manager_am.clone();
-            let dt = data_manager_am.lock().await.delay_total.clone();
-            let dc = data_manager_am.lock().await.delay_count.clone();
             let fun = move |response: Response| {
                 if response.code != 200 {
                     error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
@@ -62,29 +60,12 @@ impl WsManager {
 
                 // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
                 let dm_clone = Arc::clone(&dm);
-                let dt_clone = Arc::clone(&dt);
-                let dc_clone = Arc::clone(&dc);
                 async move {
-                    let now = Utc::now().timestamp_millis();
-                    let timestamp = response.data["timestamp"].as_u64().unwrap();
-
-                    // 计算本次请求的延迟
-                    let delay: u64 = (now as u64).checked_sub(timestamp).unwrap_or(0); // 计算延迟,确保不会出现负数
-
-                    // 更新总延迟和计数
-                    {
-                        let mut dt_guard = dt_clone.lock().await; // 锁定 dt 以进行修改
-                        *dt_guard = dt_guard.checked_add(delay).unwrap_or(*dt_guard); // 累加延迟,处理溢出
-                        // 释放 dt 的锁会自动发生在这里
-                    }
-
-                    {
-                        let mut dc_guard = dc_clone.lock().await; // 锁定 dc 以进行修改
-                        *dc_guard = dc_guard.checked_add(1).unwrap_or(*dc_guard); // 增加计数,处理溢出
-                        // 释放 dc 的锁会自动发生在这里
-                    }
+                    let received_at = Utc::now().timestamp_millis();
+                    let origin_timestamp = response.data["timestamp"].as_u64().unwrap();
 
                     let mut dm_guard = dm_clone.lock().await;
+                    dm_guard.record_latency(received_at, origin_timestamp);
                     dm_guard.process_depth_data(response.data).await.unwrap();
                 }
             };
@@ -122,35 +103,74 @@ impl WsManager {
         Ok(())
     }
 
-    pub fn show_delay_infos(&mut self) -> Result<()> {
-        let dms = Arc::clone(&self.managers);
+    // 启动一个后台任务,定期报告所有 DataManager 的平均延迟,并在计数超限时重置
+    fn start_delay_info_reporter(&self) -> Result<(), std::io::Error> {
+        let managers_arc_clone = Arc::clone(&self.managers);
+        let running_clone = Arc::clone(&self.running);
+        const RESET_THRESHOLD: u64 = 1_0000_0000; // 1亿次计数阈值
 
         spawn(async move {
-            // 使用 tokio::time::interval 创建一个周期性定时器
-            let mut interval = tokio::time::interval(Duration::from_secs(60)); // 每隔 60 秒触发一次
+            let mut interval = tokio::time::interval(Duration::from_secs(60));
 
-            loop {
+            while running_clone.load(Ordering::SeqCst) {
                 interval.tick().await; // 等待下一个周期
 
-                let mut total_delay = 0u64;
-                let mut delay_count = 0u64;
-
-                let managers = dms.lock().await;
-                for manager in managers.iter() {
-                    let td_guard = manager.lock().await.delay_total.lock().await.clone();
-                    let dc_guard = manager.lock().await.delay_count.lock().await.clone();
-                    total_delay += td_guard;
-                    delay_count += dc_guard;
+                if !running_clone.load(Ordering::SeqCst) {
+                    break;
                 }
 
-                // 计算平均延迟
-                if delay_count > 0 {
-                    let average_delay = total_delay as f64 / delay_count as f64;
-                    info!("平均延迟: {:.2} 毫秒 (基于 {} 次测量)", average_delay, delay_count);
+                let mut total_delay_sum = 0u64;
+                let mut total_message_count = 0u64;
+                let mut needs_reset = false; // 标记是否需要重置
+
+                // --- 第一步:收集所有 DataManager 的统计数据 ---
+                let managers_guard = managers_arc_clone.lock().await; // 锁定 Vec<Arc<Mutex<DataManager>>>
+                for manager_arc in managers_guard.iter() {
+                    // 不需要锁定内部的 u64 了,直接调用 DataManager 的方法
+                    let manager_lock = manager_arc.lock().await; // 锁定单个 DataManager
+                    let (current_sum, current_count) = manager_lock.get_delay_stats(); // 使用原子读
+
+                    // 使用 saturating_add 防止聚合时溢出 (虽然 u64 很大,但好习惯)
+                    total_delay_sum = total_delay_sum.saturating_add(current_sum);
+                    total_message_count = total_message_count.saturating_add(current_count);
+
+                    // 可以在这里检查总数是否已超阈值,但为了逻辑清晰,我们在聚合后检查
+                }
+                // 释放 managers_guard (Vec 的锁)
+
+                // --- 第二步:计算并报告平均延迟 ---
+                if total_message_count > 0 {
+                    let average_delay = total_delay_sum as f64 / total_message_count as f64;
+                    info!(
+                        "当前 WS 平均延迟: {:.2} 毫秒 (基于 {} 条消息测量)",
+                        average_delay, total_message_count
+                    );
+
+                    // 检查是否达到重置阈值
+                    if total_message_count >= RESET_THRESHOLD { // 使用 >= 更安全
+                        needs_reset = true;
+                        info!(
+                            "总消息计数 {} >= 阈值 {}, 将在本次报告后重置所有计数器。",
+                            total_message_count, RESET_THRESHOLD
+                        );
+                    }
                 } else {
-                    info!("尚未生成延迟数据。");
+                    info!("WS 延迟报告:本周期内未收到用于计算延迟的消息。");
+                }
+
+                // --- 第三步:如果需要,执行重置 ---
+                if needs_reset {
+                    // 再次获取 Vec 的锁来执行重置操作
+                    let managers_guard_for_reset = managers_arc_clone.lock().await;
+                    for manager_arc in managers_guard_for_reset.iter() {
+                        let manager_lock = manager_arc.lock().await; // 锁定 DataManager
+                        manager_lock.reset_delay_stats(); // 调用重置方法 (使用原子写)
+                    }
+                    // 释放锁
+                    info!("所有 DataManager 的延迟统计已重置。");
                 }
             }
+            info!("延迟报告任务已停止。");
         });
 
         Ok(())