Bladeren bron

代码调整 ,优化心跳

hl 1 jaar geleden
bovenliggende
commit
44ed44653d

+ 11 - 20
exchanges/src/binance_spot_ws.rs

@@ -164,12 +164,12 @@ impl BinanceSpotWs {
 
 
         //心跳-- 方法内部线程启动
-        let write_tx_clone1 = Arc::clone(write_tx_am);
-        tokio::spawn(async move {
-            trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
-            trace!("线程-异步心跳-结束");
-        });
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
 
         //设置订阅
         let mut subscribe_array = vec![];
@@ -200,26 +200,16 @@ impl BinanceSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "pong".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
@@ -255,6 +245,7 @@ impl BinanceSpotWs {
             res_data.channel = "depth".to_string();
             res_data.data = text;
         } else {
+            res_data.code = "".to_string();
             res_data.channel = "未知的频道".to_string();
         }
         res_data

+ 13 - 21
exchanges/src/binance_swap_ws.rs

@@ -164,12 +164,12 @@ impl BinanceSwapWs {
 
 
         //心跳-- 方法内部线程启动
-        let write_tx_clone1 = Arc::clone(write_tx_am);
-        tokio::spawn(async move {
-            trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
-            trace!("线程-异步心跳-结束");
-        });
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
 
         //设置订阅
         let mut subscribe_array = vec![];
@@ -200,26 +200,16 @@ impl BinanceSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "pong".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
@@ -248,10 +238,12 @@ impl BinanceSwapWs {
             } else if channel.contains("@bookTicker") {
                 res_data.channel = "bookTicker".to_string();
             } else {
+                res_data.code = "".to_string();
                 res_data.channel = "未知的频道".to_string();
             }
         } else {
-            res_data.data = text
+            res_data.code = "".to_string();
+            res_data.channel = "未知的频道2".to_string();
         }
         res_data
     }

+ 7 - 186
exchanges/src/bitget_spot_ws.rs

@@ -89,7 +89,7 @@ impl BitgetSpotWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20,
+            heartbeat_time: 1000 * 10,
         }
     }
 
