Browse Source

写完了,下午学习下这些。

skyfffire 4 months ago
parent
commit
7667a91bb2
2 changed files with 142 additions and 13 deletions
  1. 133 8
      src/data_manager.rs
  2. 9 5
      src/ws_manager.rs

+ 133 - 8
src/data_manager.rs

@@ -1,25 +1,67 @@
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
 use rust_decimal::Decimal;
-use serde_json::Value;
-use anyhow::Result;
+use serde_json::{from_value, Value};
+use anyhow::{Context, Result};
+use serde::{Deserialize, Serialize};
 use tracing::warn;
 
 pub struct DataManager {
     pub exchange_info_map: HashMap<String, Value>,
     pub klines_map: HashMap<String, Vec<Value>>,
-    pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
-    pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
+    pub asks_map: HashMap<String, Vec<Vec<Decimal>>>,
+    pub bids_map: HashMap<String, Vec<Vec<Decimal>>>,
 
     pub delay_total: AtomicI64,
     pub delay_count: AtomicU64,
 }
 
+// 定义 JSON 对象的整体结构
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FullDepthData {
+    symbol: String,
+    asks: Vec<OrderBookEntry>,
+    bids: Vec<OrderBookEntry>,
+    #[serde(rename = "lastUpdateId")]
+    last_update_id: u64,
+    timestamp: u64,
+}
+
+// 定义 asks 和 bids 数组中每个元素的结构
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(untagged)]
+enum OrderBookEntry {
+    // 这种方式让 serde_json 直接尝试将字符串解析为 Decimal
+    // 如果字符串不是有效的Decimal,它会失败
+    TupleDecimal(Decimal, Decimal),
+    // 备份:如果 TupleDecimal 失败,尝试解析为字符串,
+    // 在 to_decimals() 中手动转换和处理错误
+    TupleString(String, String),
+}
+
+impl OrderBookEntry {
+    // 确保能够获取到 Decimal 值,如果直接解析的方式失败,可以回退到字符串解析
+    fn to_decimals(&self) -> Result<(Decimal, Decimal)> {
+        match self {
+            OrderBookEntry::TupleDecimal(price, quantity) => Ok((*price, *quantity)),
+            OrderBookEntry::TupleString(price_str, quantity_str) => {
+                // 现在可以使用 ? 操作符,它会自动将错误转换为 anyhow::Error
+                let price = price_str.parse::<Decimal>()
+                    .context(format!("Failed to parse price string '{}' as Decimal", price_str))?;
+                let quantity = quantity_str.parse::<Decimal>()
+                    .context(format!("Failed to parse quantity string '{}' as Decimal", quantity_str))?;
+                Ok((price, quantity))
+            }
+        }
+    }
+}
+
 impl DataManager {
     pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
         let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
-        let asks_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
-        let bids_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
+        let asks_map: HashMap<String, Vec<Vec<Decimal>>> = HashMap::new();
+        let bids_map: HashMap<String, Vec<Vec<Decimal>>> = HashMap::new();
 
         DataManager {
             exchange_info_map,
@@ -53,11 +95,94 @@ impl DataManager {
         self.delay_count.store(0, Ordering::Relaxed); // 原子写
     }
 
-    pub async fn process_klines_map(&mut self, _kline: Value) -> Result<()> {
+    pub async fn process_klines(&mut self, _kline: &Value) -> Result<()> {
+        Ok(())
+    }
+
+    pub async fn process_full_depth(&mut self, full_depth: &Value) -> Result<()> {
+        // 1. 使用 `?` 操作符代替 match.
+        //    如果 from_value 失败,函数会立即返回 Err(anyhow::Error).
+        //    .context() 会在错误传播链上添加有用的信息。
+        let data = from_value::<FullDepthData>(full_depth.clone())
+            .context("Failed to parse the full_depth Value into the FullDepthData struct")?;
+
+        // 2. 如果上面那行没有返回Err,说明成功了。
+        //    这里的代码就是 "happy path",不再需要嵌套在 Ok(...) 分支里。
+        let key = data.symbol;
+
+        // asks
+        for (i, entry) in data.asks.iter().enumerate() {
+            let (price, quantity) = entry.to_decimals()
+                .with_context(|| format!("Failed to parse ask entry #{}, content: {:?}", i + 1, entry))?;
+            let mut value = price * quantity;
+            value.rescale(8);
+
+            // 如果该交易对的asks还不存在,进行初始化
+            if !self.asks_map.contains_key(&key) {
+                self.asks_map.insert(key.to_string(), vec![]);
+            }
+
+            // 按顺序添加
+            let asks = self.asks_map.get_mut(&key).unwrap();
+            asks.push(vec![price, quantity, value]);
+        }
+
+        // bids
+        for (i, entry) in data.bids.iter().enumerate() {
+            let (price, quantity) = entry.to_decimals()
+                .with_context(|| format!("Failed to parse bid entry #{}, content: {:?}", i + 1, entry))?;
+            let mut value = price * quantity;
+            value.rescale(8);
+
+            // 如果该交易对的bids还不存在,进行初始化
+            if !self.bids_map.contains_key(&key) {
+                self.bids_map.insert(key.to_string(), vec![]);
+            }
+
+            // 按顺序添加
+            let bids = self.bids_map.get_mut(&key).unwrap();
+            bids.push(vec![price, quantity, value]);
+        }
+
+        // // --- 添加以下打印语句来验证 ---
+        // println!("\n--- Processed Order Book Map ---");
+        // 
+        // // 打印 asks_map
+        // if let Some(asks_list) = self.asks_map.get(&key) {
+        //     println!("Asks for {}:", key);
+        //     for (idx, entry) in asks_list.iter().enumerate() {
+        //         // 确保 entry 是 Vec<Decimal> 且至少有三个元素
+        //         if entry.len() >= 3 {
+        //             println!("  Entry {}: Price={}, Quantity={}, Value={}", idx + 1, entry[0], entry[1], entry[2]);
+        //         } else {
+        //             println!("  Entry {}: Invalid format {:?}", idx + 1, entry);
+        //         }
+        //     }
+        // } else {
+        //     println!("No asks data found for {}", key);
+        // }
+        // 
+        // // 打印 bids_map
+        // if let Some(bids_list) = self.bids_map.get(&key) {
+        //     println!("Bids for {}:", key);
+        //     for (idx, entry) in bids_list.iter().enumerate() {
+        //         if entry.len() >= 3 {
+        //             println!("  Entry {}: Price={}, Quantity={}, Value={}", idx + 1, entry[0], entry[1], entry[2]);
+        //         } else {
+        //             println!("  Entry {}: Invalid format {:?}", idx + 1, entry);
+        //         }
+        //     }
+        // } else {
+        //     println!("No bids data found for {}", key);
+        // }
+        // 
+        // println!("----------------------------------");
+
+        // 如果所有操作都成功,函数最后返回 Ok(())
         Ok(())
     }
 
-    pub async fn process_depth_data(&mut self, _depth: Value) -> Result<()> {
+    pub async fn process_incremental_depth(&mut self, _incremental_depth: &Value) -> Result<()> {
         Ok(())
     }
 }

+ 9 - 5
src/ws_manager.rs

@@ -73,7 +73,7 @@ impl WsManager {
 
                     // 提前获取depth全量数据
                     // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
-                    dm_guard.process_depth_data(response.data).await.unwrap();
+                    dm_guard.process_incremental_depth(&response.data).await.unwrap();
                 }
             };
 
@@ -93,11 +93,15 @@ impl WsManager {
                     "limit": 100,
                 });
 
-                let depth_response = rest_client.depth(params).await;
+                let mut depth_response = rest_client.depth(params).await;
 
-                if let Some(object_map) = depth_response.data.as_object() {
-                    let keys: Vec<String> = object_map.keys().map(|s| s.to_string()).collect();
-                    info!("Symbol: {}, Keys in the top-level object: {:?}", symbol, keys);
+                if let Some(data_obj) = depth_response.data.as_object_mut() {
+                    // let keys: Vec<String> = data_obj.keys().map(|s| s.to_string()).collect();
+                    data_obj.insert("symbol".to_string(), Value::String(symbol));
+                    
+                    let mut dm_guard = data_manager_am.lock().await;
+                    
+                    dm_guard.process_full_depth(&depth_response.data).await?;
                 } else {
                     info!("The top-level value is not a JSON object:{:?}", serde_json::to_string_pretty(&depth_response.data)?);
                 }