Quellcode durchsuchen

深度处理好了

skyffire vor 6 Monaten
Ursprung
Commit
5f3b8a2cf0
6 geänderte Dateien mit 110 neuen und 16 gelöschten Zeilen
  1. 2 1
      .gitignore
  2. 1 0
      Cargo.toml
  3. 2 2
      build.rs
  4. 29 7
      proto/mexc_spot_increase_depth_v3.proto
  5. 76 5
      src/exchange/mexc_spot_ws.rs
  6. 0 1
      src/main.rs

+ 2 - 1
.gitignore

@@ -2,4 +2,5 @@
 .idea/
 Cargo.lock
 Config.json
-logs/
+logs/
+un_decode.bin

+ 1 - 0
Cargo.toml

@@ -75,6 +75,7 @@ sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite", "macros"] }
 backtrace = "0.3.74"
 
 prost = "0.11"
+hex = "0.4.3"
 
 # =======================================================
 # 以下是一些在开发过程中可能会用到的devDependencies,只用于开发和测试,不包含在最终发布版本中

+ 2 - 2
build.rs

@@ -6,7 +6,7 @@ use std::io::Result;
 fn main() -> Result<()> {
     // 告诉 Cargo,如果在 proto/mexc_depth.proto 文件发生变化时,需要重新运行 build 脚本
     // 注意路径现在是 proto/mexc_depth.proto
-    // println!("cargo:rerun-if-changed=proto/mexc_spot_increase_depth_v3.proto");
+    println!("cargo:rerun-if-changed=proto/mexc_spot_increase_depth_v3.proto");
     // 如果有多个 proto 文件,可以为每个文件添加 rerun-if-changed
     println!("cargo:rerun-if-changed=proto/mexc_spot_kline_v3.proto");
 
@@ -15,7 +15,7 @@ fn main() -> Result<()> {
     // 第二个参数是寻找 proto 文件的根目录列表,这里是 "&["proto/"]"
     prost_build::compile_protos(
         &[
-            // "proto/mexc_spot_increase_depth_v3.proto", // 修改这里的路径
+            "proto/mexc_spot_increase_depth_v3.proto", // 修改这里的路径
             "proto/mexc_spot_kline_v3.proto", // 如果有其他 proto 文件也添加进来
         ],
         &["proto/"] // 指定 proto 文件的根目录

+ 29 - 7
proto/mexc_spot_increase_depth_v3.proto

@@ -7,15 +7,37 @@ option optimize_for = SPEED;
 option java_multiple_files = true;
 option java_outer_classname = "PublicIncreaseDepthsV3ApiProto";
 
-message PublicIncreaseDepthsV3Api {
+// 单个买卖盘条目的结构 (与原始 .proto 保持一致)
+message DepthItemV3 {
+  string price = 1;
+  string quantity = 2;
+}
+
+// 实际深度数据内容的嵌套消息
+message DepthDataContentV3 {
+  // Ask 列表 (repeated DepthItemV3),在嵌套消息内部使用 Tag 1
+  repeated DepthItemV3 asks = 1;
+  // Bid 列表 (repeated DepthItemV3),在嵌套消息内部使用 Tag 2
+  repeated DepthItemV3 bids = 2;
 
-  repeated PublicIncreaseDepthV3ApiItem asks  = 1;
-  repeated PublicIncreaseDepthV3ApiItem bids  = 2;
+  // 事件类型
   string eventType = 3;
+  // 版本号
   string version = 4;
+  // 最后更新 ID (或者其他某种标识)
+  string lastUpdateId = 5; // 根据 protoc --decode_raw 输出 Tag 5 的值和上下文命名
 }
 
-message PublicIncreaseDepthV3ApiItem {
-  string price = 1;
-  string quantity = 2;
-}
+// 顶层接收到的深度数据消息结构
+message PublicIncreaseDepthsV3ApiMessage {
+  // Tag 1 可能包含 Topic 信息
+  string topic_info = 1;
+  // Tag 3 可能包含 Symbol
+  string symbol = 3;
+  // Tag 6 可能是一个时间戳
+  int64 timestamp = 6; // 根据 protoc --decode_raw 输出 Tag 6 的值和上下文命名为 timestamp
+
+  // Tag 313 嵌套了实际的深度数据内容
+  // 注意这里的 Tag 编号是 313
+  DepthDataContentV3 depth_data = 313;
+}

+ 76 - 5
src/exchange/mexc_spot_ws.rs

@@ -27,9 +27,12 @@ pub mod mexc_spot {
     include!(concat!(env!("OUT_DIR"), "/_.rs"));
 }
 
-// use mexc_spot::PublicIncreaseDepthsV3Api; // 使用 proto 中定义的消息名
 use mexc_spot::PublicSpotKlineV3ApiMessage;
 use mexc_spot::KlineDataV3;
+// 引入新的结构体
+use mexc_spot::PublicIncreaseDepthsV3ApiMessage;
+use mexc_spot::DepthDataContentV3;
+use mexc_spot::DepthItemV3;
 
 #[derive(Debug)]
 #[derive(Clone)]
@@ -247,7 +250,7 @@ impl MexcSpotWs {
     }
     //数据解析-二进制
     pub fn message_binary(po: Vec<u8>) -> Option<Response> {
-        info!("Received binary message ({} bytes)", po.len());
+        // info!("Received binary message ({} bytes)", po.len());
 
         // 1. 尝试用新的顶层消息结构 PublicSpotKlineV3ApiMessage 来解析 K 线数据
         // 根据 Topic 前缀判断依然有效,但现在是判断是否**可能**是 K 线相关消息
@@ -312,8 +315,76 @@ impl MexcSpotWs {
             }
         }
 
-        // 3. 如果都不是已知的 Protobuf 类型,处理未知消息
+        // 2. 尝试解析深度数据 (使用新的结构体)
+        if prefix_string.contains("spot@public.aggre.depth.v3.api.pb") {
+            // info!("通过 Topic 前缀判断为深度数据");
+
+            // 尝试解析为 PublicIncreaseDepthsV3ApiMessage (新的顶层深度消息)
+            match PublicIncreaseDepthsV3ApiMessage::decode(&po[..]) {
+                Ok(depth_message) => {
+                    // info!("成功解析为顶层深度消息结构");
+
+                    // 检查是否包含嵌套的 depth_data 字段 (Tag 313)
+                    if let Some(depth_data_content) = depth_message.depth_data {
+                        // info!("找到并成功访问嵌套的 DepthDataContentV3");
+
+                        // 填充 Response 并返回
+                        let response_data = Response::new(
+                            depth_message.topic_info.clone(), // 使用解析到的 Topic
+                            200, "OK".to_string(),
+                            serde_json::json!({
+                                 // 嵌套消息内部的字段
+                                "asks": depth_data_content.asks.into_iter().map(|item| serde_json::json!({"price": item.price, "quantity": item.quantity})).collect::<Vec<_>>(),
+                                "bids": depth_data_content.bids.into_iter().map(|item| serde_json::json!({"price": item.price, "quantity": item.quantity})).collect::<Vec<_>>(),
+                                "eventType": depth_data_content.event_type,
+                                "version": depth_data_content.version,
+                                "lastUpdateId": depth_data_content.last_update_id, // 新增字段
+
+                                // 顶层字段
+                                 "topic_info": depth_message.topic_info,
+                                "symbol": depth_message.symbol,
+                                 "timestamp": depth_message.timestamp, // 新增字段
+                            })
+                        );
+                        return Some(response_data);
+
+                    } else {
+                        info!("顶层深度消息结构解析成功,但未找到嵌套的 depth_data 字段 (Tag 313)");
+                        // 处理只有顶层字段的深度相关消息
+                        return Some(Response::new(
+                            depth_message.topic_info.clone(),
+                            200, "OK (Control Message)".to_string(),
+                            serde_json::json!({
+                               "topic_info": depth_message.topic_info,
+                               "symbol": depth_message.symbol,
+                               "timestamp": depth_message.timestamp,
+                           })
+                        ));
+                    }
+                }
+                Err(e) => {
+                    error!("解析深度消息 PublicIncreaseDepthsV3ApiMessage 失败: {:?}", e);
+                    // 保存数据以供分析
+                    let file_path = Path::new("depth_error_data.bin");
+                    // ... 保存 po 到文件 ...
+                    return Some(Response::new("".to_string(), 500, format!("Protobuf 深度消息解析出错: {:?}", e), Value::Null));
+                }
+            }
+        }
+
+        // 如果都不是已知的 Protobuf 类型,处理未知消息
         error!("无法将二进制消息解析为任何已知 Protobuf 类型");
+        // *** 在这里将原始二进制数据保存到文件 ***
+        let file_path = Path::new("un_decode.bin");
+        match File::create(&file_path) {
+            Ok(mut file) => {
+                match file.write_all(&po) {
+                    Ok(_) => info!("原始 K 线二进制数据保存到 {:?}", file_path),
+                    Err(write_e) => error!("保存 K 线二进制数据失败: {:?}", write_e),
+                }
+            }
+            Err(create_e) => error!("创建文件 {:?} 失败: {:?}", file_path, create_e),
+        }
         Some(Response::new("".to_string(), 400, "无法解析未知二进制消息".to_string(), Value::Null))
     }
     //数据解析
@@ -393,8 +464,8 @@ mod tests {
         let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
 
         ws.set_subscribe(vec![
-            MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
-            // MexcSpotWsSubscribeType::PuFuturesDepth
+            // MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
+            MexcSpotWsSubscribeType::PuFuturesDepth
         ]);
 
         ws.set_symbols(vec!["BTC_USDT".to_string()]);

+ 0 - 1
src/main.rs

@@ -13,7 +13,6 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use tracing::{info, warn};
 use utils::log_setup;
-use tokio::sync::Mutex;
 
 #[tokio::main]
 async fn main() {