Bläddra i källkod

优化网络层错误提醒,发布v1.5.1

skyffire 1 år sedan
förälder
incheckning
477b74a4d6

+ 1 - 1
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "1.4.4"
+version = "1.5.1"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

+ 11 - 8
exchanges/src/binance_spot_ws.rs

@@ -180,13 +180,16 @@ impl BinanceSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-现货");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:币安-现货链接关闭-{:?}",e); }
+            }
             trace!("线程-异步链接-结束");
         });
         tokio::try_join!(t2).unwrap();
@@ -199,7 +202,7 @@ impl BinanceSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 11 - 9
exchanges/src/binance_swap_ws.rs

@@ -180,14 +180,16 @@ impl BinanceSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:币安-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -199,7 +201,7 @@ impl BinanceSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 11 - 8
exchanges/src/bitget_spot_ws.rs

@@ -271,13 +271,16 @@ impl BitgetSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:bitget-现货链接关闭-{:?}",e); }
+            }
             trace!("线程-异步链接-结束");
         });
         tokio::try_join!(t2).unwrap();
@@ -290,7 +293,7 @@ impl BitgetSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 10 - 8
exchanges/src/bybit_swap_ws.rs

@@ -239,14 +239,16 @@ impl BybitSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:bitget-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");

+ 10 - 9
exchanges/src/crypto_spot_ws.rs

@@ -111,7 +111,6 @@ impl CryptoSpotWs {
                 CryptoSpotSubscribeType::PuTicker => false,
                 CryptoSpotSubscribeType::PuTrade => false,
                 CryptoSpotSubscribeType::PuCandlestick => false,
-
             } {
                 return true;
             }
@@ -199,14 +198,16 @@ impl CryptoSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:crypt-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");

+ 22 - 11
exchanges/src/gate_swap_ws.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use chrono::Utc;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use hex;
@@ -87,7 +88,7 @@ impl GateSwapWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20,
+            heartbeat_time: 1000 * 10,
         }
     }
 
