Explorar el Código

优化 测试 socket 推送解析

875428575@qq.com hace 2 años
padre
commit
ff0525e757
Se han modificado 1 ficheros con 65 adiciones y 34 borrados
  1. 65 34
      src/exchange_libs.rs

+ 65 - 34
src/exchange_libs.rs

@@ -1,5 +1,6 @@
 use std::collections::{BTreeMap, HashMap};
 use std::{env, io, thread};
+use std::error::Error;
 use std::future::Future;
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -534,37 +535,48 @@ impl SocketTool {
                 //数据解析
                 match msg {
                     Ok(Message::Text(text)) => {
-                        // writeln!(stdout, "---接收数据:{0}", text).unwrap();
-                        //转json
-                        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
-                        if json_value.get("result").is_some() {
-                            writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
-                        } else if json_value.get("event").is_some() {
-                            writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
-                        } else {
-                            // --推送数据
-                            // writeln!(stdout, "---推送数据:{0}", text).unwrap();
-                            let rsp_data = ResponseData::new("0".to_string(),
-                                                             "success".to_string(),
-                                                             text);
-                            // parse_fn(rsp_data);
+                        let rsp_data = SocketTool::message_ok_unscramble(text);
+                        if rsp_data.code.to_string() == "0".to_string() {
                             let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
-
                             tokio::spawn(async move {
                                 let parse_fn = parse_fn.lock().await;
                                 parse_fn(rsp_data).await;
                             });
                         }
+
+                        // // writeln!(stdout, "---接收数据:{0}", text).unwrap();
+                        // //转json
+                        // let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+                        // if json_value.get("result").is_some() {
+                        //     writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
+                        // } else if json_value.get("event").is_some() {
+                        //     writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
+                        // } else {
+                        //     // --推送数据
+                        //     // writeln!(stdout, "---推送数据:{0}", text).unwrap();
+                        //     let rsp_data = ResponseData::new("0".to_string(),
+                        //                                      "success".to_string(),
+                        //                                      text);
+                        //
+                        //     let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
+                        //
+                        //     tokio::spawn(async move {
+                        //         let parse_fn = parse_fn.lock().await;
+                        //         parse_fn(rsp_data).await;
+                        //     });
+                        // }
                     }
                     Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
                         socket.write_message(Message::text("pong")).expect("TODO: panic message");
-                        // writeln!(stdout, "ping----------pong").unwrap();
                         writeln!(stdout, "ping----------pong").unwrap();
                     }
                     Err(error) => {
                         writeln!(stderr, "Error receiving message: {}", error).unwrap();
                         let rsp_data = ResponseData::error("socket 发生错误!".to_string());
-                        // parse_fn(rsp_data);
+                        tokio::spawn(async move {
+                            let parse_fn = parse_fn.lock().await;
+                            parse_fn(rsp_data).await;
+                        });
                         break;
                     }
                     _ => {}
@@ -608,29 +620,27 @@ impl SocketTool {
 
                 match msg {
                     Ok(Message::Text(text)) => {
-                        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
-                        if json_value.get("result").is_some() {
-                            writeln!(stdout, "---订阅成功:{:?}", text).unwrap();
-                        } else {
-                            writeln!(stdout, "---推送数据:{0}", text).unwrap();
-                            let rsp_data = ResponseData::new("0".to_string(),
-                                                             "success".to_string(),
-                                                             text);
-                            // parse_fn(rsp_data);
+                        let rsp_data = SocketTool::message_ok_unscramble(text);
+                        if rsp_data.code.to_string() == "0".to_string() {
+                            let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
+                            tokio::spawn(async move {
+                                let parse_fn = parse_fn.lock().await;
+                                parse_fn(rsp_data).await;
+                            });
                         }
                     }
                     Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
-                        socket.write_message(Message::Pong(vec![]))
-                            .unwrap();
+                        socket.write_message(Message::text("pong")).expect("TODO: panic message");
                         writeln!(stdout, "ping----------pong").unwrap();
                     }
                     Err(error) => {
                         writeln!(stderr, "Error receiving message: {}", error).unwrap();
-                        // let rsp_data = ResponseData::error("socket 发生错误!".to_string());
-                        // tokio::spawn(async move {
-                        //     parse_fn(rsp_data).await;
-                        // });
-                        // break;
+                        let rsp_data = ResponseData::error("socket 发生错误!".to_string());
+                        tokio::spawn(async move {
+                            let parse_fn = parse_fn.lock().await;
+                            parse_fn(rsp_data).await;
+                        });
+                        break;
                     }
                     _ => {}
                 }
@@ -639,7 +649,6 @@ impl SocketTool {
             socket.close(None).unwrap();
         }
     }
-
     fn log_in_to_str(login_param: BTreeMap<String, String>) -> String {
         let mut login_json_str = String::from("");
 
@@ -692,6 +701,28 @@ impl SocketTool {
 
         login_json_str
     }
+    //推送数据解析
+    fn message_ok_unscramble(text: String) -> ResponseData {
+        let mut rsp_data = ResponseData::new("-1".to_string(),
+                                             "success".to_string(),
+                                             text.parse().unwrap());
+        let mut stdout = io::stdout();
+        //转json
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+        if json_value.get("result").is_some() {
+            writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
+            rsp_data.code = "99".parse().unwrap()
+        } else if json_value.get("event").is_some() {
+            writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
+            rsp_data.code = "98".parse().unwrap()
+        } else {
+            // --推送数据
+            // writeln!(stdout, "---推送数据:{0}", text).unwrap();
+            rsp_data.code = "0".parse().unwrap()
+        }
+        rsp_data
+    }
+
 
     //币安--自定义-订阅
     pub fn binance_run_custom(b_array: Vec<&str>, parse_fn: impl Fn(ResponseData)) {}