소스 검색

解析二进制消息的部分明天再做了……太沙雕了

skyffire 6 달 전
부모
커밋
b285e943a6
6개의 변경된 파일109개의 추가작업 그리고 33개의 파일을 삭제
  1. 5 4
      Cargo.toml
  2. 24 0
      build.rs
  3. 21 0
      proto/mexc_spot_increase_depth_v3.proto
  4. 29 0
      proto/mexc_spot_kline_v3.proto
  5. 27 27
      src/exchange/mexc_spot_ws.rs
  6. 3 2
      src/exchange/socket_tool.rs

+ 5 - 4
Cargo.toml

@@ -74,10 +74,11 @@ thiserror = "1"
 sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite", "macros"] }
 backtrace = "0.3.74"
 
-# 可选项:更高级的配置管理库,如果配置结构比较复杂,可以考虑使用
-# config = "0.13"
+prost = "0.11"
 
 # =======================================================
 # 以下是一些在开发过程中可能会用到的devDependencies,只用于开发和测试,不包含在最终发布版本中
-# [dev-dependencies]
-# pretty_assertions = "1" # 用于在测试中提供更清晰的断言失败信息
+[dev-dependencies]
+
+[build-dependencies]
+prost-build = "0.11" # 或者最新版本

+ 24 - 0
build.rs

@@ -0,0 +1,24 @@
+// build.rs
+
+use prost_build;
+use std::io::Result;
+
+fn main() -> Result<()> {
+    // 告诉 Cargo,如果在 proto/mexc_depth.proto 文件发生变化时,需要重新运行 build 脚本
+    // 注意路径现在是 proto/mexc_depth.proto
+    println!("cargo:rerun-if-changed=proto/mexc_spot_increase_depth_v3.proto");
+    // 如果有多个 proto 文件,可以为每个文件添加 rerun-if-changed
+    println!("cargo:rerun-if-changed=proto/mexc_spot_kline_v3.proto");
+
+    // 使用 prost_build 编译 proto 文件
+    // 第一个参数是要编译的文件列表,注意路径是 proto/mexc_depth.proto
+    // 第二个参数是寻找 proto 文件的根目录列表,这里是 "&["proto/"]"
+    prost_build::compile_protos(
+        &[
+            "proto/mexc_spot_increase_depth_v3.proto", // 修改这里的路径
+            "proto/mexc_spot_kline_v3.proto", // 如果有其他 proto 文件也添加进来
+        ],
+        &["proto/"] // 指定 proto 文件的根目录
+    )?;
+    Ok(())
+}

+ 21 - 0
proto/mexc_spot_increase_depth_v3.proto

@@ -0,0 +1,21 @@
+// spot@public.increase.depth.v3.api.pb
+
+syntax = "proto3";
+
+option java_package = "com.mxc.push.common.protobuf";
+option optimize_for = SPEED;
+option java_multiple_files = true;
+option java_outer_classname = "PublicIncreaseDepthsV3ApiProto";
+
+message PublicIncreaseDepthsV3Api {
+
+  repeated PublicIncreaseDepthV3ApiItem asks  = 1;
+  repeated PublicIncreaseDepthV3ApiItem bids  = 2;
+  string eventType = 3;
+  string version = 4;
+}
+
+message PublicIncreaseDepthV3ApiItem {
+  string price = 1;
+  string quantity = 2;
+}

+ 29 - 0
proto/mexc_spot_kline_v3.proto

@@ -0,0 +1,29 @@
+// spot@public.kline.v3.api.pb@<symbol>@<interval>
+
+syntax = "proto3";
+
+option java_package = "com.mxc.push.common.protobuf";
+option optimize_for = SPEED;
+option java_multiple_files = true;
+option java_outer_classname = "PublicSpotKlineV3ApiProto";
+
+message PublicSpotKlineV3Api {
+  //K线周期(Min1,Min5,Min15,Min30,Min60,Hour4,Hour8,Day1,Week1,Month1)
+  string interval = 1;
+  // 窗口开始时间戳(秒时间戳)
+  int64 windowStart = 2;
+  // 开盘价
+  string openingPrice = 3;
+  // 收盘价
+  string closingPrice = 4;
+  // 最高价
+  string highestPrice = 5;
+  // 最低价
+  string lowestPrice = 6;
+  // 成交量
+  string volume = 7;
+  // 成交额
+  string amount = 8;
+  // 窗口结束时间戳(秒时间戳)
+  int64 windowEnd = 9;
+}

+ 27 - 27
src/exchange/mexc_spot_ws.rs

@@ -20,6 +20,12 @@ pub enum MexcSpotWsType {
     PublicAndPrivate,
 }
 
