Переглянути джерело

增量信息更新搞定了。

skyfffire 4 місяців тому
батько
коміт
acbb64df6d
2 змінених файлів з 71 додано та 55 видалено
  1. 69 53
      src/data_manager.rs
  2. 2 2
      src/main.rs

+ 69 - 53
src/data_manager.rs

@@ -1,17 +1,18 @@
-use std::collections::HashMap;
+use std::cmp::Reverse;
+use std::collections::{BTreeMap, HashMap};
 use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
 use rust_decimal::Decimal;
 use serde_json::{from_value, Value};
 use anyhow::{bail, Context, Result};
 use serde::{Deserialize, Serialize};
-use tracing::{info, warn};
+use tracing::{warn};
 use crate::exchange::response_base::Response;
 
 pub struct DataManager {
     pub exchange_info_map: HashMap<String, Value>,
     pub klines_map: HashMap<String, Vec<Value>>,
-    pub asks_map: HashMap<String, Vec<Vec<Decimal>>>,
-    pub bids_map: HashMap<String, Vec<Vec<Decimal>>>,
+    pub asks_map: HashMap<String, BTreeMap<Decimal, Decimal>>,
+    pub bids_map: HashMap<String, BTreeMap<Reverse<Decimal>, Decimal>>,
 
     pub delay_total: AtomicI64,
     pub delay_count: AtomicU64,
@@ -98,8 +99,8 @@ struct DepthUpdateData {
 impl DataManager {
     pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
         let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
-        let asks_map: HashMap<String, Vec<Vec<Decimal>>> = HashMap::new();
-        let bids_map: HashMap<String, Vec<Vec<Decimal>>> = HashMap::new();
+        let asks_map: HashMap<String, BTreeMap<Decimal, Decimal>> = HashMap::new();
+        let bids_map: HashMap<String, BTreeMap<Reverse<Decimal>, Decimal>> = HashMap::new();
 
         DataManager {
             exchange_info_map,
@@ -136,6 +137,33 @@ impl DataManager {
     pub async fn process_klines(&mut self, _kline: &Value) -> Result<()> {
         Ok(())
     }
+    
+    fn print_order_book(&mut self, key: &String) {
+        // --- 添加以下打印语句来验证 ---
+        println!("\n--- Processed Order Book Map ---");
+        
+        // 打印 asks_map
+        if let Some(asks) = self.asks_map.get(key) {
+            println!("Asks for {}:", key);
+            for (idx, (price, quantity)) in asks.iter().enumerate() {
+                println!("  Entry {}: Price={}, Quantity={}", idx + 1, price, quantity);
+            }
+        } else {
+            println!("No asks data found for {}", key);
+        }
+        
+        // 打印 bids_map
+        if let Some(bids) = self.bids_map.get(key) {
+            println!("Bids for {}:", key);
+            for (idx, (r_price, quantity)) in bids.iter().enumerate() {
+                println!("  Entry {}: Price={}, Quantity={}", idx + 1, r_price.0, quantity);
+            }
+        } else {
+            println!("No bids data found for {}", key);
+        }
+        
+        println!("----------------------------------");
+    }
 
     pub async fn process_full_depth(&mut self, full_depth: &Value) -> Result<()> {
         // 1. 使用 `?` 操作符代替 match.
@@ -152,69 +180,33 @@ impl DataManager {
         for (i, entry) in data.asks.iter().enumerate() {
             let (price, quantity) = entry.to_decimals()
                 .with_context(|| format!("Failed to parse ask entry #{}, content: {:?}", i + 1, entry))?;
-            let mut value = price * quantity;
-            value.rescale(8);
 
             // 如果该交易对的asks还不存在,进行初始化
             if !self.asks_map.contains_key(&key) {
-                self.asks_map.insert(key.to_string(), vec![]);
+                self.asks_map.insert(key.to_string(), BTreeMap::new());
             }
 
             // 按顺序添加
             let asks = self.asks_map.get_mut(&key).unwrap();
-            asks.push(vec![price, quantity, value]);
+            asks.insert(price, quantity);
         }
 
         // bids
         for (i, entry) in data.bids.iter().enumerate() {
             let (price, quantity) = entry.to_decimals()
                 .with_context(|| format!("Failed to parse bid entry #{}, content: {:?}", i + 1, entry))?;
-            let mut value = price * quantity;
-            value.rescale(8);
 
             // 如果该交易对的bids还不存在,进行初始化
             if !self.bids_map.contains_key(&key) {
-                self.bids_map.insert(key.to_string(), vec![]);
+                self.bids_map.insert(key.to_string(), BTreeMap::new());
             }
 
             // 按顺序添加
             let bids = self.bids_map.get_mut(&key).unwrap();
-            bids.push(vec![price, quantity, value]);
+            bids.insert(Reverse(price), quantity);
         }
 
-        // // --- 添加以下打印语句来验证 ---
-        // println!("\n--- Processed Order Book Map ---");
-        // 
-        // // 打印 asks_map
-        // if let Some(asks_list) = self.asks_map.get(&key) {
-        //     println!("Asks for {}:", key);
-        //     for (idx, entry) in asks_list.iter().enumerate() {
-        //         // 确保 entry 是 Vec<Decimal> 且至少有三个元素
-        //         if entry.len() >= 3 {
-        //             println!("  Entry {}: Price={}, Quantity={}, Value={}", idx + 1, entry[0], entry[1], entry[2]);
-        //         } else {
-        //             println!("  Entry {}: Invalid format {:?}", idx + 1, entry);
-        //         }
-        //     }
-        // } else {
-        //     println!("No asks data found for {}", key);
-        // }
-        // 
-        // // 打印 bids_map
-        // if let Some(bids_list) = self.bids_map.get(&key) {
-        //     println!("Bids for {}:", key);
-        //     for (idx, entry) in bids_list.iter().enumerate() {
-        //         if entry.len() >= 3 {
-        //             println!("  Entry {}: Price={}, Quantity={}, Value={}", idx + 1, entry[0], entry[1], entry[2]);
-        //         } else {
-        //             println!("  Entry {}: Invalid format {:?}", idx + 1, entry);
-        //         }
-        //     }
-        // } else {
-        //     println!("No bids data found for {}", key);
-        // }
-        // 
-        // println!("----------------------------------");
+        // self.print_order_book(&key);
 
         // 如果所有操作都成功,函数最后返回 Ok(())
         Ok(())
@@ -244,9 +236,20 @@ impl DataManager {
             // 如果 to_decimals() 失败, `?` 会立即让整个函数返回 Err
             let (price, quantity) = ask_level.to_decimals()
                 .with_context(|| format!("Error parsing ask level #{}, {:?}", i + 1, ask_level))?;
-            let mut value = price * quantity;
-            value.rescale(8);
-            
+
+            // 该交易对已经初始化过,才insert
+            if self.asks_map.contains_key(&key) {
+                let asks = self.asks_map.get_mut(&key).unwrap();
+                
+                if quantity.is_zero() {
+                    // 数量为0,移除该价格级别
+                    asks.remove(&price);
+                } else {
+                    // 插入或更新
+                    asks.insert(price, quantity);
+                }
+            }
+
             // println!("  {}. Price: {}, Quantity: {}", i + 1, price, quantity);
         }
 
@@ -255,15 +258,28 @@ impl DataManager {
         for (i, bid_level) in parsed_data.bids.iter().enumerate() {
             let (price, quantity) = bid_level.to_decimals()
                 .with_context(|| format!("Error parsing bid level #{}, {:?}", i + 1, bid_level))?;
-            let mut value = price * quantity;
-            value.rescale(8);
+
+            // 该交易对已经初始化过,才insert
+            if self.bids_map.contains_key(&key) {
+                let bids = self.bids_map.get_mut(&key).unwrap();
+                
+                if quantity.is_zero() {
+                    // 移除时,也需要用 Reverse 包装 key
+                    bids.remove(&Reverse(price));
+                } else {
+                    // 插入或更新时,用 Reverse 包装 key
+                    bids.insert(Reverse(price), quantity);
+                }
+            }
 
             // println!("  {}. Price: {}, Quantity: {}", i + 1, price, quantity);
         }
 
+        self.print_order_book(&key);
+
         Ok(())
     }
-    
+
     pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
         // 1. 预解析为通用的 Value
         let v = response.data.clone();

+ 2 - 2
src/main.rs

@@ -13,7 +13,7 @@ use std::collections::{HashMap, HashSet};
 use backtrace::Backtrace;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
-use anyhow::{anyhow};
+use anyhow::{anyhow, bail};
 use anyhow::Result;
 use serde_json::Value;
 use tokio::sync::Mutex;
@@ -120,7 +120,7 @@ pub async fn run_mexc_subscriptions(
     let filtered_map = process_exchange_info(&default_symbols_response.data, &exchange_info_response.data)?;
     info!("成功过滤并转换了交易对信息,最终数量: {}", filtered_map.len());
     if filtered_map.is_empty() {
-        return Err(anyhow!("未能获取到任何有效的交易对,请检查网络或 API 接口。"));
+        bail!("未能获取到任何有效的交易对,请检查网络或 API 接口。");
     }
 
     let mut symbols: Vec<String> = filtered_map.keys().cloned().collect();