فهرست منبع

修改 代理的字符串,优化socket 心跳

875428575@qq.com 2 سال پیش
والد
کامیت
bac7dc2b8c
1فایلهای تغییر یافته به همراه39 افزوده شده و 18 حذف شده
  1. 39 18
      src/exchange_libs.rs

+ 39 - 18
src/exchange_libs.rs

@@ -556,7 +556,6 @@ impl SocketTool {
             let mut okx_ping_time = Utc::now().timestamp();
             let mut okx_ping_off = false;
             loop {
-
                 if !socket.can_read() {
                     println!("不能读取的socket");
                     continue;
@@ -669,12 +668,6 @@ impl SocketTool {
             let mut okx_ping_time = Utc::now().timestamp();
             let mut okx_ping_off = false;
             loop {
-                //okx 发起心跳之后几秒内没相应,需要走重连
-                if lable.to_string() == "okx".to_string() && okx_ping_off == true && (Utc::now().timestamp() - okx_ping_time) < 4 {
-                    println!("-----okx--断开重连");
-                }
-
-
                 if !socket.can_read() {
                     continue;
                 }
@@ -692,25 +685,53 @@ impl SocketTool {
                             });
                         }
 
-                        // okx 20s未收到信息则主动发送ping
-                        if lable.to_string() == "okx".to_string() && (Utc::now().timestamp() - okx_ping_time) < 20 {
-                            socket.write_message(Message::text("ping")).expect("TODO: panic message");
-                            writeln!(stdout, "ping----------:{:?}---ping", lable).unwrap();
-                            okx_ping_time = Utc::now().timestamp();
-                            okx_ping_off = true;
+                        // // writeln!(stdout, "---接收数据:{0}", text).unwrap();
+                        // //转json
+                        // let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+                        // if json_value.get("result").is_some() {
+                        //     writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
+                        // } else if json_value.get("event").is_some() {
+                        //     writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
+                        // } else {
+                        //     // --推送数据
+                        //     // writeln!(stdout, "---推送数据:{0}", text).unwrap();
+                        //     let rsp_data = ResponseData::new("0".to_string(),
+                        //                                      "success".to_string(),
+                        //                                      text);
+                        //
+                        //     let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
+                        //
+                        //     tokio::spawn(async move {
+                        //         let parse_fn = parse_fn.lock().await;
+                        //         parse_fn(rsp_data).await;
+                        //     });
+                        // }
+
+
+                        if lable.to_string() == "okx".to_string() {
+                            if (Utc::now().timestamp() - okx_ping_time) > 2 {
+                                socket.write_message(Message::Ping(Vec::from("ping"))).expect("TODO: panic message");
+                                writeln!(stdout, "主动------:{:?}---ping", lable).unwrap();
+                                okx_ping_time = Utc::now().timestamp();
+                                okx_ping_off = true;
+                            }
                         }
                     }
-                    Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {
+                    Ok(Message::Ping(s)) => {
+                        writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s)).unwrap();
                         let mut op_str = "".to_string();
                         if lable.to_string() == "binance".to_string() {
                             op_str = "pong".to_string();
-                        } else if lable.to_string() == "okx".to_string() {
-                            op_str = "pong".to_string();
+                            socket.write_message(Message::Pong(Vec::from(op_str.clone()))).expect("TODO: panic message");
+                            writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
+                        }
+                    }
+                    Ok(Message::Pong(s)) => {
+                        writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s)).unwrap();
+                        if lable.to_string() == "okx".to_string() {
                             okx_ping_off = false;
                             okx_ping_time = Utc::now().timestamp();
                         }
-                        socket.write_message(Message::text(op_str.to_string())).expect("TODO: panic message");
-                        writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
                     }
                     Ok(Message::Close(_)) => {
                         writeln!(stderr, "socket 关闭: ").unwrap();