Forráskód Böngészése

订阅过程调试

skyffire 6 hónapja
szülő
commit
0a553c9036
3 módosított fájl, 38 hozzáadás és 22 törlés
  1. 13 0
      readme.md
  2. 1 1
      src/main.rs
  3. 24 21
      src/ws_manager.rs

+ 13 - 0
readme.md

@@ -0,0 +1,13 @@
+## 当前项目 Todo List
+
+- [x] ~~public接口对接、测试~~
+- [x] ~~整体架构~~
+- [ ] 2370多个币对的同时订阅信息未测试
+- [ ] private接口未对接、测试
+- [ ] k线信息的处理逻辑
+- [ ] 深度信息的处理逻辑
+- [ ] ws的ping、pong链接健壮性未测试
+- [ ] 交易逻辑对接测试
+- [ ] 数据存储方案的设计与实现
+- [ ] 后台接口的设计与实现
+- [ ] UI的设计与实现

+ 1 - 1
src/main.rs

@@ -57,7 +57,7 @@ async fn main() {
         warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
         r.store(false, Ordering::Relaxed);
     });
-    info!("应用程序启动...");
+    info!("==================================== 应用程序启动 =======================================");
 
     // ---- 运行核心订阅逻辑 ----
     let task_running = running.clone();

+ 24 - 21
src/ws_manager.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use tokio::spawn;
 use tokio::sync::Mutex;
 use anyhow::Result;
 use tokio_tungstenite::tungstenite::Message;
@@ -31,6 +32,14 @@ impl WsManager {
         // 计算总共需要多少批次
         let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
 
+        let fun = move |response: Response| {
+            if response.code != 200 {
+                error!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+            }
+
+            async move {}
+        };
+
         for i in 0..num_batches {
             // 计算当前批次的起始和结束索引
             let start_index = i * BATCH_SIZE;
@@ -42,30 +51,24 @@ impl WsManager {
 
             // 这个通道主要是为了后面给这个ws发送消息
             let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
-            let _guard = setup_logging().unwrap();
-
-            let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
-
-            ws.set_subscribe(vec![
-                MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
-                MexcSpotWsSubscribeType::PuFuturesDepth
-            ]);
-
-            ws.set_symbols(current_batch_symbols);
+            let write_tx_am = Arc::new(Mutex::new(write_tx));
 
-            let fun = move |response: Response| {
-                if response.code != 200 {
-                    error!("{}", serde_json::to_string_pretty(&response.data).unwrap());
-                }
+            // 异步去订阅阻塞
+            let running = self.running.clone();
+            spawn(async move {
+                let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
+                ws.set_subscribe(vec![
+                    MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
+                    MexcSpotWsSubscribeType::PuFuturesDepth
+                ]);
 
-                async move {}
-            };
+                ws.set_symbols(current_batch_symbols);
 
-            // 链接
-            let write_tx_am = Arc::new(Mutex::new(write_tx));
-            ws.ws_connect_async(self.running.clone(), fun, &write_tx_am, write_rx)
-                .await
-                .expect("链接失败");
+                // 链接
+                ws.ws_connect_async(running, fun, &write_tx_am, write_rx)
+                    .await
+                    .expect("链接失败");
+            });
         }
 
         Ok(())