Răsfoiți Sursa

k线处理好了。

skyffire 6 luni în urmă
părinte
comite
3de8c123e1
3 a modificat fișierele cu 98 adăugiri și 22 ștergeri
  1. 2 2
      build.rs
  2. 21 2
      proto/mexc_spot_kline_v3.proto
  3. 75 18
      src/exchange/mexc_spot_ws.rs

+ 2 - 2
build.rs

@@ -6,7 +6,7 @@ use std::io::Result;
 fn main() -> Result<()> {
     // 告诉 Cargo,如果在 proto/mexc_depth.proto 文件发生变化时,需要重新运行 build 脚本
     // 注意路径现在是 proto/mexc_depth.proto
-    println!("cargo:rerun-if-changed=proto/mexc_spot_increase_depth_v3.proto");
+    // println!("cargo:rerun-if-changed=proto/mexc_spot_increase_depth_v3.proto");
     // 如果有多个 proto 文件,可以为每个文件添加 rerun-if-changed
     println!("cargo:rerun-if-changed=proto/mexc_spot_kline_v3.proto");
 
@@ -15,7 +15,7 @@ fn main() -> Result<()> {
     // 第二个参数是寻找 proto 文件的根目录列表,这里是 "&["proto/"]"
     prost_build::compile_protos(
         &[
-            "proto/mexc_spot_increase_depth_v3.proto", // 修改这里的路径
+            // "proto/mexc_spot_increase_depth_v3.proto", // 修改这里的路径
             "proto/mexc_spot_kline_v3.proto", // 如果有其他 proto 文件也添加进来
         ],
         &["proto/"] // 指定 proto 文件的根目录

+ 21 - 2
proto/mexc_spot_kline_v3.proto

@@ -7,7 +7,8 @@ option optimize_for = SPEED;
 option java_multiple_files = true;
 option java_outer_classname = "PublicSpotKlineV3ApiProto";
 
-message PublicSpotKlineV3Api {
+// 这是实际的 K 线数据结构,它现在变成了一个内部的消息
+message KlineDataV3 {
   //K线周期(Min1,Min5,Min15,Min30,Min60,Hour4,Hour8,Day1,Week1,Month1)
   string interval = 1;
   // 窗口开始时间戳(秒时间戳)
@@ -26,4 +27,22 @@ message PublicSpotKlineV3Api {
   string amount = 8;
   // 窗口结束时间戳(秒时间戳)
   int64 windowEnd = 9;
-}
+  // 注意:如果根据 decode_raw 输出,还有其他内部字段,也在这里添加
+}
+
+// 这是顶层接收到的消息结构
+message PublicSpotKlineV3ApiMessage {
+  // Tag 1 可能包含 Topic 信息
+  string topic_info = 1;
+  // Tag 3 可能包含 Symbol
+  string symbol = 3;
+  // Tag 4 可能包含某种 ID 或标识符
+  string id_info = 4;
+  // Tag 5 可能是一个时间戳或版本号,根据值大小和你的需要定义类型
+  // 1745894084490 看起来像毫秒时间戳 (或者其他什么)
+  // 如果需要解析为数字,改为 sint64 或 int64。如果不需要深入解析,可以仍然定义为 bytes 或 string
+  sint64 timestamp_or_version = 5; // 假设它是一个 sint64
+
+  // Tag 308 是包含实际 K 线数据的嵌套消息
+  KlineDataV3 kline_data = 308; // <-- 这里的 Tag 编号是 308,类型是我们上面定义的 KlineDataV3 消息
+}

+ 75 - 18
src/exchange/mexc_spot_ws.rs

@@ -1,11 +1,14 @@
 use std::fmt::format;
-use std::io::Read;
+use std::fs::File;
+use std::io::{Read, Write};
+use std::path::Path;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 
 use flate2::read::GzDecoder;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use prost::Message as ProstMessage;
 use serde_json::json;
 use serde_json::Value;
 use tokio::sync::Mutex;
@@ -24,8 +27,9 @@ pub mod mexc_spot {
     include!(concat!(env!("OUT_DIR"), "/_.rs"));
 }
 
-use mexc_spot::PublicIncreaseDepthsV3Api; // 使用 proto 中定义的消息名
-use mexc_spot::PublicSpotKlineV3Api; // 假设 KlineData 是 kline.proto 中定义的消息名
+// use mexc_spot::PublicIncreaseDepthsV3Api; // 使用 proto 中定义的消息名
+use mexc_spot::PublicSpotKlineV3ApiMessage;
+use mexc_spot::KlineDataV3;
 
 #[derive(Debug)]
 #[derive(Clone)]
@@ -243,21 +247,74 @@ impl MexcSpotWs {
     }
     //数据解析-二进制
     pub fn message_binary(po: Vec<u8>) -> Option<Response> {
-        info!("{:?}", po);
-        //二进制WebSocket消息
-        // let mut gz_decoder = GzDecoder::new(&po[..]);
-        // let mut decompressed_data = Vec::new();
-
-        // 将解压后的字节向量转换为 UTF-8 字符串
-        match String::from_utf8(po) {
-            Ok(text) => {
-                let response_data = Self::ok_text(text);
-                Option::from(response_data)
-            }
-            Err(e) => {
-                Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null))
+        info!("Received binary message ({} bytes)", po.len());
+
+        // 1. 尝试用新的顶层消息结构 PublicSpotKlineV3ApiMessage 来解析 K 线数据
+        // 根据 Topic 前缀判断依然有效,但现在是判断是否**可能**是 K 线相关消息
+        let prefix_len = po.len().min(100);
+        let prefix_string = String::from_utf8_lossy(&po[..prefix_len]);
+
+        if prefix_string.contains("spot@public.kline.v3.api.pb") {
+            // info!("通过 Topic 前缀判断为 K 线数据相关消息");
+
+            // 尝试解析为 PublicSpotKlineV3ApiMessage
+            match PublicSpotKlineV3ApiMessage::decode(&po[..]) {
+                Ok(kline_message) => {
+                    // info!("成功解析为顶层 K 线消息结构");
+                    // 检查是否包含嵌套的 KlineDataV3 字段 (Tag 308)
+                    if let Some(kline_data) = kline_message.kline_data { // 注意这里 PublicSpotKlineV3ApiMessage 的 kline_data 字段是 Option<KlineDataV3>
+                        // info!("找到并成功访问嵌套的 KlineDataV3");
+                        // 现在 kline_data 是 KlineDataV3 结构体,你可以使用它了!
+                        // 填充 Response 并返回 (省略详细实现)
+                        let response_data = Response::new(
+                            kline_message.topic_info.clone(), // 使用解析到的 Topic 信息
+                            200,
+                            "OK".to_string(),
+                        json!({
+                                "interval": kline_data.interval,
+                                "windowStart": kline_data.window_start, //注意 snake_case
+                                "openingPrice": kline_data.opening_price,
+                                "closingPrice": kline_data.closing_price,
+                                "highestPrice": kline_data.highest_price,
+                                "lowestPrice": kline_data.lowest_price,
+                                "volume": kline_data.volume,
+                                "amount": kline_data.amount,
+                                "windowEnd": kline_data.window_end,
+                                // 可以添加顶层字段的信息,如果需要
+                                "topic_info": kline_message.topic_info,
+                                "symbol": kline_message.symbol,
+                                "id_info": kline_message.id_info,
+                                "timestamp_or_version": kline_message.timestamp_or_version,
+                            })
+                        );
+                        return Some(response_data);
+                    } else {
+                        info!("顶层 K 线消息结构解析成功,但未找到嵌套的 kline_data 字段 (Tag 308)");
+                        // 这可能是一个只有顶层字段的控制消息
+                        return Some(Response::new(
+                            kline_message.topic_info.clone(), // 使用解析到的 Topic 信息
+                            200,
+                            "OK (Control Message)".to_string(),
+                            json!({
+                                "topic_info": kline_message.topic_info,
+                                "symbol": kline_message.symbol,
+                                "id_info": kline_message.id_info,
+                                "timestamp_or_version": kline_message.timestamp_or_version,
+                            })
+                        ));
+                    }
+                }
+                Err(e) => {
+                    error!("尝试解析为 PublicSpotKlineV3ApiMessage 失败: {:?}", e);
+
+                    return Some(Response::new("".to_string(), 500, format!("Protobuf K 线顶层消息解析出错: {:?}", e), Value::Null));
+                }
             }
         }
+
+        // 3. 如果都不是已知的 Protobuf 类型,处理未知消息
+        error!("无法将二进制消息解析为任何已知 Protobuf 类型");
+        Some(Response::new("".to_string(), 400, "无法解析未知二进制消息".to_string(), Value::Null))
     }
     //数据解析
     pub fn ok_text(text: String) -> Response
@@ -336,8 +393,8 @@ mod tests {
         let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
 
         ws.set_subscribe(vec![
-            // MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
-            MexcSpotWsSubscribeType::PuFuturesDepth
+            MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
+            // MexcSpotWsSubscribeType::PuFuturesDepth
         ]);
 
         ws.set_symbols(vec!["BTC_USDT".to_string()]);