+pub mod mexc_spot {
+    include!(concat!(env!("OUT_DIR"), "/_.rs"));
+}
+
+use mexc_spot::PublicIncreaseDepthsV3Api; // 使用 proto 中定义的消息名
+use mexc_spot::PublicSpotKlineV3Api; // 假设 KlineData 是 kline.proto 中定义的消息名
 
 #[derive(Debug)]
 #[derive(Clone)]
@@ -83,7 +89,7 @@ impl MexcSpotWs {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
             MexcSpotWsType::PublicAndPrivate => {
-                let url = "ws://wbs-api.mexc.com/ws".to_string();
+                let url = "wss://wbs-api.mexc.com/ws".to_string();
                 url
             }
         };
@@ -119,7 +125,7 @@ impl MexcSpotWs {
             // 大写
             *symbol = symbol.to_uppercase();
             // 字符串替换
-            *symbol = symbol.replace("_", "_");
+            *symbol = symbol.replace("_", "");
         }
         self.symbol_s = symbol_array;
     }
@@ -142,7 +148,7 @@ impl MexcSpotWs {
             MexcSpotWsSubscribeType::PuFuturesDepth => {
                 json!({
                     "method": "SUBSCRIPTION",
-                    "param": [
+                    "params": [
                         format!("spot@public.aggre.depth.v3.api.pb@10ms@{symbol}")
                     ]
                 })
@@ -151,7 +157,7 @@ impl MexcSpotWs {
             MexcSpotWsSubscribeType::PuFuturesRecords(interval) => {
                 json!({
                     "method": "SUBSCRIPTION",
-                    "param": [
+                    "params": [
                         format!("spot@public.kline.v3.api.pb@{symbol}@{interval}")
                     ]
                 })
@@ -237,34 +243,26 @@ impl MexcSpotWs {
     }
     //数据解析-二进制
     pub fn message_binary(po: Vec<u8>) -> Option<Response> {
+        info!("{:?}", po);
         //二进制WebSocket消息
-        // let message_str = format!("Binary:{:?}", _po);
-        // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
-        // let result = String::from_utf8(bytes);
-        // let result = String::from_utf8(po);
-
-        let mut gz_decoder = GzDecoder::new(&po[..]);
-        let mut decompressed_data = Vec::new();
-
-        // 尝试解压数据
-        if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
-            // 将解压后的字节向量转换为 UTF-8 字符串
-            match String::from_utf8(decompressed_data) {
-                Ok(text) => {
-                    let response_data = Self::ok_text(text);
-                    return Option::from(response_data);
-                }
-                Err(_) => {
-                    return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
-                }
+        // let mut gz_decoder = GzDecoder::new(&po[..]);
+        // let mut decompressed_data = Vec::new();
+
+        // 将解压后的字节向量转换为 UTF-8 字符串
+        match String::from_utf8(po) {
+            Ok(text) => {
+                let response_data = Self::ok_text(text);
+                Option::from(response_data)
+            }
+            Err(e) => {
+                Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null))
             }
-        } else {
-            return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
         }
     }
     //数据解析
     pub fn ok_text(text: String) -> Response
     {
+        info!("{}", text);
         let mut res_data = Response::new("".to_string(), 200, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
 
@@ -308,6 +306,7 @@ impl MexcSpotWs {
                 }
             }
             None => {
+                res_data.data = json_value.clone();
                 res_data.code = -1;
                 res_data.message = "未知解析".to_string();
             }
@@ -337,10 +336,11 @@ mod tests {
         let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
 
         ws.set_subscribe(vec![
-            MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string())
+            // MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
+            MexcSpotWsSubscribeType::PuFuturesDepth
         ]);
 
-        ws.set_symbols(vec!["BTCUSDT".to_string()]);
+        ws.set_symbols(vec!["BTC_USDT".to_string()]);
 
         let fun = move |response: Response| {
             info!("{}", serde_json::to_string_pretty(&response.data).unwrap());

+ 3 - 2
src/exchange/socket_tool.rs

@@ -64,8 +64,9 @@ impl AbstractWsMode {
         };
         // 如果不需要事先登录,则直接订阅消息
         if !is_first_login {
-            info!("不需要先登录,订阅内容:{:?}", subscribe_array.clone());
+            info!("不需要先登录,订阅内容:");
             for s in &subscribe_array {
+                info!("{}", s);
                 let mut write_lock = ws_write_arc.lock().await;
                 write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
             }
@@ -152,7 +153,7 @@ impl AbstractWsMode {
                             trace!("特殊字符心跳-回应完成");
                         }
                         _ => {
-                            trace!("未知:{:?}",data);
+                            error!("未知:{:?}", data);
                         }
                     }
                 }