skyffire 6 månader sedan
förälder
incheckning
cd814a63b4
3 ändrade filer med 49 tillägg och 86 borttagningar
  1. 40 77
      src/exchange/mexc_spot_ws.rs
  2. 8 8
      src/exchange/socket_tool.rs
  3. 1 1
      src/ws_manager.rs

+ 40 - 77
src/exchange/mexc_spot_ws.rs

@@ -11,7 +11,7 @@ use serde_json::json;
 use serde_json::Value;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Message};
-use tracing::{error, info, trace};
+use tracing::{error, info, trace, warn};
 use anyhow::Result;
 
 use crate::exchange::response_base::Response;
@@ -215,12 +215,12 @@ impl MexcSpotWs {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
             loop {
-                info!("Mexc_usdt_swap socket 连接中……");
+                trace!("Mexc_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
-                error!("Mexc_usdt_swap socket 断连,1s以后重连……");
+                warn!("Mexc_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
@@ -233,8 +233,27 @@ impl MexcSpotWs {
 
     //数据解析-Text
     pub fn message_text(text: String) -> Option<Response> {
-        let response_data = Self::ok_text(text);
-        Option::from(response_data)
+        let mut res_data = Response::new("".to_string(), -201, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        match json_value["msg"].as_str() {
+            Some(msg) => {
+                res_data.message = json_value["msg"].to_string();
+
+                if msg.contains("Not Subscribed successfully!") {
+                    res_data.code = 500
+                } else {
+                    res_data.channel = json_value["msg"].to_string();
+                }
+            }
+            None => {
+                res_data.data = json_value.clone();
+                res_data.code = -1;
+                res_data.message = text;
+            }
+        }
+
+        Option::from(res_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<Response> {
@@ -269,22 +288,22 @@ impl MexcSpotWs {
                             kline_message.topic_info.clone(), // 使用解析到的 Topic 信息
                             200,
                             "success".to_string(),
-                        json!({
-                                "interval": kline_data.interval,
-                                "windowStart": kline_data.window_start, //注意 snake_case
-                                "openingPrice": kline_data.opening_price,
-                                "closingPrice": kline_data.closing_price,
-                                "highestPrice": kline_data.highest_price,
-                                "lowestPrice": kline_data.lowest_price,
-                                "volume": kline_data.volume,
-                                "amount": kline_data.amount,
-                                "windowEnd": kline_data.window_end,
-                                // 可以添加顶层字段的信息,如果需要
-                                "topic_info": kline_message.topic_info,
-                                "symbol": kline_message.symbol,
-                                "id_info": kline_message.id_info,
-                                "timestamp_or_version": kline_message.timestamp_or_version,
-                            })
+                            json!({
+                                    "interval": kline_data.interval,
+                                    "windowStart": kline_data.window_start, //注意 snake_case
+                                    "openingPrice": kline_data.opening_price,
+                                    "closingPrice": kline_data.closing_price,
+                                    "highestPrice": kline_data.highest_price,
+                                    "lowestPrice": kline_data.lowest_price,
+                                    "volume": kline_data.volume,
+                                    "amount": kline_data.amount,
+                                    "windowEnd": kline_data.window_end,
+                                    // 可以添加顶层字段的信息,如果需要
+                                    "topic_info": kline_message.topic_info,
+                                    "symbol": kline_message.symbol,
+                                    "id_info": kline_message.id_info,
+                                    "timestamp_or_version": kline_message.timestamp_or_version,
+                                })
                         );
                         return Some(response_data);
                     } else {
@@ -342,7 +361,6 @@ impl MexcSpotWs {
                             })
                         );
                         return Some(response_data);
-
                     } else {
                         info!("顶层深度消息结构解析成功,但未找到嵌套的 depth_data 字段 (Tag 313)");
                         // 处理只有顶层字段的深度相关消息
@@ -378,61 +396,6 @@ impl MexcSpotWs {
         }
         Some(Response::new("".to_string(), 400, "无法解析未知二进制消息".to_string(), Value::Null))
     }
-    //数据解析
-    pub fn ok_text(text: String) -> Response
-    {
-        info!("{}", text);
-        let mut res_data = Response::new("".to_string(), 200, "success".to_string(), Value::Null);
-        let json_value: Value = serde_json::from_str(&text).unwrap();
-
-        match json_value["channel"].as_str() {
-            Some(method) => {
-                if method.contains("pong") {
-                    return Response::new("".to_string(), -301, "success".to_string(), Value::Null);
-                } else if method.contains("rs.sub.") {
-                    //订阅响应
-                    let data = json_value["data"].as_str().unwrap();
-                    if method.contains(".depth") {
-                        res_data.channel = "futures.order_book".to_string();
-                    } else if method.contains(".kline") {
-                        res_data.channel = "futures.candlesticks".to_string();
-                    } else if method.contains(".deal") {
-                        res_data.channel = "futures.trades".to_string();
-                    } else {
-                        res_data.channel = "未知频道订阅".to_string();
-                    }
-
-                    if data == "success" {
-                        res_data.code = -201;
-                        res_data.message = "订阅成功".to_string();
-                    } else {
-                        res_data.code = 400;
-                        res_data.message = "订阅失败".to_string();
-                    }
-                } else if method.contains("push.") {
-                    if method.contains(".depth") {
-                        res_data.channel = "futures.order_book".to_string();
-                    } else if method.contains(".kline") {
-                        res_data.channel = "futures.candlesticks".to_string();
-                    } else {
-                        res_data.channel = "未知频道推送".to_string();
-                    }
-                    res_data.code = 200;
-                    res_data.data = json_value.clone();
-                } else {
-                    res_data.code = -1;
-                    res_data.message = "未知解析".to_string();
-                }
-            }
-            None => {
-                res_data.data = json_value.clone();
-                res_data.code = -1;
-                res_data.message = "未知解析".to_string();
-            }
-        }
-
-        res_data
-    }
 }
 
 #[cfg(test)]

+ 8 - 8
src/exchange/socket_tool.rs

@@ -14,7 +14,7 @@ use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
-use tracing::{error, info, trace};
+use tracing::{error, info, trace, warn};
 
 use crate::exchange::proxy;
 use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
@@ -64,9 +64,9 @@ impl AbstractWsMode {
         };
         // 如果不需要事先登录,则直接订阅消息
         if !is_first_login {
-            info!("不需要先登录,订阅内容:");
+            trace!("不需要先登录,订阅内容:");
             for s in &subscribe_array {
-                info!("{}", s);
+                trace!("{}", s);
                 let mut write_lock = ws_write_arc.lock().await;
                 write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
             }
@@ -114,9 +114,9 @@ impl AbstractWsMode {
                         -200 => {
                             //登录成功
                             info!("ws登录成功:{:?}", data);
-                            info!("订阅内容:{:?}", subscribe_array.clone());
                             if is_first_login {
                                 for s in &subscribe_array {
+                                    info!("订阅内容:{}", s);
                                     let mut write_lock = ws_write_arc.lock().await;
                                     write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
                                 }
@@ -124,8 +124,8 @@ impl AbstractWsMode {
                             }
                         }
                         -201 => {
-                            //订阅成功
-                            trace!("订阅成功:{:?}", data);
+                            // 订阅成功
+                            // trace!("订阅成功:{:?}", data);
                         }
                         -300 => {
                             //服务器发送心跳 ping 给客户端,客户端需要pong回应
@@ -200,7 +200,7 @@ impl AbstractWsMode {
 
         match connect_async(address_url.clone(), proxy).await {
             Ok((ws_stream, _)) => {
-                info!("socket 链接成功,{}。", address_url);
+                trace!("socket 链接成功,{}。", address_url);
 
                 Self::ws_connected(write_to_socket_rx_arc,
                                    is_first_login,
@@ -215,7 +215,7 @@ impl AbstractWsMode {
                                    message_binary).await;
             }
             Err(e) => {
-                error!("WebSocket 握手失败:{:?}", e);
+                warn!("WebSocket 握手失败:{:?}", e);
             }
         }
     }

+ 1 - 1
src/ws_manager.rs

@@ -19,7 +19,7 @@ pub struct WsManager {
 impl WsManager {
     pub fn new(symbols: Vec<String>, data_manager_am: Arc<Mutex<DataManager>>, running: Arc<AtomicBool>) -> WsManager {
         WsManager {
-            symbols,
+            symbols: symbols[0..100].to_owned(),
             data_manager_am,
             running
         }