@@ -254,7 +254,7 @@ impl BitgetSpotWs {
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -286,200 +286,21 @@ impl BitgetSpotWs {
 
         Ok(())
     }
-
-    // //代理
-    // async fn proxy_subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
-    // {
-    //     info!("走代理-链接成功!开始数据读取");
-    //     let label = self.label.clone();
-    //     /*****登陆***/
-    //     let login_str = self.log_in_to_str();
-    //     if login_str != "" {
-    //         let _ = web_socket.write_message(Message::Text(login_str));
-    //         tokio::time::sleep(Duration::from_secs(3)).await;
-    //     }
-    //     /*****订阅***/
-    //     if subscription.len() > 0 {
-    //         info!("订阅信息:{}", subscription);
-    //         web_socket.write_message(Message::Text(subscription.clone()))
-    //             .unwrap();
-    //     }
-    //     /*****消息溜***/
-    //     let mut ping_timeout = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 let get_time = chrono::Utc::now().timestamp_millis();
-    //                 if (get_time - ping_timeout) >= (1000 * 30) {
-    //                     trace!("30s 一次主动发送心跳包!");
-    //                     let _ = web_socket.write_message(Message::Ping(Vec::from("ping")));
-    //                     ping_timeout = get_time;
-    //                 }
-    //
-    //                 // let st_time = get_time_microsecond();
-    //                 let mut res_data = Self::ok_text(label.to_string(), text);
-    //                 // trace!("解析数据耗时::{}", st_time  - get_time_microsecond());
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-200" {
-    //                     trace!("登陆:{:?}", res_data);
-    //                 } else if res_data.code == "-201" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else if res_data.code == "-202" {
-    //                     trace!("空数据不予返回:{:?}", res_data);
-    //                 } else {
-    //                     let sender = self.sender.clone();
-    //                     // let prev_time = Utc::now().timestamp_millis();
-    //                     tokio::spawn(async move {
-    //                         // info!("{:04} {}", Utc::now().timestamp_millis() - prev_time, res_data.channel);
-    //                         sender.send(res_data).await.unwrap();
-    //                     });
-    //                     tokio::spawn(async move {});
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //                 break;
-    //             }
-    //             Err(error) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 error!( "Err-响应{}", error);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
-    //
-    // //非代理
-    // async fn subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<AutoStream>,
-    //                       subscription: String)
-    // {
-    //     info!("链接成功!开始数据读取");
-    //     let label = self.label.clone();
-    //     /*****登陆***/
-    //     let login_str = self.log_in_to_str();
-    //     if login_str != "" {
-    //         let _ = web_socket.write_message(Message::Text(login_str));
-    //         thread::sleep(Duration::from_secs(3));
-    //     }
-    //     /*****订阅***/
-    //     if subscription.len() > 0 {
-    //         trace!("订阅信息:{}",subscription);
-    //         web_socket.write_message(Message::Text(subscription))
-    //             .unwrap();
-    //     }
-    //     /*****消息溜***/
-    //     let mut ping_timeout = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 let get_time = chrono::Utc::now().timestamp_millis();
-    //                 if (get_time - ping_timeout) >= (1000 * 30) {
-    //                     trace!("30s 一次主动发送心跳包!");
-    //                     let _ = web_socket.write_message(Message::Ping(Vec::from("ping")));
-    //                     ping_timeout = get_time;
-    //                 }
-    //
-    //                 // let st_time = get_time_microsecond();
-    //                 let mut res_data = Self::ok_text(label.to_string(), text);
-    //                 // trace!("解析数据耗时::{}", st_time  - get_time_microsecond());
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-200" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else if res_data.code == "-201" {
-    //                     trace!("登陆:{:?}", res_data);
-    //                 } else if res_data.code == "-202" {
-    //                     trace!("空数据不予返回:{:?}", res_data);
-    //                 } else {
-    //                     let sender = self.sender.clone();
-    //                     tokio::spawn(async move {
-    //                         sender.send(res_data).await.unwrap();
-    //                     });
-    //                     tokio::spawn(async move {});
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //                 break;
-    //             }
-    //             Err(error) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 error!( "Err-响应{}", error);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
     /*******************************************************************************************************/
     /*****************************************数据解析*****************************************************/
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            "-200" => {
-                trace!("登录成功:{:?}", response_data);
-                None
-            }
-            "-202" => {
-                trace!("未知解析:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
@@ -509,7 +330,7 @@ impl BitgetSpotWs {
             // trace!("解析-推送数据:{}", text);
             res_data.data = json_value["data"].to_string();
             if res_data.data == "[]" {
-                res_data.code = "-202".to_string();
+                res_data.code = "".to_string();
             } else {
                 res_data.code = "200".to_string();
             }

+ 267 - 0
exchanges/src/crypto_spot_ws.rs

@@ -0,0 +1,267 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use chrono::Utc;
+
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+use crate::utils::get_time_microsecond;
+
+//类型
+pub enum CryptoSpotWsType {
+    Public,
+    Private,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum CryptoSpotSubscribeType {
+    PuGetInstruments,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct CryptoSpotLogin {
+    pub api_key: String,
+    pub api_secret: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct CryptoSpotWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<CryptoSpotLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<CryptoSpotSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl CryptoSpotWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
+        return CryptoSpotWs::new_label("default-CryptoSpotWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            CryptoSpotWsType::Public => {
+                "wss://stream.crypto.com/exchange/v1/market".to_string()
+            },
+            CryptoSpotWsType::Private => {
+                "wss://stream.crypto.com/exchange/v1/user".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        CryptoSpotWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 3,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<CryptoSpotSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 小写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                CryptoSpotSubscribeType::PuGetInstruments => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: CryptoSpotSubscribeType) -> String {
+        match subscribe_type {
+            CryptoSpotSubscribeType::PuGetInstruments => {
+                format!("{}@aggTrade", symbol)
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+
+       let nonce =  Utc::now().timestamp_millis();
+        let str = json!({
+          "id": 1,
+          "method": "subscribe",
+          "params": {
+            "channels":params
+          },
+          "nonce": nonce
+        });
+
+        str.to_string()
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async(&mut self,
+                                  bool_v1: Arc<AtomicBool>,
+                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  write_rx: UnboundedReceiver<Message>,
+                                  read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            //登录相关
+        }
+        // subscribe_array.push(subscription.to_string());
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            trace!("线程-异步链接-开始");
+            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                             label.clone(), subscribe_array,
+                                             write_rx, read_tx,
+                                             Self::message_text,
+                                             Self::message_ping,
+                                             Self::message_pong,
+            ).await.expect("币安-期货");
+            trace!("线程-异步链接-结束");
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        trace!("原始数据");
+        trace!(?text);
+        let mut res_data = ResponseData::new("".to_string(), "".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        if json_value.get("id").is_some()  && json_value.get("method").is_some()&& json_value.get("code").is_some()  {
+            //服务器心跳,需要做响应
+            let id = json_value["id"].as_i64().unwrap();
+            let method = json_value["method"].as_str().unwrap();
+            let code = json_value["code"].as_i64().unwrap();
+            if code == 0 && method == "public/heartbeat"{
+                res_data.code = "-302".to_string();
+                let str = json!({
+                  "id": id,
+                  "method": "public/respond-heartbeat"
+                });
+                res_data.message = "服务器主动心跳检测,客户端回应~!".to_string();
+                res_data.data = str .to_string();
+            }
+        } else{
+            res_data.message = "未知解析!!".to_string();
+        }
+
+        // if json_value.get("result").is_some() && json_value.get("id").is_some() &&
+        //     json_value.get("id").unwrap() == 1
+        // {
+        //     res_data.code = "-201".to_string();
+        //     res_data.message = "订阅成功".to_string();
+        // } else if json_value.get("error").is_some() {//订阅返回
+        //     res_data.code = json_value["error"]["code"].to_string();
+        //     res_data.message = json_value["error"]["msg"].to_string();
+        // } else if json_value.get("stream").is_some() {//订阅返回
+        //     res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+        //     res_data.code = "200".to_string();
+        //
+        //     let channel = format!("{}", json_value.get("stream").as_ref().unwrap());
+        //     if channel.contains("@aggTrade") {
+        //         res_data.channel = "aggTrade".to_string();
+        //     } else if channel.contains("@depth20@100ms") {
+        //         res_data.channel = "depth".to_string();
+        //     } else if channel.contains("@bookTicker") {
+        //         res_data.channel = "bookTicker".to_string();
+        //     } else {
+        //         res_data.channel = "未知的频道".to_string();
+        //     }
+        // } else {
+        //     res_data.data = text
+        // }
+
+
+        res_data
+    }
+}

+ 5 - 165
exchanges/src/gate_swap_ws.rs

@@ -262,7 +262,7 @@ impl GateSwapWs {
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -292,181 +292,21 @@ impl GateSwapWs {
 
         Ok(())
     }
-    // //代理
-    // async fn proxy_subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<ProxyAutoStream>, subscription: Vec<Value>)
-    // {
-    //     info!("走代理-链接成功!开始数据读取");
-    //     let label = self.label.clone();
-    //     /*****订阅***/
-    //     for sub in &subscription {
-    //         trace!("订阅信息:{}", sub.to_string());
-    //         web_socket.write_message(Message::Text(sub.to_string()))
-    //             .unwrap();
-    //     }
-    //     /*****消息溜***/
-    //     let mut start_ping = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 // trace!("获取推送:{}",text.clone());
-    //                 let mut res_data = Self::ok_text(label.to_string(), text);
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-200" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else {
-    //                     // self.sender.send(res_data).await.unwrap();
-    //                     let sender = self.sender.clone();
-    //                     tokio::spawn(async move {
-    //                         sender.send(res_data).await.unwrap();
-    //                     });
-    //                     tokio::spawn(async move {});
-    //
-    //                     //主动ping 服务器
-    //                     let new_time = chrono::Utc::now().timestamp_millis();
-    //                     // let tr = format!("判断-ping {}--{},{},{}",new_time,start_ping,(new_time - start_ping), (new_time - start_ping) > 10000);
-    //                     // trace!(tr);
-    //                     if (new_time - start_ping) > 10000 {
-    //                         let t = chrono::Utc::now().timestamp();
-    //                         let ping_str = serde_json::json!({
-    //                             "time" :t, "channel" : "futures.ping"
-    //                         });
-    //                         let _ = web_socket.write_message(Message::Ping(Vec::from(ping_str.to_string())));
-    //                         start_ping = new_time;
-    //                     }
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //                 break;
-    //             }
-    //             Err(error) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 error!( "Err-响应{}", error);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
-    //
-    // //非代理
-    // async fn subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<AutoStream>,
-    //                       subscription: Vec<Value>)
-    // {
-    //     info!("链接成功!开始数据读取");
-    //     let label = self.label.clone();
-    //     /*****订阅***/
-    //     for sub in &subscription {
-    //         trace!("订阅信息:{}", sub.to_string());
-    //         web_socket.write_message(Message::Text(sub.to_string()))
-    //             .unwrap();
-    //     }
-    //     /*****消息溜***/
-    //     let mut start_ping = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 // trace!("获取推送:{}",text.clone());
-    //                 let mut res_data = Self::ok_text(label.to_string(), text);
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-200" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else {
-    //                     // self.sender.send(res_data).await.unwrap();
-    //                     let sender = self.sender.clone();
-    //                     tokio::spawn(async move {
-    //                         sender.send(res_data).await.unwrap();
-    //                     });
-    //                     tokio::spawn(async move {});
-    //
-    //                     //主动ping 服务器
-    //                     let new_time = chrono::Utc::now().timestamp_millis();
-    //                     // let tr = format!("判断-ping {}--{},{},{}",new_time,start_ping,(new_time - start_ping), (new_time - start_ping) > 10000);
-    //                     // trace!(tr);
-    //                     if (new_time - start_ping) > 10000 {
-    //                         let t = chrono::Utc::now().timestamp();
-    //                         let ping_str = serde_json::json!({
-    //                             "time" :t, "channel" : "futures.ping"
-    //                         });
-    //                         let _ = web_socket.write_message(Message::Ping(Vec::from(ping_str.to_string())));
-    //                         start_ping = new_time;
-    //                     }
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //                 break;
-    //             }
-    //             Err(error) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 error!( "Err-响应{}", error);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
     /*******************************************************************************************************/
     /*****************************************数据解析*****************************************************/
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData

+ 8 - 22
exchanges/src/kucoin_spot_ws.rs

@@ -276,13 +276,13 @@ impl KucoinSpotWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
+        let heartbeat_time = self.ws_param.ws_ping_interval;
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = write_tx_am.clone();
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -327,30 +327,16 @@ impl KucoinSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            "-202" => {
-                trace!("未知解析:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
 
     //数据解析
@@ -374,7 +360,7 @@ impl KucoinSpotWs {
             res_data.channel = format!("{}", json_value["subject"].as_str().unwrap());
 
             if json_value["topic"].as_str() == Option::from("/contractAccount/wallet") {
-                res_data.code = "-202".to_string();
+                res_data.code = "".to_string();
                 if json_value["subject"].as_str() == Option::from("availableBalance.change") {
                     res_data.code = "200".to_string();
                     res_data.data = json_value["data"].to_string();
@@ -383,7 +369,7 @@ impl KucoinSpotWs {
                 res_data.data = json_value["data"].to_string();
             }
         } else {
-            res_data.code = "-202".to_string();
+            res_data.code = "".to_string();
             res_data.message = "未知解析".to_string();
         }
         res_data

+ 9 - 23
exchanges/src/kucoin_swap_ws.rs

@@ -32,7 +32,7 @@ pub struct KucoinSwapWsParam {
 //订阅频道
 #[derive(Clone)]
 pub enum KucoinSwapSubscribeType {
-    PuContractMarketLevel2Depth50,
+    PuContractMarketLevel2Depth50,//买卖盘 快照,asks:卖,bids:买入
     PuContractMarketExecution,
     PuContractMarkettickerV2,
 
@@ -299,13 +299,13 @@ impl KucoinSwapWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
+        let heartbeat_time = self.ws_param.ws_ping_interval.clone();
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = write_tx_am.clone();
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -341,30 +341,16 @@ impl KucoinSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-201" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            "-202" => {
-                trace!("未知解析:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
@@ -388,7 +374,7 @@ impl KucoinSwapWs {
             res_data.channel = format!("{}", json_value["subject"].as_str().unwrap());
 
             if json_value["topic"].as_str() == Option::from("/contractAccount/wallet") {
-                res_data.code = "-202".to_string();
+                res_data.code = "".to_string();
                 if json_value["subject"].as_str() == Option::from("availableBalance.change") {
                     res_data.code = "200".to_string();
                     res_data.data = json_value["data"].to_string();
@@ -397,7 +383,7 @@ impl KucoinSwapWs {
                 res_data.data = json_value["data"].to_string();
             }
         } else {
-            res_data.code = "-202".to_string();
+            res_data.code = "".to_string();
             res_data.message = "未知解析".to_string();
         }
         res_data

+ 1 - 0
exchanges/src/lib.rs

@@ -19,4 +19,5 @@ pub mod bitget_spot_ws;
 pub mod bitget_spot_rest;
 pub mod kucoin_spot_ws;
 pub mod kucoin_spot_rest;
+pub mod crypto_spot_ws;
 

+ 9 - 212
exchanges/src/okx_swap_ws.rs

@@ -97,7 +97,7 @@ impl OkxSwapWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 5,
+            heartbeat_time: 1000 * 10,
         }
     }
 
@@ -319,249 +319,46 @@ impl OkxSwapWs {
 
         Ok(())
     }
-
-    // async fn run(&mut self, bool_v1: Arc<AtomicBool>)
-    // {
-    //     //订阅信息组装
-    //     let subscription = self.get_subscription();
-    //     loop {
-    //         trace!("要连接咯~~!!{}", self.address_url);
-    //
-    //         let address_url = Url::parse(self.address_url.as_str()).unwrap();
-    //         //1. 判断是否需要代理,根据代理地址是否存来选择
-    //         if self.proxy.ip_address.len() > 0 {
-    //             let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
-    //             let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
-    //                 ip_array[0].parse().unwrap(),
-    //                 ip_array[1].parse().unwrap(),
-    //                 ip_array[2].parse().unwrap(),
-    //                 ip_array[3].parse().unwrap())
-    //             ), self.proxy.port.parse().unwrap());
-    //             let websocket_config = Some(WebSocketConfig {
-    //                 max_send_queue: Some(16),
-    //                 max_message_size: Some(16 * 1024 * 1024),
-    //                 max_frame_size: Some(16 * 1024 * 1024),
-    //                 accept_unmasked_frames: false,
-    //             });
-    //             let max_redirects = 5;
-    //             match connect_with_proxy(address_url.clone(),
-    //                                      proxy_address, websocket_config, max_redirects) {
-    //                 Ok(ws) => {
-    //                     let bool_v1_clone = Arc::clone(&bool_v1);
-    //                     self.proxy_subscription(bool_v1_clone, ws.0, subscription.clone()).await;
-    //                 }
-    //                 Err(err) => {
-    //                     trace!("Can't connect(无法连接): {}", err);
-    //                 }
-    //             };
-    //         } else {
-    //             match connect(address_url.clone()) {
-    //                 Ok(ws) => {
-    //                     let bool_v1_clone = Arc::clone(&bool_v1);
-    //                     self.subscription(bool_v1_clone, ws.0, subscription.clone()).await;
-    //                 }
-    //                 Err(err) => {
-    //                     // 连接失败时执行的操作
-    //                     trace!("Can't connect(无法连接): {}", err);
-    //                     // 返回一个默认的 WebSocket 对象或其他适当的值
-    //                     // 或者根据需要触发 panic 或返回错误信息
-    //                 }
-    //             };
-    //         }
-    //         trace!("退出来咯");
-    //
-    //         let bool_v1_clone = Arc::clone(&bool_v1);
-    //         let bool_v1_v = bool_v1_clone.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    // }
-    //
-    // //代理
-    // async fn proxy_subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<ProxyAutoStream>,
-    //                             subscription: String)
-    // {
-    //     let lable = self.label.clone();
-    //     /*****登陆***/
-    //     let login_str = self.log_in_to_str();
-    //     if login_str != "" {
-    //         let _ = web_socket.write_message(Message::Text(login_str));
-    //         thread::sleep(Duration::from_secs(3));
-    //     }
-    //     /*****订阅***/
-    //     web_socket.write_message(Message::Text(subscription))
-    //         .unwrap();
-    //     /*****消息溜***/
-    //     let mut ping_timeout = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 let get_time = chrono::Utc::now().timestamp_millis();
-    //                 if (get_time - ping_timeout) >= (1000 * 30) {
-    //                     trace!("30s 一次主动发送心跳包!");
-    //                     let _ = web_socket.write_message(Message::Ping(Vec::from("ping")));
-    //                     ping_timeout = get_time;
-    //                 }
-    //
-    //                 let mut res_data = Self::ok_text(lable.to_string(), text);
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-201" {
-    //                     trace!("登陆成功!");
-    //                 } else if res_data.code == "-200" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else {
-    //                     self.sender.send(res_data).await.unwrap();
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!( "Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //             }
-    //             Err(e) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 trace!("Err-响应{}", e);
-    //                 error!( "Err-响应{}", e);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
-    //
-    // //非代理
-    // async fn subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<AutoStream>,
-    //                       subscription: String)
-    // {
-    //     let lable = self.label.clone();
-    //     /*****订阅***/
-    //     web_socket.write_message(Message::Text(subscription))
-    //         .unwrap();
-    //     /*****消息溜***/
-    //     let mut ping_timeout = chrono::Utc::now().timestamp_millis();
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(1)).await;
-    //         let msg = web_socket.read_message();
-    //         match msg {
-    //             Ok(Message::Text(text)) => {
-    //                 let get_time = chrono::Utc::now().timestamp_millis();
-    //                 if (get_time - ping_timeout) >= (1000 * 30) {
-    //                     trace!("30s 一次主动发送心跳包!");
-    //                     let _ = web_socket.write_message(Message::Ping(Vec::from("ping")));
-    //                     ping_timeout = get_time;
-    //                 }
-    //
-    //                 let mut res_data = Self::ok_text(lable.to_string(), text);
-    //                 res_data.time = get_time_microsecond();
-    //                 if res_data.code == "-201" {
-    //                     trace!("登陆成功!");
-    //                 } else if res_data.code == "-200" {
-    //                     trace!("订阅成功:{:?}", res_data.data);
-    //                 } else {
-    //                     self.sender.send(res_data).await.unwrap();
-    //                 }
-    //             }
-    //             Ok(Message::Ping(s)) => {
-    //                 trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
-    //                 let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-    //                 trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Pong(s)) => {
-    //                 // trace!("Pong-响应--{:?}", String::from_utf8(s));
-    //                 trace!( "Pong-响应--{:?}", String::from_utf8(s.clone()));
-    //             }
-    //             Ok(Message::Close(_)) => {
-    //                 // trace!("socket 关闭: ");
-    //                 trace!( "Close-响应");
-    //             }
-    //             Err(e) => {
-    //                 // trace!("Error receiving message: {}", error);
-    //                 trace!("Err-响应{}", e);
-    //                 error!( "Err-响应{}", e);
-    //                 break;
-    //             }
-    //             _ => {}
-    //         }
-    //
-    //         let bool_v1_v = bool_v1.load(Ordering::SeqCst);
-    //         if !bool_v1_v {
-    //             break;
-    //         }
-    //     }
-    //     web_socket.close(None).unwrap();
-    // }
     /*******************************************************************************************************/
     /*****************************************数据解析*****************************************************/
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let mut response_data = Self::ok_text(text);
-        response_data.time = get_time_microsecond();
-        match response_data.code.as_str() {
-            "200" => Option::from(response_data),
-            "-200" => {
-                trace!("订阅成功:{:?}", response_data);
-                None
-            }
-            "-201" => {
-                trace!("登录成功:{:?}", response_data);
-                None
-            }
-            _ => None
-        }
+        let  response_data = Self::ok_text(text);
+        Option::from(response_data)
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-ping");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        trace!("服务器响应-pong");
-        return None;
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {
         // trace!("元数据:{}",text);
-        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new("".to_string(), "".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
         if json_value.get("event").is_some() {//订阅返回
             if json_value["event"].as_str() == Option::from("login") &&
                 json_value["code"].as_str() == Option::from("0") {
-                res_data.code = "-201".to_string();
+                res_data.code = "-200".to_string();
                 res_data.message = format!("登陆成功!");
             } else if json_value["event"].as_str() == Option::from("error") {
                 res_data.code = json_value["code"].to_string();
                 res_data.message = format!("订阅失败:{}", json_value["msg"].to_string());
             } else if json_value["event"].as_str() == Option::from("subscribe") {
-                res_data.code = "-200".to_string();
+                res_data.code = "-201".to_string();
                 res_data.data = text;
                 res_data.message = format!("订阅成功!");
-            } else {
-                res_data.data = text;
             }
         } else {
             if json_value.get("arg").is_some() && json_value.get("data").is_some() {
                 res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
                 res_data.data = json_value["data"].to_string();
+                res_data.code = "200".to_string();
                 // res_data.reach_time = json_value["data"][0]["ts"].as_str().unwrap().parse().unwrap()
             } else {
                 res_data.data = text;

+ 53 - 6
exchanges/src/socket_tool.rs

@@ -60,33 +60,79 @@ impl AbstractWsMode {
             trace!("WebSocket 握手完成。");
             let (mut write, mut read) = ws_stream.split();
 
+            let write_arc =Arc::new(Mutex::new(write));
+            let write_clone = Arc::clone(&write_arc);
             //订阅写入(包括订阅信息 )
             trace!("订阅内容:{:?}",subscribe_array.clone());
             for s in &subscribe_array {
-                write.send(Message::Text(s.parse().unwrap())).await?;
+                let mut write_lock = write_clone.lock().await;
+                write_lock.send(Message::Text(s.parse().unwrap())).await?;
             }
 
             //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
             // let stdin_to_ws = write_rx.map(Ok).forward(write);
             // Writing task
+            let write_clone2 = Arc::clone(&write_arc);
             let stdin_to_ws = async {
                 while let Some(message) = write_rx.next().await {
-                    write.send(message).await?;
+                    let mut write_lock2 = write_clone2.lock().await;
+                    write_lock2.send(message).await?;
                 }
                 Ok::<(), tokio_tungstenite::tungstenite::Error>(())
             };
+            let write_clone3 = Arc::clone(&write_arc);
             let ws_to_stdout = async {
+
                 while let Some(message) = read.next().await {
+                    let mut write_lock3 = write_clone3.lock().await;
                     let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
                     // let response_data = func(message);
                     if response_data.is_some() {
                         let mut data = response_data.unwrap();
                         data.label = lable.clone();
                         let code = data.code.clone();
-                        if code.as_str() == "-1" {} else if code.as_str() == "200" {
-                            if bool_v1.load(Ordering::Relaxed) {
-                                read_tx.unbounded_send(data).unwrap();
-                            }
+                        /**
+                             200 -正确返回
+                            -200 -登录成功
+                            -201 -订阅成功
+                            -300 -客户端收到服务器心跳ping,需要响应
+                            -301 -客户端收到服务器心跳pong,需要响应
+                            -302 -客户端收到服务器心跳自定义,需要响应自定义
+                         **/
+                        match code.as_str() {
+                            "200" => {
+                                if bool_v1.load(Ordering::Relaxed) {
+                                    read_tx.unbounded_send(data).unwrap();
+                                }
+                            },
+                            "-200" => {//订阅成功
+                                trace!("登录成功:{:?}", data);
+                            },
+                            "-201" => {//订阅成功
+                                trace!("订阅成功:{:?}", data);
+                            },
+                            "-300" => {//服务器发送心跳 ping 给客户端,客户端需要pong回应
+                                trace!("服务器响应-ping");
+                                if data.data.len()>0{
+                                    write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
+                                    trace!("客户端回应服务器-pong");
+                                }
+                            },
+                            "-301" => {//服务器发送心跳 pong 给客户端,客户端需要ping回应
+                                trace!("服务器响应-pong");
+                                if data.data.len()>0{
+                                    write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
+                                    trace!("客户端回应服务器-ping");
+                                }
+                            },
+                            "-302" => {//客户端收到服务器心跳自定义,需要响应自定义
+                                trace!("特定字符心跳,特殊响应:{:?}", data);
+                                write_lock3.send(Message::Text(data.data)).await?;
+                                trace!("特殊字符心跳-回应完成");
+                            },
+                            _ => {
+                                trace!("未知:{:?}",data);
+                            },
                         }
                     }
                 }
@@ -126,6 +172,7 @@ impl AbstractWsMode {
         }
         // return Ok(());
     }
+
     //心跳包
     pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
         loop {

+ 20 - 20
exchanges/tests/binance_spot_test.rs

@@ -51,26 +51,26 @@ async fn ws_custom_subscribe() {
     });
 
     //写数据
-    let bool_v2_clone = Arc::clone(&bool_v1);
-    let write_tx_clone = Arc::clone(&write_tx_am);
-    let su = ws.get_subscription();
-    let tw = tokio::spawn(async move {
-        trace!("线程-数据写入-开始");
-        loop {
-            tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-            // let close_frame = CloseFrame {
-            //     code: CloseCode::Normal,
-            //     reason: Cow::Borrowed("Bye bye"),
-            // };
-            // let message = Message::Close(Some(close_frame));
-
-
-            let message = Message::Text(su.clone());
-            AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-            trace!("发送指令成功");
-        }
-        trace!("线程-数据写入-结束");
-    });
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
 
     let t1 = tokio::spawn(async move {
         //链接

+ 20 - 20
exchanges/tests/binance_swap_test.rs

@@ -54,26 +54,26 @@ async fn ws_custom_subscribe() {
     });
 
     //写数据
-    let bool_v2_clone = Arc::clone(&bool_v1);
-    let write_tx_clone = Arc::clone(&write_tx_am);
-    let su = ws.get_subscription();
-    let tw = tokio::spawn(async move {
-        trace!("线程-数据写入-开始");
-        loop {
-            tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-            // let close_frame = CloseFrame {
-            //     code: CloseCode::Normal,
-            //     reason: Cow::Borrowed("Bye bye"),
-            // };
-            // let message = Message::Close(Some(close_frame));
-
-
-            let message = Message::Text(su.clone());
-            AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-            trace!("发送指令成功");
-        }
-        trace!("线程-数据写入-结束");
-    });
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
 
     let t1 = tokio::spawn(async move {
         //链接

+ 1 - 1
exchanges/tests/bitget_spot_test.rs

@@ -34,7 +34,7 @@ async fn ws_custom_subscribe_pu() {
     let write_tx_am = Arc::new(Mutex::new(write_tx));
     let bool_v1 = Arc::new(AtomicBool::new(true));
 
-//读取
+    //读取
     let _bool_v1_clone = Arc::clone(&bool_v1);
     let _tr = tokio::spawn(async move {
         trace!("线程-数据读取-开启");

+ 86 - 0
exchanges/tests/crypto_spot_test.rs

@@ -0,0 +1,86 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use futures_util::StreamExt;
+use tokio::sync::Mutex;
+use tracing::trace;
+
+use exchanges::crypto_spot_ws::{CryptoSpotLogin, CryptoSpotWs, CryptoSpotWsType};
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_pu() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let mut ws = get_ws(None,CryptoSpotWsType::Public);
+    // ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        // CryptoSpotSubscribeType::PuBookTicker,
+        // BinanceSwapSubscribeType::PuAggTrade,
+        // BinanceSwapSubscribeType::PuDepth20levels100ms,
+    ]);
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bool_v1 = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _bool_v1_clone = Arc::clone(&bool_v1);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let t1 = tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+
+}
+
+fn get_ws(btree_map: Option< CryptoSpotLogin>,ws_type:CryptoSpotWsType) -> CryptoSpotWs {
+    let binance_ws = CryptoSpotWs::new(false,
+                                        btree_map,
+                                       ws_type);
+    binance_ws
+}

+ 3 - 3
exchanges/tests/gate_swap_test.rs

@@ -29,10 +29,10 @@ async fn ws_custom_subscribe() {
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         // GateSwapSubscribeType::PuFuturesOrderBook,
-        // GateSwapSubscribeType::PuFuturesCandlesticks,
-        // GateSwapSubscribeType::PuFuturesTrades,
+        GateSwapSubscribeType::PuFuturesCandlesticks,
+        GateSwapSubscribeType::PuFuturesTrades,
 
-        GateSwapSubscribeType::PrFuturesBalances("".to_string()),
+        // GateSwapSubscribeType::PrFuturesBalances("".to_string()),
         // GateSwapSubscribeType::PrFuturesOrders("".to_string()),
         // GateSwapSubscribeType::PrFuturesPositions("".to_string()),
     ]);

+ 2 - 5
exchanges/tests/okx_swap_test.rs

@@ -23,17 +23,14 @@ async fn ws_custom_subscribe_pu() {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
-    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
-
 
     let mut ws = get_ws(None, OkxSwapWsType::Public).await;
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         // OkxSwapSubscribeType::PuBooks5,
         // OkxSwapSubscribeType::Putrades,
-        // OkxSwapSubscribeType::PuBooks50L2tbt,
-        OkxSwapSubscribeType::PuIndexTickers,
+        OkxSwapSubscribeType::PuBooks50L2tbt,
+        // OkxSwapSubscribeType::PuIndexTickers,
     ]);