浏览代码

消息分发与增量深度信息的格式化。

skyfffire 4 月之前
父节点
当前提交
03ac630e96
共有 2 个文件被更改,包括 111 次插入7 次删除
  1. 109 4
      src/data_manager.rs
  2. 2 3
      src/ws_manager.rs

+ 109 - 4
src/data_manager.rs

@@ -2,9 +2,10 @@ use std::collections::HashMap;
 use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
 use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
 use rust_decimal::Decimal;
 use rust_decimal::Decimal;
 use serde_json::{from_value, Value};
 use serde_json::{from_value, Value};
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
-use tracing::warn;
+use tracing::{info, warn};
+use crate::exchange::response_base::Response;
 
 
 pub struct DataManager {
 pub struct DataManager {
     pub exchange_info_map: HashMap<String, Value>,
     pub exchange_info_map: HashMap<String, Value>,
@@ -16,6 +17,7 @@ pub struct DataManager {
     pub delay_count: AtomicU64,
     pub delay_count: AtomicU64,
 }
 }
 
 
+// ---------------------------------------------- 全量深度数据格式化 -------------------------------------------
 // 定义 JSON 对象的整体结构
 // 定义 JSON 对象的整体结构
 #[derive(Debug, Serialize, Deserialize)]
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 #[serde(rename_all = "camelCase")]
@@ -23,7 +25,6 @@ struct FullDepthData {
     symbol: String,
     symbol: String,
     asks: Vec<OrderBookEntry>,
     asks: Vec<OrderBookEntry>,
     bids: Vec<OrderBookEntry>,
     bids: Vec<OrderBookEntry>,
-    #[serde(rename = "lastUpdateId")]
     last_update_id: u64,
     last_update_id: u64,
     timestamp: u64,
     timestamp: u64,
 }
 }
@@ -57,6 +58,43 @@ impl OrderBookEntry {
     }
     }
 }
 }
 
 
+// ---------------------------------------------- 增量深度数据格式化 -------------------------------------------
+// 定义 asks 和 bids 数组中每个条目的结构
+// 对应 JSON: {"price": "...", "quantity": "..."}
+#[derive(Debug, Serialize, Deserialize)]
+struct PriceLevel {
+    // 这次 JSON 的字段名是 snake_case,所以不需要 rename
+    price: String,
+    quantity: String,
+}
+
+impl PriceLevel {
+    // 辅助方法,将字符串解析为 Decimal 类型
+    // 与之前的 to_decimals 逻辑一致
+    fn to_decimals(&self) -> Result<(Decimal, Decimal)> {
+        let price = self.price.parse::<Decimal>()
+            .with_context(|| format!("Failed to parse price string '{}' as Decimal", self.price))?;
+        let quantity = self.quantity.parse::<Decimal>()
+            .with_context(|| format!("Failed to parse quantity string '{}' as Decimal", self.quantity))?;
+        Ok((price, quantity))
+    }
+}
+
+// 定义增量数据的整体结构
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")] // JSON 的键是驼峰式命名 (camelCase)
+struct DepthUpdateData {
+    event_type: String,
+    symbol: String,
+    timestamp: u64, // serde 可以自动将数字字符串解析为 u64
+    last_update_id: String, // serde 也可以处理这个
+    version: String,
+    #[serde(rename = "topic_info")]
+    topic_info: String,
+    asks: Vec<PriceLevel>, // 数组内容是 PriceLevel 对象
+    bids: Vec<PriceLevel>, // 数组内容是 PriceLevel 对象
+}
+
 impl DataManager {
 impl DataManager {
     pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
     pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
         let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
         let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
@@ -182,7 +220,74 @@ impl DataManager {
         Ok(())
         Ok(())
     }
     }
 
 
-    pub async fn process_incremental_depth(&mut self, _incremental_depth: &Value) -> Result<()> {
+    pub async fn process_depth_update(&mut self, incremental_depth: &Value) -> Result<()> {
+        // info!("{}", serde_json::to_string_pretty(incremental_depth)?);
+
+        // --- 解析 JSON 字符串 ---
+        // 使用 serde_json::from_str 并通过 anyhow 进行错误处理
+        let parsed_data = from_value::<DepthUpdateData>(incremental_depth.clone())
+            .context("Failed to parse the incremental depth update JSON")?;
+
+        // --- 打印格式化后的数据 ---
+        // println!("--- Incremental Depth Update Parsed ---");
+        // println!("Symbol: {}", parsed_data.symbol);
+        // println!("Event Type: {}", parsed_data.event_type);
+        // println!("Timestamp: {}", parsed_data.timestamp);
+        // println!("Last Update ID: {}", parsed_data.last_update_id);
+        // println!("---------------------------------------\n");
+        let key = parsed_data.symbol;
+
+        // --- 使用 anyhow 重构后的打印逻辑 ---
+        // println!("Asks Updates:");
+        // 直接在循环中使用 `?`
+        for (i, ask_level) in parsed_data.asks.iter().enumerate() {
+            // 如果 to_decimals() 失败, `?` 会立即让整个函数返回 Err
+            let (price, quantity) = ask_level.to_decimals()
+                .with_context(|| format!("Error parsing ask level #{}, {:?}", i + 1, ask_level))?;
+            let mut value = price * quantity;
+            value.rescale(8);
+            
+            // println!("  {}. Price: {}, Quantity: {}", i + 1, price, quantity);
+        }
+
+        // println!("\nBids Updates:");
+        // 同样在 bids 循环中使用 `?`
+        for (i, bid_level) in parsed_data.bids.iter().enumerate() {
+            let (price, quantity) = bid_level.to_decimals()
+                .with_context(|| format!("Error parsing bid level #{}, {:?}", i + 1, bid_level))?;
+            let mut value = price * quantity;
+            value.rescale(8);
+
+            // println!("  {}. Price: {}, Quantity: {}", i + 1, price, quantity);
+        }
+
+        Ok(())
+    }
+    
+    pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
+        // 1. 预解析为通用的 Value
+        let v = response.data.clone();
+
+        // 2. 获取 topic_info 字段用于路由
+        // and_then 确保了 get 返回 Some 时才调用 as_str
+        // context 在任何一步失败时提供错误信息 (字段不存在,或不是字符串)
+        let topic_info = v
+            .get("topic_info")
+            .and_then(Value::as_str)
+            .context("Message is missing 'topic_info' field or it's not a string")?;
+
+        // 3. 根据 topic_info 的内容进行分发 (match)
+        if topic_info.contains("spot@public.kline.v3.api.pb") {
+            // 如果是K线数据,调用 process_kline
+            self.process_klines(&v).await?;
+        } else if topic_info.contains("spot@public.aggre.depth.v3.api.pb") {
+            // 如果是增量深度数据,调用 process_depth_update
+            self.process_depth_update(&v).await?;
+        } else {
+            // 如果是未知的 topic,返回一个错误
+            bail!("Received a message with an unknown topic_info: {}", topic_info);
+        }
+
         Ok(())
         Ok(())
     }
     }
 }
 }

+ 2 - 3
src/ws_manager.rs

@@ -71,9 +71,8 @@ impl WsManager {
                     let mut dm_guard = dm_clone.lock().await;
                     let mut dm_guard = dm_clone.lock().await;
                     dm_guard.record_latency(received_at, origin_timestamp);
                     dm_guard.record_latency(received_at, origin_timestamp);
 
 
-                    // 提前获取depth全量数据
-                    // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
-                    dm_guard.process_incremental_depth(&response.data).await.unwrap();
+                    // 交给消息分发函数
+                    dm_guard.dispatch_message(&response).await.unwrap();
                 }
                 }
             };
             };