Procházet zdrojové kódy

尝试解决性能问题,原本的data manager1秒要处理3w条数据

skyffire před 6 měsíci
rodič
revize
81e4947fd5
4 změnil soubory, kde provedl 35 přidání a 22 odebrání
  1. 1 1
      readme.md
  2. 6 0
      src/data_manager.rs
  3. 2 4
      src/main.rs
  4. 26 17
      src/ws_manager.rs

+ 1 - 1
readme.md

@@ -5,7 +5,7 @@
 - [x] ~~2370多个币对的同时订阅信息测试~~
 - [x] ~~ws的ping、pong链接健壮性测试~~
 - [x] ~~ws延迟监听测试~~
-- [ ] 现在ws延迟越来越高,应该是处理性能到极限了,试试1000条
+- [ ] 现在ws延迟越来越高,应该是处理性能到极限了
 - [ ] k线信息的处理逻辑
 - [ ] 深度信息的处理逻辑
 - [ ] private接口未对接、测试

+ 6 - 0
src/data_manager.rs

@@ -1,13 +1,17 @@
 use std::collections::HashMap;
+use std::sync::Arc;
 use rust_decimal::Decimal;
 use serde_json::Value;
 use anyhow::Result;
+use tokio::sync::Mutex;
 
 pub struct DataManager {
     pub exchange_info_map: HashMap<String, Value>,
     pub klines_map: HashMap<String, Vec<Value>>,
     pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
     pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
+    pub delay_total: Arc<Mutex<u64>>,
+    pub delay_count: Arc<Mutex<u64>>,
 }
 
 impl DataManager {
@@ -21,6 +25,8 @@ impl DataManager {
             klines_map,
             asks_map,
             bids_map,
+            delay_total: Arc::new(Mutex::new(0)),
+            delay_count: Arc::new(Mutex::new(0)),
         }
     }
 

+ 2 - 4
src/main.rs

@@ -19,7 +19,6 @@ use serde_json::Value;
 use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 use utils::log_setup;
-use crate::data_manager::DataManager;
 use crate::exchange::mexc_spot_client::MexcSpotClient;
 use crate::ws_manager::WsManager;
 
@@ -102,7 +101,7 @@ async fn main() {
 /// # Returns
 pub async fn run_mexc_subscriptions(
     running: Arc<AtomicBool>, // 接收 running 标志,以便在出错时可以停止
-    client_am: Arc<tokio::sync::Mutex<MexcSpotClient>>,
+    client_am: Arc<Mutex<MexcSpotClient>>,
 ) -> Result<()> {
     info!("开始获取 MEXC 交易对...");
 
@@ -126,8 +125,7 @@ pub async fn run_mexc_subscriptions(
 
     let symbols: Vec<String> = filtered_map.keys().cloned().collect();
     info!("成功获取 {} 个交易对,准备订阅 1 分钟 K 线、深度数据(需要http初始化)...", symbols.len());
-    let data_manager_am = Arc::new(Mutex::new(DataManager::new(filtered_map)));
-    let mut ws_manager = WsManager::new(symbols, data_manager_am.clone(), running.clone());
+    let mut ws_manager = WsManager::new(symbols, filtered_map.clone(), running.clone());
     ws_manager.subscribe_all().await?;
     info!("所有订阅已提交,程序将继续运行并接收实时数据...");
 

+ 26 - 17
src/ws_manager.rs

@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
@@ -5,6 +6,7 @@ use tokio::spawn;
 use tokio::sync::Mutex;
 use anyhow::Result;
 use chrono::Utc;
+use serde_json::Value;
 use tokio_tungstenite::tungstenite::Message;
 use tracing::{error, info};
 use crate::data_manager::DataManager;
@@ -13,20 +15,18 @@ use crate::exchange::response_base::Response;
 
 pub struct WsManager {
     pub symbols: Vec<String>,
-    pub data_manager_am: Arc<Mutex<DataManager>>,
+    pub managers: Arc<Mutex<Vec<Arc<Mutex<DataManager>>>>>,
+    pub filtered_map: HashMap<String, Value>,
     pub running: Arc<AtomicBool>,
-    pub delay_total: Arc<Mutex<u64>>,
-    pub delay_count: Arc<Mutex<u64>>,
 }
 
 impl WsManager {
-    pub fn new(symbols: Vec<String>, data_manager_am: Arc<Mutex<DataManager>>, running: Arc<AtomicBool>) -> WsManager {
+    pub fn new(symbols: Vec<String>, filtered_map: HashMap<String, Value>, running: Arc<AtomicBool>) -> WsManager {
         let mut wm = WsManager {
             symbols,
-            data_manager_am,
+            filtered_map,
             running,
-            delay_total: Arc::new(Mutex::new(0)),
-            delay_count: Arc::new(Mutex::new(0)),
+            managers: Arc::new(Mutex::new(vec![])),
         };
 
         wm.show_delay_infos().expect("初始化延迟监听失败");
@@ -42,11 +42,17 @@ impl WsManager {
         let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
 
         for i in 0..num_batches {
-            let dm = self.data_manager_am.clone();
-            let dt = self.delay_total.clone();
-            let dc = self.delay_count.clone();
+            let data_manager = DataManager::new(self.filtered_map.clone());
+            let data_manager_am = Arc::new(Mutex::new(data_manager));
+
+            {
+                self.managers.lock().await.push(data_manager_am.clone());
+            }
 
             // 定义需要处理数据的fun
+            let dm = data_manager_am.clone();
+            let dt = data_manager_am.lock().await.delay_total.clone();
+            let dc = data_manager_am.lock().await.delay_count.clone();
             let fun = move |response: Response| {
                 if response.code != 200 {
                     error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
@@ -117,8 +123,7 @@ impl WsManager {
     }
 
     pub fn show_delay_infos(&mut self) -> Result<()> {
-        let dt = self.delay_total.clone();
-        let dc = self.delay_count.clone();
+        let dms = Arc::clone(&self.managers);
 
         spawn(async move {
             // 使用 tokio::time::interval 创建一个周期性定时器
@@ -127,12 +132,16 @@ impl WsManager {
             loop {
                 interval.tick().await; // 等待下一个周期
 
-                // 获取 dt 和 dc 的当前值
-                let dt_guard = dt.lock().await;
-                let dc_guard = dc.lock().await;
+                let mut total_delay = 0u64;
+                let mut delay_count = 0u64;
 
-                let total_delay = *dt_guard;
-                let delay_count = *dc_guard;
+                let managers = dms.lock().await;
+                for manager in managers.iter() {
+                    let td_guard = manager.lock().await.delay_total.lock().await.clone();
+                    let dc_guard = manager.lock().await.delay_count.lock().await.clone();
+                    total_delay += td_guard;
+                    delay_count += dc_guard;
+                }
 
                 // 计算平均延迟
                 if delay_count > 0 {