@@ -255,13 +256,18 @@ impl GateSwapWs {
         let address_url = self.address_url.clone();
         let label = self.label.clone();
         let heartbeat_time = self.heartbeat_time.clone();
+        let timestamp = Utc::now().timestamp();
 
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            let ping_str = json!({
+                "time" : timestamp,
+                "channel" : "futures.ping",
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -277,14 +283,16 @@ impl GateSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:gate-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -314,7 +322,10 @@ impl GateSwapWs {
         let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
-        if json_value.get("error").is_some() {
+        if json_value["channel"].as_str() == Option::from("futures.pong") {
+            res_data.code = "-301".to_string();
+            res_data.message = "success".to_string();
+        } else if json_value.get("error").is_some() {
             let message = json_value["error"]["message"].as_str().unwrap().to_string();
             let mes = message.trim_end_matches('\n');
 

+ 11 - 9
exchanges/src/kucoin_spot_ws.rs

@@ -306,14 +306,16 @@ impl KucoinSpotWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array.clone(),
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("kucoin-现货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array.clone(),
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:kucoin-现货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -326,7 +328,7 @@ impl KucoinSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 13 - 10
exchanges/src/kucoin_swap_ws.rs

@@ -31,7 +31,8 @@ pub struct KucoinSwapWsParam {
 //订阅频道
 #[derive(Clone)]
 pub enum KucoinSwapSubscribeType {
-    PuContractMarketLevel2Depth50,//买卖盘 快照,asks:卖,bids:买入
+    PuContractMarketLevel2Depth50,
+    //买卖盘 快照,asks:卖,bids:买入
     PuContractMarketExecution,
     PuContractMarkettickerV2,
 
@@ -320,14 +321,16 @@ impl KucoinSwapWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("kucoin-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:kucoin-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -340,7 +343,7 @@ impl KucoinSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 5 - 3
exchanges/src/okx_swap_ws.rs

@@ -304,14 +304,16 @@ impl OkxSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
                                              label.clone(), subscribe_array,
                                              write_rx, read_tx,
                                              Self::message_text,
                                              Self::message_ping,
                                              Self::message_pong,
-            ).await.expect("OKX-期货");
-            trace!("线程-异步链接-结束");
+            ).await{
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:okx-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");

+ 185 - 105
exchanges/src/socket_tool.rs

@@ -23,6 +23,7 @@ use crate::response_base::ResponseData;
 pub enum HeartbeatType {
     Ping,
     Pong,
+    Custom(String),
 }
 
 pub struct AbstractWsMode {}
@@ -56,123 +57,199 @@ impl AbstractWsMode {
         };
 
         loop {
-            let (ws_stream, _) = connect_async(address_url.clone(), proxy).await?;
-            trace!("WebSocket 握手完成。");
-            let (write, mut read) = ws_stream.split();
-
-            let write_arc = Arc::new(Mutex::new(write));
-            let write_clone = Arc::clone(&write_arc);
-            //订阅写入(包括订阅信息 )
-            trace!("订阅内容:{:?}",subscribe_array.clone());
-            for s in &subscribe_array {
-                let mut write_lock = write_clone.lock().await;
-                write_lock.send(Message::Text(s.parse().unwrap())).await?;
-            }
+            match connect_async(address_url.clone(), proxy).await {
+                Ok((ws_stream, _)) => {
+                    trace!("WebSocket 握手完成。");
+                    let (write, mut read) = ws_stream.split();
+
+                    let write_arc = Arc::new(Mutex::new(write));
+                    let write_clone = Arc::clone(&write_arc);
+                    //订阅写入(包括订阅信息 )
+                    trace!("订阅内容:{:?}",subscribe_array.clone());
+                    for s in &subscribe_array {
+                        let mut write_lock = write_clone.lock().await;
+                        write_lock.send(Message::Text(s.parse().unwrap())).await?;
+                    }
 
-            //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
-            // let stdin_to_ws = write_rx.map(Ok).forward(write);
-            // Writing task
-            let write_clone2 = Arc::clone(&write_arc);
-            let stdin_to_ws = async {
-                while let Some(message) = write_rx.next().await {
-                    let mut write_lock2 = write_clone2.lock().await;
-                    write_lock2.send(message).await?;
-                }
-                Ok::<(), tokio_tungstenite::tungstenite::Error>(())
-            };
-            let write_clone3 = Arc::clone(&write_arc);
-            let ws_to_stdout = async {
-                while let Some(message) = read.next().await {
-                    let mut write_lock3 = write_clone3.lock().await;
-                    let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
-                    // let response_data = func(message);
-                    if response_data.is_some() {
-                        let mut data = response_data.unwrap();
-                        data.label = lable.clone();
-                        let code = data.code.clone();
-                        /*
-                            200 -正确返回
-                           -200 -登录成功
-                           -201 -订阅成功
-                           -300 -客户端收到服务器心跳ping,需要响应
-                           -301 -客户端收到服务器心跳pong,需要响应
-                           -302 -客户端收到服务器心跳自定义,需要响应自定义
-                        */
-                        match code.as_str() {
-                            "200" => {
-                                if bool_v1.load(Ordering::Relaxed) {
-                                    read_tx.unbounded_send(data).unwrap();
-                                }
-                            }
-                            "-200" => {
-                                //登录成功
-                                trace!("登录成功:{:?}", data);
-                            }
-                            "-201" => {
-                                //订阅成功
-                                trace!("订阅成功:{:?}", data);
-                            }
-                            "-300" => {
-                                //服务器发送心跳 ping 给客户端,客户端需要pong回应
-                                trace!("服务器响应-ping");
-                                if data.data.len() > 0 {
-                                    write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
-                                    trace!("客户端回应服务器-pong");
-                                }
-                            }
-                            "-301" => {
-                                //服务器发送心跳 pong 给客户端,客户端需要ping回应
-                                trace!("服务器响应-pong");
-                                if data.data.len() > 0 {
-                                    write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
-                                    trace!("客户端回应服务器-ping");
+                    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+                    // let stdin_to_ws = write_rx.map(Ok).forward(write);
+                    // Writing task
+                    let write_clone2 = Arc::clone(&write_arc);
+                    let stdin_to_ws = async {
+                        while let Some(message) = write_rx.next().await {
+                            let mut write_lock2 = write_clone2.lock().await;
+                            write_lock2.send(message).await?;
+                        }
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                    };
+                    let write_clone3 = Arc::clone(&write_arc);
+                    let ws_to_stdout = async {
+                        while let Some(message) = read.next().await {
+                            let mut write_lock3 = write_clone3.lock().await;
+                            let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+                            // let response_data = func(message);
+                            if response_data.is_some() {
+                                let mut data = response_data.unwrap();
+                                data.label = lable.clone();
+                                let code = data.code.clone();
+                                /*
+                                    200 -正确返回
+                                   -200 -登录成功
+                                   -201 -订阅成功
+                                   -300 -客户端收到服务器心跳ping,需要响应
+                                   -301 -客户端收到服务器心跳pong,需要响应
+                                   -302 -客户端收到服务器心跳自定义,需要响应自定义
+                                */
+                                match code.as_str() {
+                                    "200" => {
+                                        if bool_v1.load(Ordering::Relaxed) {
+                                            read_tx.unbounded_send(data).unwrap();
+                                        }
+                                    }
+                                    "-200" => {
+                                        //登录成功
+                                        trace!("登录成功:{:?}", data);
+                                    }
+                                    "-201" => {
+                                        //订阅成功
+                                        trace!("订阅成功:{:?}", data);
+                                    }
+                                    "-300" => {
+                                        //服务器发送心跳 ping 给客户端,客户端需要pong回应
+                                        trace!("服务器响应-ping");
+                                        if data.data.len() > 0 {
+                                            write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
+                                            trace!("客户端回应服务器-pong");
+                                        }
+                                    }
+                                    "-301" => {
+                                        //服务器发送心跳 pong 给客户端,客户端需要ping回应
+                                        trace!("服务器响应-pong");
+                                        if data.data.len() > 0 {
+                                            write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
+                                            trace!("客户端回应服务器-ping");
+                                        }
+                                    }
+                                    "-302" => {
+                                        //客户端收到服务器心跳自定义,需要响应自定义
+                                        trace!("特定字符心跳,特殊响应:{:?}", data);
+                                        write_lock3.send(Message::Text(data.data)).await?;
+                                        trace!("特殊字符心跳-回应完成");
+                                    }
+                                    _ => {
+                                        trace!("未知:{:?}",data);
+                                    }
                                 }
                             }
-                            "-302" => {
-                                //客户端收到服务器心跳自定义,需要响应自定义
-                                trace!("特定字符心跳,特殊响应:{:?}", data);
-                                write_lock3.send(Message::Text(data.data)).await?;
-                                trace!("特殊字符心跳-回应完成");
-                            }
-                            _ => {
-                                trace!("未知:{:?}",data);
-                            }
                         }
-                    }
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                    };
+
+                    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+                    pin_mut!(stdin_to_ws, ws_to_stdout,);
+                    future::select(stdin_to_ws, ws_to_stdout).await;
+                }
+                Err(e) => {
+                    trace!("WebSocket 握手失败:{:?}",e);
                 }
-                Ok::<(), tokio_tungstenite::tungstenite::Error>(())
-            };
-            // let ws_to_stdout = {
-            //     trace!("---1");
-            //     //读,循环读取,然后拿到 message,,然后开启异步处理 message,
-            //     let result = read.for_each(|message| async {
-            //         let response_data = func(message);
+            }
+            trace!("---5");
+            trace!("---4");
+            trace!("重启...");
+
+            // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
+            // trace!("WebSocket 握手完成。");
+            // let (write, mut read) = ws_stream.split();
+            //
+            // let write_arc = Arc::new(Mutex::new(write));
+            // let write_clone = Arc::clone(&write_arc);
+            // //订阅写入(包括订阅信息 )
+            // trace!("订阅内容:{:?}",subscribe_array.clone());
+            // for s in &subscribe_array {
+            //     let mut write_lock = write_clone.lock().await;
+            //     write_lock.send(Message::Text(s.parse().unwrap())).await?;
+            // }
+            //
+            // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+            // // let stdin_to_ws = write_rx.map(Ok).forward(write);
+            // // Writing task
+            // let write_clone2 = Arc::clone(&write_arc);
+            // let stdin_to_ws = async {
+            //     while let Some(message) = write_rx.next().await {
+            //         let mut write_lock2 = write_clone2.lock().await;
+            //         write_lock2.send(message).await?;
+            //     }
+            //     Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+            // };
+            // let write_clone3 = Arc::clone(&write_arc);
+            // let ws_to_stdout = async {
+            //     while let Some(message) = read.next().await {
+            //         let mut write_lock3 = write_clone3.lock().await;
+            //         let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+            //         // let response_data = func(message);
             //         if response_data.is_some() {
             //             let mut data = response_data.unwrap();
             //             data.label = lable.clone();
             //             let code = data.code.clone();
-            //             if code.as_str() == "-1" {
-            //                 // let close_frame = CloseFrame {
-            //                 //     code: CloseCode::Normal,
-            //                 //     reason: Cow::Borrowed("Bye bye"),
-            //                 // };
-            //                 // let close_message = Message::Close(Some(close_frame));
-            //                 // write.send(close_message);
-            //             } else if code.as_str() == "200" {
-            //                 read_tx.unbounded_send(data).unwrap();
+            //             /*
+            //                 200 -正确返回
+            //                -200 -登录成功
+            //                -201 -订阅成功
+            //                -300 -客户端收到服务器心跳ping,需要响应
+            //                -301 -客户端收到服务器心跳pong,需要响应
+            //                -302 -客户端收到服务器心跳自定义,需要响应自定义
+            //             */
+            //             match code.as_str() {
+            //                 "200" => {
+            //                     if bool_v1.load(Ordering::Relaxed) {
+            //                         read_tx.unbounded_send(data).unwrap();
+            //                     }
+            //                 }
+            //                 "-200" => {
+            //                     //登录成功
+            //                     trace!("登录成功:{:?}", data);
+            //                 }
+            //                 "-201" => {
+            //                     //订阅成功
+            //                     trace!("订阅成功:{:?}", data);
+            //                 }
+            //                 "-300" => {
+            //                     //服务器发送心跳 ping 给客户端,客户端需要pong回应
+            //                     trace!("服务器响应-ping");
+            //                     if data.data.len() > 0 {
+            //                         write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
+            //                         trace!("客户端回应服务器-pong");
+            //                     }
+            //                 }
+            //                 "-301" => {
+            //                     //服务器发送心跳 pong 给客户端,客户端需要ping回应
+            //                     trace!("服务器响应-pong");
+            //                     if data.data.len() > 0 {
+            //                         write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
+            //                         trace!("客户端回应服务器-ping");
+            //                     }
+            //                 }
+            //                 "-302" => {
+            //                     //客户端收到服务器心跳自定义,需要响应自定义
+            //                     trace!("特定字符心跳,特殊响应:{:?}", data);
+            //                     write_lock3.send(Message::Text(data.data)).await?;
+            //                     trace!("特殊字符心跳-回应完成");
+            //                 }
+            //                 _ => {
+            //                     trace!("未知:{:?}",data);
+            //                 }
             //             }
             //         }
-            //     });
-            //     trace!("---3");
-            //     result
+            //     }
+            //     Ok::<(), tokio_tungstenite::tungstenite::Error>(())
             // };
-
-            //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
-            pin_mut!(stdin_to_ws, ws_to_stdout,);
-            future::select(stdin_to_ws, ws_to_stdout).await;
-            trace!("---5");
-            trace!("---4");
-            trace!("重启...");
+            //
+            // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+            // pin_mut!(stdin_to_ws, ws_to_stdout,);
+            // future::select(stdin_to_ws, ws_to_stdout).await;
+            // trace!("---5");
+            // trace!("---4");
+            // trace!("重启...");
         }
         // return Ok(());
     }
@@ -190,6 +267,9 @@ impl AbstractWsMode {
                     HeartbeatType::Pong => {
                         Message::Pong(Vec::from("Pong"))
                     }
+                    HeartbeatType::Custom(ref str) => {
+                        Message::Text(str.parse().unwrap())
+                    }
                 }
             ).expect("发送失败");
             trace!("发送指令-心跳:{:?}",h_type);