Browse Source

可以连上gate_usdt_swap了。

skyffire 1 year ago
parent
commit
ba48f813c8

+ 2 - 0
Cargo.toml

@@ -22,4 +22,6 @@ ctrlc = "3.2.5"
 serde_json = "1.0.105"
 rust_decimal = { version = "1.32.0", features = ["maths"] }
 rust_decimal_macros = "1.32.0"
+futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
+futures-channel = "0.3.28"
 

+ 2 - 2
exchanges/src/binance_spot_rest.rs

@@ -6,9 +6,9 @@ pub struct BinanceSpotRest {}
 impl BinanceSpotRest {
     pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotRest
     {
-        return BinanceSpotRest::new_label("default-BinanceSpotRest".to_string(), _is_colo, _login_param);
+        return BinanceSpotRest::new_with_tag("default-BinanceSpotRest".to_string(), _is_colo, _login_param);
     }
-    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotRest {
+    pub fn new_with_tag(_tag: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotRest {
         BinanceSpotRest {}
     }
 }

+ 6 - 6
exchanges/src/binance_spot_ws.rs

@@ -36,7 +36,7 @@
 // #[allow(dead_code)]
 // pub struct BinanceSpotWs {
 //     //类型
-//     label: String,
+//     tag: String,
 //     //地址
 //     address_url: String,
 //     //账号信息
@@ -54,9 +54,9 @@
 //     /*****************************************获取一个对象****************************************************/
 //     /*******************************************************************************************************/
 //     pub fn new(is_colo: bool, login_param: Option<BinanceSpotLogin>, ws_type: BinanceSpotWsType) -> BinanceSpotWs {
-//         return BinanceSpotWs::new_label("default-BinanceSpotWs".to_string(), is_colo, login_param, ws_type);
+//         return BinanceSpotWs::new_with_tag("default-BinanceSpotWs".to_string(), is_colo, login_param, ws_type);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSpotLogin>, ws_type: BinanceSpotWsType) -> BinanceSpotWs {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BinanceSpotLogin>, ws_type: BinanceSpotWsType) -> BinanceSpotWs {
 //         /*******公共频道-私有频道数据组装*/
 //         let address_url = match ws_type {
 //             BinanceSpotWsType::PublicAndPrivate => {
@@ -71,7 +71,7 @@
 //         }
 //         /*****返回结构体*******/
 //         BinanceSpotWs {
-//             label,
+//             tag,
 //             address_url,
 //             login_param,
 //             symbol_s: vec![],
@@ -158,7 +158,7 @@
 //         let login_is = self.contains_pr();
 //         let subscription = self.get_subscription();
 //         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
+//         let tag = self.tag.clone();
 //         // let heartbeat_time = self.heartbeat_time.clone();
 //
 //
@@ -181,7 +181,7 @@
 //         let t2 = tokio::spawn(async move {
 //             trace!("线程-异步链接-开始");
 //             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                                    label.clone(), subscribe_array,
+//                                                    tag.clone(), subscribe_array,
 //                                                    write_rx, read_tx,
 //                                                    Self::message_text,
 //                                                    Self::message_ping,

+ 7 - 7
exchanges/src/binance_swap_rest.rs

@@ -14,7 +14,7 @@ use serde_json::{json, Value};
 
 #[derive(Clone)]
 pub struct BinanceSwapRest {
-    label: String,
+    tag: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -31,9 +31,9 @@ impl BinanceSwapRest {
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BinanceSwapRest
     {
-        return BinanceSwapRest::new_label("default-BinanceSwapRest".to_string(), is_colo, login_param);
+        return BinanceSwapRest::new_with_tag("default-BinanceSwapRest".to_string(), is_colo, login_param);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BinanceSwapRest
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BinanceSwapRest
     {
         let base_url = if is_colo {
             "https://fapi.binance.com".to_string()
@@ -48,7 +48,7 @@ impl BinanceSwapRest {
         }
         /*****返回结构体*******/
         BinanceSwapRest {
-            label,
+            tag,
             base_url: base_url.to_string(),
             client: Client::new(),
             login_param,
@@ -345,7 +345,7 @@ impl BinanceSwapRest {
         //是否需要登录-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
                 return e;
             } else {//需要登录-且登录参数齐全
                 //组装sing
@@ -433,7 +433,7 @@ impl BinanceSwapRest {
     pub fn on_success_data(&mut self, text: String) -> ResponseData {
         let data = serde_json::from_str(text.as_str()).unwrap();
 
-        ResponseData::new(self.label.clone(), 200, "success".to_string(), data)
+        ResponseData::new(self.tag.clone(), 200, "success".to_string(), data)
     }
 
     pub fn on_error_data(&mut self, text: String, base_url: String, params: String) -> ResponseData {
@@ -443,7 +443,7 @@ impl BinanceSwapRest {
             Ok(data) => {
                 let message = data["msg"].as_str().unwrap();
 
-                let mut error = ResponseData::error(self.label.clone(), message.to_string());
+                let mut error = ResponseData::error(self.tag.clone(), message.to_string());
                 error.code = i16::from_str(data["code"].as_str().unwrap()).unwrap();
                 error.message = format!("请求地址:{}, 请求参数:{}", base_url, params);
                 error

+ 6 - 6
exchanges/src/binance_swap_ws.rs

@@ -37,7 +37,7 @@ pub struct BinanceSwapLogin {
 #[allow(dead_code)]
 pub struct BinanceSwapWs {
     //类型
-    label: String,
+    tag: String,
     //地址
     address_url: String,
     //账号
@@ -55,9 +55,9 @@ impl BinanceSwapWs {
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
-        return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
+        return BinanceSwapWs::new_with_tag("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
             BinanceSwapWsType::PublicAndPrivate => {
@@ -73,7 +73,7 @@ impl BinanceSwapWs {
         }
 
         BinanceSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             symbol_s: vec![],
@@ -163,7 +163,7 @@ impl BinanceSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         // let heartbeat_time = self.heartbeat_time.clone();
 
 
@@ -190,7 +190,7 @@ impl BinanceSwapWs {
                 info!("binance_usdt_swap socket 连接中……");
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("binance_usdt_swap socket 断连,1s以后重连……");

+ 8 - 8
exchanges/src/bitget_spot_rest.rs

@@ -12,7 +12,7 @@
 //
 // #[derive(Clone, Debug)]
 // pub struct BitgetSpotRest {
-//     pub label: String,
+//     pub tag: String,
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
@@ -32,9 +32,9 @@
 //
 //     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSpotRest
 //     {
-//         return BitgetSpotRest::new_label("default-BitgetSpotRest".to_string(), is_colo, login_param);
+//         return BitgetSpotRest::new_with_tag("default-BitgetSpotRest".to_string(), is_colo, login_param);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSpotRest {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSpotRest {
 //         let base_url = if is_colo {
 //             "https://api.bitget.com".to_string()
 //         } else {
@@ -48,7 +48,7 @@
 //         }
 //         /*****返回结构体*******/
 //         BitgetSpotRest {
-//             label,
+//             tag,
 //             base_url,
 //             client: Client::new(),
 //             login_param,
@@ -667,7 +667,7 @@
 //         //是否需要登录-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+//                 let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
 //                 return e;
 //             } else {
 //                 //需要登录-且登录参数齐全
@@ -757,7 +757,7 @@
 //             "POST" => self.client.post(url.clone()).body(params).headers(headers),
 //             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
 //             // "PUT" => self.client.put(url.clone()).json(&params),
-//             _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+//             _ => return Ok(ResponseData::error(self.tag.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
 //         };
 //
 //         let response = req.send().await?;
@@ -765,11 +765,11 @@
 //             // 读取响应的内容
 //             let body = response.text().await?;
 //             // trace!("ok-----{}", body);
-//             res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
+//             res_data = ResponseData::new(self.tag.clone(), "200".to_string(), "success".to_string(), body);
 //         } else {
 //             let body = response.text().await?;
 //             // trace!("error-----{}", body);
-//             res_data = ResponseData::error(self.label.clone(), body.to_string())
+//             res_data = ResponseData::error(self.tag.clone(), body.to_string())
 //         }
 //
 //         Ok(res_data)

+ 6 - 6
exchanges/src/bitget_spot_ws.rs

@@ -44,7 +44,7 @@
 // #[derive(Clone)]
 // pub struct BitgetSpotWs {
 //     //类型
-//     label: String,
+//     tag: String,
 //     //地址
 //     address_url: String,
 //     //账号
@@ -62,9 +62,9 @@
 //     /*****************************************获取一个对象****************************************************/
 //     /*******************************************************************************************************/
 //     pub fn new(is_colo: bool, login_param: Option<BitgetSpotLogin>, ws_type: BitgetSpotWsType) -> BitgetSpotWs {
-//         return BitgetSpotWs::new_label("default-BitgetSpotWs".to_string(), is_colo, login_param, ws_type);
+//         return BitgetSpotWs::new_with_tag("default-BitgetSpotWs".to_string(), is_colo, login_param, ws_type);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSpotLogin>, ws_type: BitgetSpotWsType) -> BitgetSpotWs
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BitgetSpotLogin>, ws_type: BitgetSpotWsType) -> BitgetSpotWs
 //     {
 //         /*******公共频道-私有频道数据组装*/
 //         let address_url = match ws_type {
@@ -83,7 +83,7 @@
 //         }
 //
 //         BitgetSpotWs {
-//             label,
+//             tag,
 //             address_url,
 //             login_param,
 //             symbol_s: vec![],
@@ -245,7 +245,7 @@
 //         let login_is = self.contains_pr();
 //         let subscription = self.get_subscription();
 //         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
+//         let tag = self.tag.clone();
 //         let heartbeat_time = self.heartbeat_time.clone();
 //
 //
@@ -272,7 +272,7 @@
 //         let t2 = tokio::spawn(async move {
 //             trace!("线程-异步链接-开始");
 //             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                                    label.clone(), subscribe_array,
+//                                                    tag.clone(), subscribe_array,
 //                                                    write_rx, read_tx,
 //                                                    Self::message_text,
 //                                                    Self::message_ping,

+ 9 - 9
exchanges/src/bitget_swap_rest.rs

@@ -12,7 +12,7 @@ use serde_json::Value;
 
 #[derive(Clone, Debug)]
 pub struct BitgetSwapRest {
-    pub label: String,
+    pub tag: String,
     base_url: String,
     client: Client,
     login_param: BTreeMap<String, String>,                  // 登录参数
@@ -23,11 +23,11 @@ pub struct BitgetSwapRest {
 
 impl BitgetSwapRest {
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSwapRest {
-        return BitgetSwapRest::new_label("default-BitgetSwapRest".to_string(), is_colo, login_param)
+        return BitgetSwapRest::new_with_tag("default-BitgetSwapRest".to_string(), is_colo, login_param)
     }
 
-    // 构造Bitget,可以自定义label
-    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSwapRest {
+    // 构造Bitget,可以自定义tag
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSwapRest {
         let base_url = if is_colo {
             "https://api.bitget.com".to_string()
         } else {
@@ -41,7 +41,7 @@ impl BitgetSwapRest {
         }
 
         BitgetSwapRest {
-            label,
+            tag,
             base_url,
             client: Client::new(),
             login_param,
@@ -230,7 +230,7 @@ impl BitgetSwapRest {
         //是否需要登陆-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
                 return e;
             } else {
                 //需要登陆-且登陆参数齐全
@@ -353,7 +353,7 @@ impl BitgetSwapRest {
             "POST" => self.client.post(url.clone()).body(params).headers(headers),
             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
             // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+            _ => return Ok(ResponseData::error(self.tag.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
         };
 
         let response = req.send().await?;
@@ -361,10 +361,10 @@ impl BitgetSwapRest {
             // 读取响应的内容
             let body = response.text().await?;
             // trace!("ok-----{}", body);
-            res_data = ResponseData::new(self.label.clone(), 200, "success".to_string(), body.parse().unwrap());
+            res_data = ResponseData::new(self.tag.clone(), 200, "success".to_string(), body.parse().unwrap());
         } else {
             let body = response.text().await?;
-            res_data = ResponseData::error(self.label.clone(), body.to_string())
+            res_data = ResponseData::error(self.tag.clone(), body.to_string())
         }
 
         Ok(res_data)

+ 6 - 6
exchanges/src/bitget_swap_ws.rs

@@ -35,7 +35,7 @@ pub struct BitgetSwapLogin {
 
 #[derive(Clone)]
 pub struct BitgetSwapWs {
-    label: String,                                              // 类型
+    tag: String,                                              // 类型
     address_url: String,                                        // 地址
     login_param: Option<BitgetSwapLogin>,                       // 账号
     symbol_s: Vec<String>,                                      // 币对
@@ -45,10 +45,10 @@ pub struct BitgetSwapWs {
 
 impl BitgetSwapWs {
     pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
-        return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
+        return BitgetSwapWs::new_with_tag("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
     }
 
-    pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
         let address_url = match ws_type {
             BitgetSwapWsType::Public => {
                 "wss://ws.bitget.com/v2/ws/public".to_string()
@@ -65,7 +65,7 @@ impl BitgetSwapWs {
         }
 
         BitgetSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             symbol_s: vec![],
@@ -186,7 +186,7 @@ impl BitgetSwapWs {
     {
         let login_is = self.contains_pr();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         let heartbeat_time = self.heartbeat_time.clone();
 
         // 设置订阅
@@ -237,7 +237,7 @@ impl BitgetSwapWs {
 
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 login_is, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("bitget_usdt_swap socket 断连,重连中……");

+ 6 - 6
exchanges/src/bybit_swap_rest.rs

@@ -14,7 +14,7 @@ use crate::response_base::ResponseData;
 
 #[derive(Clone, Debug)]
 pub struct BybitSwapRest {
-    pub label: String,
+    pub tag: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -32,9 +32,9 @@ impl BybitSwapRest {
 
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest
     {
-        return BybitSwapRest::new_label("default-BybitSwapRest".to_string(), is_colo, login_param);
+        return BybitSwapRest::new_with_tag("default-BybitSwapRest".to_string(), is_colo, login_param);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest {
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest {
         let base_url = if is_colo {
             "https://api.bytick.com".to_string()
         } else {
@@ -48,7 +48,7 @@ impl BybitSwapRest {
         }
         /*****返回结构体*******/
         BybitSwapRest {
-            label,
+            tag,
             base_url,
             client: Client::new(),
             login_param,
@@ -341,7 +341,7 @@ impl BybitSwapRest {
         //是否需要登录-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
                 return e;
             } else {
                 //需要登录-且登录参数齐全
@@ -437,7 +437,7 @@ impl BybitSwapRest {
         } else {
             let body = response.text().await.unwrap();
 
-            ResponseData::error(self.label.clone(), body)
+            ResponseData::error(self.tag.clone(), body)
         }
     }
 

+ 6 - 6
exchanges/src/bybit_swap_ws.rs

@@ -47,7 +47,7 @@ pub struct BybitSwapLogin {
 #[allow(dead_code)]
 pub struct BybitSwapWs {
     //类型
-    label: String,
+    tag: String,
     //地址
     address_url: String,
     //账号
@@ -65,9 +65,9 @@ impl BybitSwapWs {
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
-        return BybitSwapWs::new_label("default-BybitSwapWs".to_string(), is_colo, login_param, ws_type);
+        return BybitSwapWs::new_with_tag("default-BybitSwapWs".to_string(), is_colo, login_param, ws_type);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
             BybitSwapWsType::Public => {
@@ -85,7 +85,7 @@ impl BybitSwapWs {
         }
 
         BybitSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             symbol_s: vec![],
@@ -200,7 +200,7 @@ impl BybitSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         let login_param = self.login_param.clone();
         let (api_key, secret_key) = match login_param {
             None => { ("".to_string(), "".to_string()) }
@@ -249,7 +249,7 @@ impl BybitSwapWs {
 
                 // ws网络层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 false, label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
+                                                 false, tag.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("bybit_usdt_swap socket 断连,1s以后重连……");

+ 6 - 6
exchanges/src/crypto_spot_ws.rs

@@ -38,7 +38,7 @@
 // #[allow(dead_code)]
 // pub struct CryptoSpotWs {
 //     //类型
-//     label: String,
+//     tag: String,
 //     //地址
 //     address_url: String,
 //     //账号
@@ -56,9 +56,9 @@
 //     /*****************************************获取一个对象****************************************************/
 //     /*******************************************************************************************************/
 //     pub fn new(is_colo: bool, login_param: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
-//         return CryptoSpotWs::new_label("default-CryptoSpotWs".to_string(), is_colo, login_param, ws_type);
+//         return CryptoSpotWs::new_with_tag("default-CryptoSpotWs".to_string(), is_colo, login_param, ws_type);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
 //         /*******公共频道-私有频道数据组装*/
 //         let address_url = match ws_type {
 //             CryptoSpotWsType::Public => {
@@ -76,7 +76,7 @@
 //         }
 //
 //         CryptoSpotWs {
-//             label,
+//             tag,
 //             address_url,
 //             login_param,
 //             symbol_s: vec![],
@@ -176,7 +176,7 @@
 //         let login_is = self.contains_pr();
 //         let subscription = self.get_subscription();
 //         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
+//         let tag = self.tag.clone();
 //         // let heartbeat_time = self.heartbeat_time.clone();
 //
 //
@@ -199,7 +199,7 @@
 //         let t2 = tokio::spawn(async move {
 //             trace!("线程-异步链接-开始");
 //             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                                    label.clone(), subscribe_array,
+//                                                    tag.clone(), subscribe_array,
 //                                                    write_rx, read_tx,
 //                                                    Self::message_text,
 //                                                    Self::message_ping,

+ 2 - 2
exchanges/src/gate_spot_rest.rs

@@ -6,9 +6,9 @@ pub struct GateSpotRest {}
 impl GateSpotRest {
     pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotRest
     {
-        return GateSpotRest::new_label("default-GateSpotRest".to_string(), _is_colo, _login_param);
+        return GateSpotRest::new_with_tag("default-GateSpotRest".to_string(), _is_colo, _login_param);
     }
-    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotRest
+    pub fn new_with_tag(_tag: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotRest
     {
         GateSpotRest {}
     }

+ 2 - 2
exchanges/src/gate_spot_ws.rs

@@ -6,9 +6,9 @@
 // impl GateSpotWs {
 //     pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotWs
 //     {
-//         return GateSpotWs::new_label("default-GateSpotWs".to_string(), _is_colo, _login_param);
+//         return GateSpotWs::new_with_tag("default-GateSpotWs".to_string(), _is_colo, _login_param);
 //     }
-//     pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotWs
+//     pub fn new_with_tag(_tag: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotWs
 //     {
 //         GateSpotWs {}
 //     }

+ 11 - 11
exchanges/src/gate_swap_rest.rs

@@ -15,7 +15,7 @@ use tracing::{error, info};
 
 #[derive(Clone)]
 pub struct GateSwapRest {
-    label: String,
+    tag: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -32,9 +32,9 @@ impl GateSwapRest {
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> GateSwapRest
     {
-        return GateSwapRest::new_label("default-GateSwapRest".to_string(), is_colo, login_param);
+        return GateSwapRest::new_with_tag("default-GateSwapRest".to_string(), is_colo, login_param);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> GateSwapRest
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> GateSwapRest
     {
         let base_url = if is_colo {
             let url = "https://apiv4-private.gateapi.io".to_string();
@@ -50,7 +50,7 @@ impl GateSwapRest {
         if is_colo {} else {}
         /*****返回结构体*******/
         GateSwapRest {
-            label,
+            tag,
             base_url: base_url.to_string(),
             client: Client::new(),
             login_param,
@@ -185,10 +185,10 @@ impl GateSwapRest {
     ) -> ResponseData
     {
         if side != "buy" && side != "sell" {
-            ResponseData::error(self.label.clone(), format!("未知下单方向!{}", side));
+            ResponseData::error(self.tag.clone(), format!("未知下单方向!{}", side));
         }
         if pos_side != "long" && pos_side != "short" {
-            ResponseData::error(self.label.clone(), format!("未知持仓方向!{}", side));
+            ResponseData::error(self.tag.clone(), format!("未知持仓方向!{}", side));
         }
         let mut param = serde_json::json!({
             "contract":contract, //合约标识
@@ -477,7 +477,7 @@ impl GateSwapRest {
         //是否需要登录-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
                 return e;
             } else {//需要登录-且登录参数齐全
                 //组装sing
@@ -583,7 +583,7 @@ impl GateSwapRest {
     pub fn on_success_data(&mut self, text: &String) -> ResponseData {
         let data = serde_json::from_str(text.as_str()).unwrap();
 
-        ResponseData::new(self.label.clone(), 200, "success".to_string(), data)
+        ResponseData::new(self.tag.clone(), 200, "success".to_string(), data)
     }
 
     pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
@@ -594,12 +594,12 @@ impl GateSwapRest {
                 let message;
 
                 if !data["message"].is_null() {
-                    message = format!("{}:{}", data["label"].as_str().unwrap(), data["message"].as_str().unwrap());
+                    message = format!("{}:{}", data["tag"].as_str().unwrap(), data["message"].as_str().unwrap());
                 } else {
-                    message = data["label"].to_string();
+                    message = data["tag"].to_string();
                 }
 
-                let mut error = ResponseData::error(self.label.clone(), message);
+                let mut error = ResponseData::error(self.tag.clone(), message);
                 error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
                 error
             }

+ 6 - 6
exchanges/src/gate_swap_ws.rs

@@ -47,7 +47,7 @@ pub struct GateSwapLogin {
 #[derive(Clone)]
 pub struct GateSwapWs {
     //类型
-    label: String,
+    tag: String,
     //地址
     address_url: String,
     //账号信息
@@ -65,9 +65,9 @@ impl GateSwapWs {
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs {
-        return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type);
+        return GateSwapWs::new_with_tag("default-GateSwapWs".to_string(), is_colo, login_param, ws_type);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs
     {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
@@ -86,7 +86,7 @@ impl GateSwapWs {
 
 
         GateSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             symbol_s: vec![],
@@ -269,7 +269,7 @@ impl GateSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         let heartbeat_time = self.heartbeat_time.clone();
         let timestamp = Utc::now().timestamp();
 
@@ -302,7 +302,7 @@ impl GateSwapWs {
                 info!("gate_usdt_swap socket 连接中……");
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("gate_usdt_swap socket 断连,1s以后重连……");

+ 9 - 9
exchanges/src/kucoin_spot_rest.rs

@@ -15,7 +15,7 @@
 //
 // #[derive(Clone, Debug)]
 // pub struct KucoinSpotRest {
-//     pub label: String,
+//     pub tag: String,
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
@@ -35,9 +35,9 @@
 //
 //     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSpotRest
 //     {
-//         return KucoinSpotRest::new_label("default-KucoinSpotRest".to_string(), is_colo, login_param);
+//         return KucoinSpotRest::new_with_tag("default-KucoinSpotRest".to_string(), is_colo, login_param);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSpotRest {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSpotRest {
 //         let base_url = if is_colo {
 //             "https://api.kucoin.com".to_string()
 //         } else {
@@ -51,7 +51,7 @@
 //         }
 //         /*****返回结构体*******/
 //         KucoinSpotRest {
-//             label,
+//             tag,
 //             base_url,
 //             client: Client::new(),
 //             login_param,
@@ -339,7 +339,7 @@
 //         //是否需要登录-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+//                 let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
 //                 return e;
 //             } else {
 //                 //需要登录-且登录参数齐全
@@ -460,7 +460,7 @@
 //             "POST" => self.client.post(url.clone()).body(params).headers(headers),
 //             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
 //             // "PUT" => self.client.put(url.clone()).json(&params),
-//             _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+//             _ => return Ok(ResponseData::error(self.tag.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
 //         };
 //
 //         let response = req.send().await?;
@@ -468,11 +468,11 @@
 //             // 读取响应的内容
 //             let body = response.text().await?;
 //             // trace!("ok-----{}", body);
-//             res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
+//             res_data = ResponseData::new(self.tag.clone(), "200".to_string(), "success".to_string(), body);
 //         } else {
 //             let body = response.text().await?;
 //             // trace!("error-----{}", body);
-//             res_data = ResponseData::error(self.label.clone(), body.to_string())
+//             res_data = ResponseData::error(self.tag.clone(), body.to_string())
 //         }
 //
 //         Ok(res_data)
@@ -501,7 +501,7 @@
 //                         //                               "".parse().unwrap());
 //                         // error
 //
-//                         let mut error = ResponseData::error(res_data.label, msg.parse().unwrap());
+//                         let mut error = ResponseData::error(res_data.tag, msg.parse().unwrap());
 //                         error.code = code.parse().unwrap();
 //                         error.data = format!("请求地址:{},请求参数:{}", base_url, params);
 //                         error

+ 6 - 6
exchanges/src/kucoin_spot_ws.rs

@@ -50,7 +50,7 @@
 // #[allow(dead_code)]
 // pub struct KucoinSpotWs {
 //     //类型
-//     label: String,
+//     tag: String,
 //     //地址
 //     address_url: String,
 //     //代理信息
@@ -70,9 +70,9 @@
 //     /*****************************************获取一个对象****************************************************/
 //     /*******************************************************************************************************/
 //     pub async fn new(is_colo: bool, login_param: Option<KucoinSpotLogin>, ws_type: KucoinSpotWsType) -> KucoinSpotWs {
-//         return KucoinSpotWs::new_label("default-KucoinSpotWs".to_string(), is_colo, login_param, ws_type).await;
+//         return KucoinSpotWs::new_with_tag("default-KucoinSpotWs".to_string(), is_colo, login_param, ws_type).await;
 //     }
-//     pub async fn new_label(label: String, is_colo: bool, login_param: Option<KucoinSpotLogin>, ws_type: KucoinSpotWsType) -> KucoinSpotWs {
+//     pub async fn new_with_tag(tag: String, is_colo: bool, login_param: Option<KucoinSpotLogin>, ws_type: KucoinSpotWsType) -> KucoinSpotWs {
 //         /*******公共频道-私有频道数据组装*/
 //         let mut ws_param = KucoinSpotWsParam {
 //             token: "".to_string(),
@@ -102,7 +102,7 @@
 //         }
 //
 //         KucoinSpotWs {
-//             label,
+//             tag,
 //             address_url,
 //             login_param,
 //             ws_param,
@@ -274,7 +274,7 @@
 //         let login_is = self.contains_pr();
 //         let subscription = self.get_subscription();
 //         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
+//         let tag = self.tag.clone();
 //         let heartbeat_time = self.ws_param.ws_ping_interval;
 //
 //         //心跳-- 方法内部线程启动
@@ -307,7 +307,7 @@
 //         let t2 = tokio::spawn(async move {
 //             trace!("线程-异步链接-开始");
 //             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                                    label.clone(), subscribe_array.clone(),
+//                                                    tag.clone(), subscribe_array.clone(),
 //                                                    write_rx, read_tx,
 //                                                    Self::message_text,
 //                                                    Self::message_ping,

+ 9 - 9
exchanges/src/kucoin_swap_rest.rs

@@ -12,7 +12,7 @@ use crate::response_base::ResponseData;
 
 #[derive(Clone, Debug)]
 pub struct KucoinSwapRest {
-    pub label: String,
+    pub tag: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -31,9 +31,9 @@ impl KucoinSwapRest {
 
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSwapRest
     {
-        return KucoinSwapRest::new_label("default-KucoinSwapRest".to_string(), is_colo, login_param);
+        return KucoinSwapRest::new_with_tag("default-KucoinSwapRest".to_string(), is_colo, login_param);
     }
-    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSwapRest {
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSwapRest {
         let base_url = if is_colo {
             "https://api-futures.kucoin.com".to_string()
         } else {
@@ -47,7 +47,7 @@ impl KucoinSwapRest {
         }
         /*****返回结构体*******/
         KucoinSwapRest {
-            label,
+            tag,
             base_url,
             client: Client::new(),
             login_param,
@@ -444,7 +444,7 @@ impl KucoinSwapRest {
         //是否需要登录-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
                 return e;
             } else {
                 //需要登录-且登录参数齐全
@@ -564,7 +564,7 @@ impl KucoinSwapRest {
             "POST" => self.client.post(url.clone()).body(params).headers(headers),
             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
             // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+            _ => return Ok(ResponseData::error(self.tag.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
         };
 
         let response = req.send().await?;
@@ -573,11 +573,11 @@ impl KucoinSwapRest {
             let body = response.text().await?;
 
             let data = serde_json::from_str(body.as_str()).unwrap();
-            res_data = ResponseData::new(self.label.clone(), 200, "success".to_string(), data);
+            res_data = ResponseData::new(self.tag.clone(), 200, "success".to_string(), data);
         } else {
             let body = response.text().await?;
             // trace!("error-----{}", body);
-            res_data = ResponseData::error(self.label.clone(), body.to_string())
+            res_data = ResponseData::error(self.tag.clone(), body.to_string())
         }
 
         Ok(res_data)
@@ -605,7 +605,7 @@ impl KucoinSwapRest {
                         //                               "".parse().unwrap());
                         // error
 
-                        let mut error = ResponseData::error(res_data.label, msg.parse().unwrap());
+                        let mut error = ResponseData::error(res_data.tag, msg.parse().unwrap());
                         error.code = code.parse().unwrap();
                         error.message = format!("请求地址:{},请求参数:{},响应:{}。", base_url, params, json_value.to_string());
                         error

+ 6 - 6
exchanges/src/kucoin_swap_ws.rs

@@ -58,7 +58,7 @@ pub struct KucoinSwapLogin {
 #[allow(dead_code)]
 pub struct KucoinSwapWs {
     //类型
-    label: String,
+    tag: String,
     //地址
     address_url: String,
     //账号
@@ -78,9 +78,9 @@ impl KucoinSwapWs {
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
     pub async fn new(is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
-        return Self::new_label("default-KucoinSwapWs".to_string(), is_colo, login_param, ws_type).await;
+        return Self::new_with_tag("default-KucoinSwapWs".to_string(), is_colo, login_param, ws_type).await;
     }
-    pub async fn new_label(label: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
+    pub async fn new_with_tag(tag: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
         /*******公共频道-私有频道数据组装*/
         let mut ws_param = KucoinSwapWsParam {
             token: "".to_string(),
@@ -110,7 +110,7 @@ impl KucoinSwapWs {
         }
 
         KucoinSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             ws_param,
@@ -304,7 +304,7 @@ impl KucoinSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         let heartbeat_time = self.ws_param.ws_ping_interval.clone();
 
         //心跳-- 方法内部线程启动
@@ -329,7 +329,7 @@ impl KucoinSwapWs {
             loop {
                 info!("kucoin_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("kucoin_usdt_swap socket 断连,1s以后重连……");

+ 8 - 8
exchanges/src/okx_swap_rest.rs

@@ -11,7 +11,7 @@
 //
 // #[derive(Clone, Debug)]
 // pub struct OkxSwapRest {
-//     pub label: String,
+//     pub tag: String,
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
@@ -31,9 +31,9 @@
 //
 //     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> OkxSwapRest
 //     {
-//         return OkxSwapRest::new_label("default-OkxSwapRest".to_string(), is_colo, login_param);
+//         return OkxSwapRest::new_with_tag("default-OkxSwapRest".to_string(), is_colo, login_param);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> OkxSwapRest {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> OkxSwapRest {
 //         let base_url = if is_colo {
 //             "https://www.okx.com".to_string()
 //         } else {
@@ -47,7 +47,7 @@
 //         }
 //         /*****返回结构体*******/
 //         OkxSwapRest {
-//             label,
+//             tag,
 //             base_url,
 //             client: Client::new(),
 //             login_param,
@@ -407,7 +407,7 @@
 //         //是否需要登录-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+//                 let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
 //                 return e;
 //             } else {
 //                 //需要登录-且登录参数齐全
@@ -497,7 +497,7 @@
 //             "POST" => self.client.post(url.clone()).body(params).headers(headers),
 //             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
 //             // "PUT" => self.client.put(url.clone()).json(&params),
-//             _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+//             _ => return Ok(ResponseData::error(self.tag.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
 //         };
 //
 //         let response = req.send().await?;
@@ -505,11 +505,11 @@
 //             // 读取响应的内容
 //             let body = response.text().await?;
 //             // trace!("ok-----{}", body);
-//             res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
+//             res_data = ResponseData::new(self.tag.clone(), "200".to_string(), "success".to_string(), body);
 //         } else {
 //             let body = response.text().await?;
 //             // trace!("error-----{}", body);
-//             res_data = ResponseData::error(self.label.clone(), body.to_string())
+//             res_data = ResponseData::error(self.tag.clone(), body.to_string())
 //         }
 //
 //         Ok(res_data)

+ 6 - 6
exchanges/src/okx_swap_ws.rs

@@ -50,7 +50,7 @@
 // #[derive(Clone)]
 // pub struct OkxSwapWs {
 //     //类型
-//     label: String,
+//     tag: String,
 //     //地址
 //     address_url: String,
 //     //账号信息
@@ -68,9 +68,9 @@
 //     /*****************************************获取一个对象****************************************************/
 //     /*******************************************************************************************************/
 //     pub fn new(is_colo: bool, login_param: Option<OkxSwapLogin>, ws_type: OkxSwapWsType) -> OkxSwapWs {
-//         return OkxSwapWs::new_label("default-OkxSwapWs".to_string(), is_colo, login_param, ws_type);
+//         return OkxSwapWs::new_with_tag("default-OkxSwapWs".to_string(), is_colo, login_param, ws_type);
 //     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: Option<OkxSwapLogin>, ws_type: OkxSwapWsType) -> OkxSwapWs {
+//     pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<OkxSwapLogin>, ws_type: OkxSwapWsType) -> OkxSwapWs {
 //         /*******公共频道-私有频道数据组装*/
 //         let address_url = match ws_type {
 //             OkxSwapWsType::Public => {
@@ -91,7 +91,7 @@
 //         }
 //         /*****返回结构体*******/
 //         OkxSwapWs {
-//             label,
+//             tag,
 //             address_url,
 //             login_param,
 //             symbol_s: vec![],
@@ -272,7 +272,7 @@
 //         let login_is = self.contains_pr();
 //         let subscription = self.get_subscription();
 //         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
+//         let tag = self.tag.clone();
 //         let heartbeat_time = self.heartbeat_time.clone();
 //
 //
@@ -305,7 +305,7 @@
 //         let t2 = tokio::spawn(async move {
 //             trace!("线程-异步链接-开始");
 //             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                              label.clone(), subscribe_array,
+//                                              tag.clone(), subscribe_array,
 //                                              write_rx, read_tx,
 //                                              Self::message_text,
 //                                              Self::message_ping,

+ 5 - 5
exchanges/src/response_base.rs

@@ -4,7 +4,7 @@ use tokio::time::Instant;
 /**交易所返回数据处理之后,同意保存格式,为了内部其他接口调用*/
 #[derive(Debug, Clone)]
 pub struct ResponseData {
-    pub label: String,
+    pub tag: String,
     pub code: i16,
     pub message: String,
     pub channel: String,
@@ -16,9 +16,9 @@ pub struct ResponseData {
 }
 
 impl ResponseData {
-    pub fn new(label: String, code: i16, message: String, data: Value) -> ResponseData {
+    pub fn new(tag: String, code: i16, message: String, data: Value) -> ResponseData {
         ResponseData {
-            label,
+            tag,
             code,
             message,
             data,
@@ -29,9 +29,9 @@ impl ResponseData {
             ins: Instant::now(),
         }
     }
-    pub fn error(label: String, message: String) -> ResponseData {
+    pub fn error(tag: String, message: String) -> ResponseData {
         ResponseData {
-            label,
+            tag,
             code: -1,
             message: format!("{}", &message),
             data: Value::Null,

+ 4 - 4
exchanges/src/socket_tool.rs

@@ -32,7 +32,7 @@ pub struct AbstractWsMode {}
 impl AbstractWsMode {
     pub async fn ws_connected<T, PI, PO, F, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                     is_first_login: bool,
-                                                    label: String,
+                                                    tag: String,
                                                     is_shutdown_arc: Arc<AtomicBool>,
                                                     handle_function: &F,
                                                     subscribe_array: Vec<String>,
@@ -81,7 +81,7 @@ impl AbstractWsMode {
                 // let response_data = func(message);
                 if response_data.is_some() {
                     let mut data = response_data.unwrap();
-                    data.label = label.clone();
+                    data.tag = tag.clone();
 
                     let code = data.code.clone();
 
@@ -158,7 +158,7 @@ impl AbstractWsMode {
                                                         handle_function: F,
                                                         address_url: String,
                                                         is_first_login: bool,
-                                                        label: String,
+                                                        tag: String,
                                                         subscribe_array: Vec<String>,
                                                         write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                         message_text: T,
@@ -189,7 +189,7 @@ impl AbstractWsMode {
 
                 Self::ws_connected(write_to_socket_rx_arc,
                                    is_first_login,
-                                   label,
+                                   tag,
                                    is_shutdown_arc,
                                    &handle_function,
                                    subscribe_array.clone(),

+ 33 - 0
src/gate_usdt_swap_data_listener.rs

@@ -0,0 +1,33 @@
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use tokio::sync::Mutex;
+use tracing::info;
+use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
+use exchanges::response_base::ResponseData;
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let name = "gate_usdt_swap_listener";
+    let symbols = vec!["BTC_USDT".to_string(), "ETH_USDT".to_string()];
+
+    tokio::spawn(async move {
+        let mut ws = GateSwapWs::new_with_tag(name.to_string(), false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesBookTicker,
+                GateSwapSubscribeType::PuFuturesOrderBook
+            ]);
+
+        // 读取数据
+        let fun = move |data: ResponseData| {
+            async move {
+                info!(?data);
+            }
+        };
+
+        // 建立链接
+        ws.set_symbols(symbols);
+        ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+}

+ 14 - 3
src/main.rs

@@ -1,3 +1,8 @@
+mod gate_usdt_swap_data_listener;
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
 use tracing::info;
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 
@@ -10,7 +15,13 @@ fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
 #[tokio::main(flavor = "multi_thread")]
 async fn main() {
     // 日志级别配置
-    let _guard = log_level_init("info".to_string(), 8888, "data-center".to_string());
-
-    info!("Welcome data center");
+    let _ = log_level_init("info".to_string(), 8888, "data-center".to_string());
+    // 掌控全局的关闭
+    let running = Arc::new(AtomicBool::new(true));
+    // 启动gate监听器
+    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // 每一秒检查一次程序是否结束
+    while running.load(Ordering::Relaxed) {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
 }

+ 3 - 3
standard/src/binance_spot_handle.rs

@@ -11,10 +11,10 @@
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;
 //     let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
-//     format_special_ticker(res_data_json, res_data.label)
+//     format_special_ticker(res_data_json, res_data.tag)
 // }
 //
-// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+// pub fn format_special_ticker(data: serde_json::Value, tag: String) -> SpecialDepth {
 //     let bp = Decimal::from_str(data["b"].as_str().unwrap()).unwrap();
 //     let bq = Decimal::from_str(data["B"].as_str().unwrap()).unwrap();
 //     let ap = Decimal::from_str(data["a"].as_str().unwrap()).unwrap();
@@ -25,7 +25,7 @@
 //     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at: Default::default() };
 //     let depth_info = vec![bp, bq, ap, aq];
 //     SpecialDepth {
-//         name: label,
+//         name: tag,
 //         depth: depth_info,
 //         ticker: ticker_info,
 //         t,

+ 1 - 1
standard/src/binance_swap_handle.rs

@@ -19,7 +19,7 @@ pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
-        name: (*res_data).label.clone(),
+        name: (*res_data).tag.clone(),
         depth: depth_info,
         ticker: ticker_info,
         t,

+ 4 - 4
standard/src/bitget_spot_handle.rs

@@ -55,7 +55,7 @@
 //     }
 //     trace!(?order_info);
 //     SpecialOrder {
-//         name: res_data.label,
+//         name: res_data.tag,
 //         order: order_info,
 //     }
 // }
@@ -112,10 +112,10 @@
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;
 //     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(&*res_data_str).unwrap();
-//     format_special_ticker(res_data_json[0].clone(), res_data.label)
+//     format_special_ticker(res_data_json[0].clone(), res_data.tag)
 // }
 //
-// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+// pub fn format_special_ticker(data: serde_json::Value, tag: String) -> SpecialDepth {
 //     let bp = Decimal::from_str(data["bidPr"].as_str().unwrap()).unwrap();
 //     let bq = Decimal::from_str(data["bidSz"].as_str().unwrap()).unwrap();
 //     let ap = Decimal::from_str(data["askPr"].as_str().unwrap()).unwrap();
@@ -127,7 +127,7 @@
 //     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
 //     let depth_info = vec![bp, bq, ap, aq];
 //     SpecialDepth {
-//         name: label,
+//         name: tag,
 //         depth: depth_info,
 //         ticker: ticker_info,
 //         t,

+ 4 - 4
standard/src/bitget_swap_handle.rs

@@ -40,7 +40,7 @@ pub fn handle_order(res_data: &ResponseData, ct_val: Decimal) -> SpecialOrder {
         order_info.push(format_order_item(item.clone(), ct_val));
     }
     SpecialOrder {
-        name: res_data.label.clone(),
+        name: res_data.tag.clone(),
         order: order_info,
     }
 }
@@ -154,10 +154,10 @@ pub fn format_position_item(position_json: &Value, ct_val: &Decimal) -> Position
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;
 //     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(&*res_data_str).unwrap();
-//     format_special_ticker(res_data_json[0].clone(), res_data.label)
+//     format_special_ticker(res_data_json[0].clone(), res_data.tag)
 // }
 //
-// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+// pub fn format_special_ticker(data: serde_json::Value, tag: String) -> SpecialDepth {
 //     let bp = Decimal::from_str(data["bidPr"].as_str().unwrap()).unwrap();
 //     let bq = Decimal::from_str(data["bidSz"].as_str().unwrap()).unwrap();
 //     let ap = Decimal::from_str(data["askPr"].as_str().unwrap()).unwrap();
@@ -169,7 +169,7 @@ pub fn format_position_item(position_json: &Value, ct_val: &Decimal) -> Position
 //     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
 //     let depth_info = vec![bp, bq, ap, aq];
 //     SpecialDepth {
-//         name: label,
+//         name: tag,
 //         depth: depth_info,
 //         ticker: ticker_info,
 //         t,

+ 2 - 2
standard/src/bybit_swap_handle.rs

@@ -99,7 +99,7 @@ pub fn handle_order(res_data: &ResponseData, ct_val: Decimal) -> SpecialOrder {
     };
 
     SpecialOrder {
-        name: res_data.label.clone(),
+        name: res_data.tag.clone(),
         order: order_info,
     }
 }
@@ -148,7 +148,7 @@ pub fn handle_ticker(res_data: &ResponseData) -> SpecialDepth {
     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at: 0 };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
-        name: res_data.label.clone(),
+        name: res_data.tag.clone(),
         depth: depth_info,
         ticker: ticker_info,
         t,

+ 2 - 2
standard/src/gate_swap_handle.rs

@@ -86,7 +86,7 @@ pub fn handle_order(res_data: &ResponseData, ct_val: Decimal) -> SpecialOrder {
     };
 
     SpecialOrder {
-        name: res_data.label.clone(),
+        name: res_data.tag.clone(),
         order: order_info,
     }
 }
@@ -130,7 +130,7 @@ pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
     let depth_info = vec![bp, bq, ap, aq];
 
     SpecialDepth {
-        name: (*res_data).label.clone(),
+        name: (*res_data).tag.clone(),
         depth: depth_info,
         ticker: ticker_info,
         t,

+ 4 - 4
standard/src/kucoin_handle.rs

@@ -31,10 +31,10 @@ pub fn format_account_info(data: &Value, symbol: &String) -> Account {
 
 // 处理特殊Ticket信息
 pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
-    format_special_ticker(&res_data.data, &res_data.label)
+    format_special_ticker(&res_data.data, &res_data.tag)
 }
 
-pub fn format_special_ticker(data: &Value, label: &String) -> SpecialDepth {
+pub fn format_special_ticker(data: &Value, tag: &String) -> SpecialDepth {
     let bp = Decimal::from_str(&data["bestBidPrice"].as_str().unwrap()).unwrap();
     let bq = Decimal::from_f64(data["bestBidSize"].as_f64().unwrap()).unwrap();
     let ap = Decimal::from_str(&data["bestAskPrice"].as_str().unwrap()).unwrap();
@@ -46,7 +46,7 @@ pub fn format_special_ticker(data: &Value, label: &String) -> SpecialDepth {
     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
-        name: label.clone(),
+        name: tag.clone(),
         depth: depth_info,
         ticker: ticker_info,
         t,
@@ -94,7 +94,7 @@ pub fn handle_order(res_data: &ResponseData, ct_val: Decimal) -> SpecialOrder {
     let order_info = vec![format_order_item(&res_data.data, ct_val)];
 
     SpecialOrder {
-        name: res_data.label.clone(),
+        name: res_data.tag.clone(),
         order: order_info,
     }
 }

+ 4 - 4
standard/src/kucoin_spot_handle.rs

@@ -55,7 +55,7 @@
 //     }
 //     trace!(?order_info);
 //     SpecialOrder {
-//         name: res_data.label,
+//         name: res_data.tag,
 //         order: order_info,
 //     }
 // }
@@ -112,10 +112,10 @@
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;
 //     let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
-//     format_special_ticker(res_data_json, res_data.label)
+//     format_special_ticker(res_data_json, res_data.tag)
 // }
 //
-// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+// pub fn format_special_ticker(data: serde_json::Value, tag: String) -> SpecialDepth {
 //     let bp = Decimal::from_str(data["bestBid"].as_str().unwrap()).unwrap();
 //     let bq = Decimal::from_str(data["bestBidSize"].as_str().unwrap()).unwrap();
 //     let ap = Decimal::from_str(data["bestAsk"].as_str().unwrap()).unwrap();
@@ -127,7 +127,7 @@
 //     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
 //     let depth_info = vec![bp, bq, ap, aq];
 //     SpecialDepth {
-//         name: label,
+//         name: tag,
 //         depth: depth_info,
 //         ticker: ticker_info,
 //         t,

+ 4 - 4
standard/src/okx_handle.rs

@@ -83,7 +83,7 @@
 //     }
 //     trace!(?order_info);
 //     SpecialOrder {
-//         name: res_data.label,
+//         name: res_data.tag,
 //         order: order_info,
 //     }
 // }
@@ -140,10 +140,10 @@
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;
 //     let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
-//     format_special_ticker(res_data_json[0].clone(), res_data.label)
+//     format_special_ticker(res_data_json[0].clone(), res_data.tag)
 // }
 //
-// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+// pub fn format_special_ticker(data: serde_json::Value, tag: String) -> SpecialDepth {
 //     let bids = data["bids"][0].as_array().unwrap();
 //     let asks = data["asks"][0].as_array().unwrap();
 //     let bp = Decimal::from_str(bids[0].as_str().unwrap()).unwrap();
@@ -157,7 +157,7 @@
 //     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
 //     let depth_info = vec![bp, bq, ap, aq];
 //     SpecialDepth {
-//         name: label,
+//         name: tag,
 //         depth: depth_info,
 //         ticker: ticker_info,
 //         t,

+ 581 - 581
standard/tests/exchange_test.rs

@@ -1,581 +1,581 @@
-use std::collections::{BTreeMap};
-use std::io::{Error};
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use futures::StreamExt;
-use rust_decimal_macros::dec;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::sync::Mutex;
-use tokio::try_join;
-use tracing::{error, trace};
-// use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
-// use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
-// use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
-// use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
-// use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
-// use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
-use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
-use exchanges::response_base::ResponseData;
-use standard::exchange::{Exchange, ExchangeEnum};
-// use standard::{binance_spot_handle, Order, Platform, utils};
-// use standard::{binance_handle, Order, Platform, utils};
-// use standard::{kucoin_handle, Order, Platform, utils};
-// use standard::{kucoin_spot_handle, Order, Platform, utils};
-// use standard::{gate_handle, Order, Platform, utils};
-// use standard::{bitget_spot_handle, Order, Platform, utils};
-use standard::{okx_handle, Order, Platform, utils};
-
-// 创建实体
-#[allow(dead_code)]
-pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
-    utils::proxy_handle();
-    let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
-    let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
-
-    let account_info = global::account_info::get_account_info("../test_account.toml");
-    match exchange {
-        ExchangeEnum::BinanceSwap => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.binance_access_key;
-            let secret_key = account_info.binance_secret_key;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::BinanceSpot => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.binance_access_key;
-            let secret_key = account_info.binance_secret_key;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::GateSwap => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.gate_access_key;
-            let secret_key = account_info.gate_secret_key;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::GateSpot => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.gate_access_key;
-            let secret_key = account_info.gate_secret_key;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::KucoinSwap => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.kucoin_access_key;
-            let secret_key = account_info.kucoin_secret_key;
-            let pass_key = account_info.kucoin_pass;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            params.insert("pass_key".to_string(), pass_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::KucoinSpot => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.kucoin_access_key;
-            let secret_key = account_info.kucoin_secret_key;
-            let pass_key = account_info.kucoin_pass;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            params.insert("pass_key".to_string(), pass_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::OkxSwap => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.okx_access_key;
-            let secret_key = account_info.okx_secret_key;
-            let pass_key = account_info.okx_pass;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            params.insert("pass_key".to_string(), pass_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-        ExchangeEnum::BitgetSpot => {
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = account_info.bitget_access_key;
-            let secret_key = account_info.bitget_secret_key;
-            let pass_key = account_info.bitget_pass;
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            params.insert("pass_key".to_string(), pass_key);
-            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
-        }
-    }
-}
-
-#[allow(dead_code)]
-pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
-    utils::proxy_handle();
-    let account_info = global::account_info::get_account_info("../test_account.toml");
-    match exchange {
-        ExchangeEnum::BinanceSpot => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
-            // trace!(symbol_format);
-            // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = BinanceSpotLogin {
-            //     api_key: account_info.binance_access_key,
-            //     api_secret: account_info.binance_secret_key,
-            // };
-            // let mut exchange_wss;
-            // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
-            // exchange_wss.set_symbols(vec![symbol_format]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // //读取
-            // tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
-            //     loop {
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     if data.data != "" {
-            //                         let result = binance_spot_handle::handle_special_depth(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "ticker" => {
-            //                     if data.data != "" {
-            //                         let result = binance_spot_handle::handle_special_ticker(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     };
-            // });
-            //
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::BinanceSwap => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
-            // trace!(symbol_format);
-            // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = BinanceSwapLogin {
-            //     api_key: account_info.binance_access_key,
-            //     api_secret: account_info.binance_secret_key,
-            // };
-            // let mut exchange_wss;
-            // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
-            // exchange_wss.set_symbols(vec![symbol_format]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // //读取
-            // tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
-            //     loop {
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     if data.data != "" {
-            //                         let result = binance_handle::handle_special_depth(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "ticker" => {
-            //                     if data.data != "" {
-            //                         let result = binance_handle::handle_special_ticker(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     };
-            // });
-            //
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::KucoinSwap => {
-            // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
-            // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            //
-            // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = KucoinSwapLogin {
-            //     access_key: account_info.kucoin_access_key,
-            //     secret_key: account_info.kucoin_secret_key,
-            //     pass_key: account_info.kucoin_pass,
-            // };
-            // let mut exchange_wss;
-            // if ["depth", "ticker"].contains(&mold) {
-            //     exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
-            // } else {
-            //     exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
-            // }
-            // exchange_wss.set_symbols(vec![symbol_format]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
-            //     loop {
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     let result = kucoin_handle::handle_special_depth(data);
-            //                     trace!(?result)
-            //                 }
-            //                 "ticker" => {
-            //                     let result = kucoin_handle::handle_special_ticker(data);
-            //                     trace!(?result)
-            //                 }
-            //                 "account" => {
-            //                     let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
-            //                     trace!(?result)
-            //                 }
-            //                 "position" => {
-            //                     let result = kucoin_handle::handle_position(data, dec!(1));
-            //                     trace!(?result)
-            //                 }
-            //                 "orders" => {
-            //                     let result = kucoin_handle::handle_order(data, dec!(0.001));
-            //                     trace!(?result)
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     }
-            // });
-            //
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::KucoinSpot => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
-            // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            // trace!(symbol_format);
-            // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = KucoinSpotLogin {
-            //     access_key: account_info.kucoin_access_key,
-            //     secret_key: account_info.kucoin_secret_key,
-            //     pass_key: account_info.kucoin_pass,
-            // };
-            // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-            //     KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
-            // } else {
-            //     KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
-            // };
-            // exchange_wss.set_symbols(vec![symbol_format]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
-            //     loop {
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     if data.data != "" {
-            //                         let result = kucoin_spot_handle::handle_special_depth(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "ticker" => {
-            //                     if data.data != "" {
-            //                         let result = kucoin_spot_handle::handle_special_ticker(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "account" => {
-            //                     if data.data != "" {
-            //                         let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "orders" => {
-            //                     if data.data != "" {
-            //                         let result = kucoin_spot_handle::handle_order(data, dec!(1));
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     }
-            // });
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::GateSwap => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
-            // trace!(symbol_format);
-            // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = GateSwapLogin {
-            //     api_key: account_info.gate_access_key,
-            //     secret: account_info.gate_secret_key,
-            // };
-            // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
-            // exchange_wss.set_symbols(vec![symbol_format.clone()]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
-            //     loop {
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     if data.data != "" {
-            //                         let result = gate_handle::handle_special_depth(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "ticker" => {
-            //                     if data.data != "" {
-            //                         let result = gate_handle::handle_special_ticker(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "account" => {
-            //                     if data.data != "" {
-            //                         let result = gate_handle::handle_account_info(data, symbol_format.clone());
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "orders" => {
-            //                     if data.data != "" {
-            //                         let result = gate_handle::handle_order(data, dec!(1));
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     }
-            // });
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::BitgetSpot => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
-            // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            // trace!(symbol_format);
-            // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
-            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            // let write_tx_am = Arc::new(Mutex::new(write_tx));
-            // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-            //
-            // let params = BitgetSpotLogin {
-            //     api_key: account_info.bitget_access_key,
-            //     secret_key: account_info.bitget_secret_key,
-            //     passphrase_key: account_info.bitget_pass,
-            // };
-            //
-            // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-            //     BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
-            // } else {
-            //     BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
-            // };
-            // exchange_wss.set_symbols(vec![symbol_format]);
-            // exchange_wss.set_subscribe(subscriber_type.into());
-            //
-            // let mold_arc = Arc::new(mold.to_string());
-            // //读取
-            // tokio::spawn(async move {
-            //     loop {
-            //         let mold_clone = Arc::clone(&mold_arc);
-            //         if let Some(data) = read_rx.next().await {
-            //             trace!("原始数据 data:{:?}",data);
-            //             match mold_clone.as_str() {
-            //                 "depth" => {
-            //                     if data.data != "" {
-            //                         let result = bitget_spot_handle::handle_special_depth(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "ticker" => {
-            //                     if data.data != "" {
-            //                         let result = bitget_spot_handle::handle_special_ticker(data);
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "account" => {
-            //                     if data.data != "" {
-            //                         let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 "orders" => {
-            //                     if data.data != "" {
-            //                         let result = bitget_spot_handle::handle_order(data, dec!(1));
-            //                         trace!(?result)
-            //                     }
-            //                 }
-            //                 _ => {
-            //                     error!("没有该命令!mode={}", mold_clone);
-            //                     panic!("没有该命令!mode={}", mold_clone)
-            //                 }
-            //             }
-            //         }
-            //     }
-            // });
-            // let t1 = tokio::spawn(async move {
-            //     //链接
-            //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            // });
-            // try_join!(t1).unwrap();
-        }
-        ExchangeEnum::OkxSwap => {
-            let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
-            trace!(symbol_format);
-            let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
-            let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-            let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-            let write_tx_am = Arc::new(Mutex::new(write_tx));
-            let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-
-            let params = OkxSwapLogin {
-                api_key: account_info.okx_access_key,
-                secret_key: account_info.okx_secret_key,
-                passphrase: account_info.okx_pass,
-            };
-
-            let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-                OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
-            } else if ["account", "orders", "position"].contains(&mold) {
-                OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
-            } else {
-                OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
-            };
-
-            exchange_wss.set_symbols(vec![symbol_format.clone()]);
-            exchange_wss.set_subscribe(subscriber_type.into());
-
-            let mold_arc = Arc::new(mold.to_string());
-            tokio::spawn(async move {
-                let mold_clone = Arc::clone(&mold_arc);
-                loop {
-                    if let Some(data) = read_rx.next().await {
-                        trace!("原始数据 data:{:?}",data);
-                        match mold_clone.as_str() {
-                            "depth" => {
-                                if data.data != "" {
-                                    let result = okx_handle::handle_special_depth(data);
-                                    trace!(?result)
-                                }
-                            }
-                            "ticker" => {
-                                if data.data != "" {
-                                    let result = okx_handle::handle_special_ticker(data);
-                                    trace!(?result)
-                                }
-                            }
-                            "account" => {
-                                if data.data != "" {
-                                    let result = okx_handle::handle_account_info(data, symbol_format.clone());
-                                    trace!(?result)
-                                }
-                            }
-                            "position" => {
-                                if data.data != "" {
-                                    let result = okx_handle::handle_position(data, dec!(10));
-                                    trace!(?result)
-                                }
-                            }
-                            "orders" => {
-                                if data.data != "" {
-                                    let result = okx_handle::handle_order(data, dec!(10));
-                                    trace!(?result)
-                                }
-                            }
-                            _ => {
-                                error!("没有该命令!mode={}", mold_clone);
-                                panic!("没有该命令!mode={}", mold_clone)
-                            }
-                        }
-                    }
-                }
-            });
-
-            let t1 = tokio::spawn(async move {
-                //链接
-                let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-                exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-            });
-            try_join!(t1).unwrap();
-        }
-        _ => {
-            error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
-            panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
-        }
-    }
-}
+// use std::collections::{BTreeMap};
+// use std::io::{Error};
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use futures::StreamExt;
+// use rust_decimal_macros::dec;
+// use tokio::sync::mpsc::{channel, Receiver, Sender};
+// use tokio::sync::Mutex;
+// use tokio::try_join;
+// use tracing::{error, trace};
+// // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+// // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+// // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
+// // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
+// // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
+// // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
+// use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use standard::exchange::{Exchange, ExchangeEnum};
+// // use standard::{binance_spot_handle, Order, Platform, utils};
+// // use standard::{binance_handle, Order, Platform, utils};
+// // use standard::{kucoin_handle, Order, Platform, utils};
+// // use standard::{kucoin_spot_handle, Order, Platform, utils};
+// // use standard::{gate_handle, Order, Platform, utils};
+// // use standard::{bitget_spot_handle, Order, Platform, utils};
+// use standard::{okx_handle, Order, Platform, utils};
+//
+// // 创建实体
+// #[allow(dead_code)]
+// pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
+//     utils::proxy_handle();
+//     let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
+//     let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
+//
+//     let account_info = global::account_info::get_account_info("../test_account.toml");
+//     match exchange {
+//         ExchangeEnum::BinanceSwap => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.binance_access_key;
+//             let secret_key = account_info.binance_secret_key;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::BinanceSpot => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.binance_access_key;
+//             let secret_key = account_info.binance_secret_key;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::GateSwap => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.gate_access_key;
+//             let secret_key = account_info.gate_secret_key;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::GateSpot => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.gate_access_key;
+//             let secret_key = account_info.gate_secret_key;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::KucoinSwap => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.kucoin_access_key;
+//             let secret_key = account_info.kucoin_secret_key;
+//             let pass_key = account_info.kucoin_pass;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             params.insert("pass_key".to_string(), pass_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::KucoinSpot => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.kucoin_access_key;
+//             let secret_key = account_info.kucoin_secret_key;
+//             let pass_key = account_info.kucoin_pass;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             params.insert("pass_key".to_string(), pass_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::OkxSwap => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.okx_access_key;
+//             let secret_key = account_info.okx_secret_key;
+//             let pass_key = account_info.okx_pass;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             params.insert("pass_key".to_string(), pass_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//         ExchangeEnum::BitgetSpot => {
+//             let mut params: BTreeMap<String, String> = BTreeMap::new();
+//             let access_key = account_info.bitget_access_key;
+//             let secret_key = account_info.bitget_secret_key;
+//             let pass_key = account_info.bitget_pass;
+//             params.insert("access_key".to_string(), access_key);
+//             params.insert("secret_key".to_string(), secret_key);
+//             params.insert("pass_key".to_string(), pass_key);
+//             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+//         }
+//     }
+// }
+//
+// #[allow(dead_code)]
+// pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
+//     utils::proxy_handle();
+//     let account_info = global::account_info::get_account_info("../test_account.toml");
+//     match exchange {
+//         ExchangeEnum::BinanceSpot => {
+//             // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
+//             // trace!(symbol_format);
+//             // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = BinanceSpotLogin {
+//             //     api_key: account_info.binance_access_key,
+//             //     api_secret: account_info.binance_secret_key,
+//             // };
+//             // let mut exchange_wss;
+//             // exchange_wss = BinanceSpotWs::new_with_tag(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
+//             // exchange_wss.set_symbols(vec![symbol_format]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // //读取
+//             // tokio::spawn(async move {
+//             //     let mold_clone = Arc::clone(&mold_arc);
+//             //     loop {
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     if data.data != "" {
+//             //                         let result = binance_spot_handle::handle_special_depth(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "ticker" => {
+//             //                     if data.data != "" {
+//             //                         let result = binance_spot_handle::handle_special_ticker(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     };
+//             // });
+//             //
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::BinanceSwap => {
+//             // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
+//             // trace!(symbol_format);
+//             // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = BinanceSwapLogin {
+//             //     api_key: account_info.binance_access_key,
+//             //     api_secret: account_info.binance_secret_key,
+//             // };
+//             // let mut exchange_wss;
+//             // exchange_wss = BinanceSwapWs::new_with_tag(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
+//             // exchange_wss.set_symbols(vec![symbol_format]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // //读取
+//             // tokio::spawn(async move {
+//             //     let mold_clone = Arc::clone(&mold_arc);
+//             //     loop {
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     if data.data != "" {
+//             //                         let result = binance_handle::handle_special_depth(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "ticker" => {
+//             //                     if data.data != "" {
+//             //                         let result = binance_handle::handle_special_ticker(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     };
+//             // });
+//             //
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::KucoinSwap => {
+//             // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
+//             // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
+//             //
+//             // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = KucoinSwapLogin {
+//             //     access_key: account_info.kucoin_access_key,
+//             //     secret_key: account_info.kucoin_secret_key,
+//             //     pass_key: account_info.kucoin_pass,
+//             // };
+//             // let mut exchange_wss;
+//             // if ["depth", "ticker"].contains(&mold) {
+//             //     exchange_wss = KucoinSwapWs::new_with_tag(name, false, Option::from(params), KucoinSwapWsType::Public).await;
+//             // } else {
+//             //     exchange_wss = KucoinSwapWs::new_with_tag(name, false, Option::from(params), KucoinSwapWsType::Private).await;
+//             // }
+//             // exchange_wss.set_symbols(vec![symbol_format]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // tokio::spawn(async move {
+//             //     let mold_clone = Arc::clone(&mold_arc);
+//             //     loop {
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     let result = kucoin_handle::handle_special_depth(data);
+//             //                     trace!(?result)
+//             //                 }
+//             //                 "ticker" => {
+//             //                     let result = kucoin_handle::handle_special_ticker(data);
+//             //                     trace!(?result)
+//             //                 }
+//             //                 "account" => {
+//             //                     let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
+//             //                     trace!(?result)
+//             //                 }
+//             //                 "position" => {
+//             //                     let result = kucoin_handle::handle_position(data, dec!(1));
+//             //                     trace!(?result)
+//             //                 }
+//             //                 "orders" => {
+//             //                     let result = kucoin_handle::handle_order(data, dec!(0.001));
+//             //                     trace!(?result)
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     }
+//             // });
+//             //
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::KucoinSpot => {
+//             // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+//             // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
+//             // trace!(symbol_format);
+//             // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = KucoinSpotLogin {
+//             //     access_key: account_info.kucoin_access_key,
+//             //     secret_key: account_info.kucoin_secret_key,
+//             //     pass_key: account_info.kucoin_pass,
+//             // };
+//             // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
+//             //     KucoinSpotWs::new_with_tag(name, false, Option::from(params), KucoinSpotWsType::Public).await
+//             // } else {
+//             //     KucoinSpotWs::new_with_tag(name, false, Option::from(params), KucoinSpotWsType::Private).await
+//             // };
+//             // exchange_wss.set_symbols(vec![symbol_format]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // tokio::spawn(async move {
+//             //     let mold_clone = Arc::clone(&mold_arc);
+//             //     loop {
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     if data.data != "" {
+//             //                         let result = kucoin_spot_handle::handle_special_depth(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "ticker" => {
+//             //                     if data.data != "" {
+//             //                         let result = kucoin_spot_handle::handle_special_ticker(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "account" => {
+//             //                     if data.data != "" {
+//             //                         let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "orders" => {
+//             //                     if data.data != "" {
+//             //                         let result = kucoin_spot_handle::handle_order(data, dec!(1));
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     }
+//             // });
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::GateSwap => {
+//             // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
+//             // trace!(symbol_format);
+//             // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = GateSwapLogin {
+//             //     api_key: account_info.gate_access_key,
+//             //     secret: account_info.gate_secret_key,
+//             // };
+//             // let mut exchange_wss = GateSwapWs::new_with_tag(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+//             // exchange_wss.set_symbols(vec![symbol_format.clone()]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // tokio::spawn(async move {
+//             //     let mold_clone = Arc::clone(&mold_arc);
+//             //     loop {
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     if data.data != "" {
+//             //                         let result = gate_handle::handle_special_depth(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "ticker" => {
+//             //                     if data.data != "" {
+//             //                         let result = gate_handle::handle_special_ticker(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "account" => {
+//             //                     if data.data != "" {
+//             //                         let result = gate_handle::handle_account_info(data, symbol_format.clone());
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "orders" => {
+//             //                     if data.data != "" {
+//             //                         let result = gate_handle::handle_order(data, dec!(1));
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     }
+//             // });
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::BitgetSpot => {
+//             // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+//             // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
+//             // trace!(symbol_format);
+//             // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
+//             // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             // let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//             //
+//             // let params = BitgetSpotLogin {
+//             //     api_key: account_info.bitget_access_key,
+//             //     secret_key: account_info.bitget_secret_key,
+//             //     passphrase_key: account_info.bitget_pass,
+//             // };
+//             //
+//             // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
+//             //     BitgetSpotWs::new_with_tag(name, false, Option::from(params), BitgetSpotWsType::Public)
+//             // } else {
+//             //     BitgetSpotWs::new_with_tag(name, false, Option::from(params), BitgetSpotWsType::Private)
+//             // };
+//             // exchange_wss.set_symbols(vec![symbol_format]);
+//             // exchange_wss.set_subscribe(subscriber_type.into());
+//             //
+//             // let mold_arc = Arc::new(mold.to_string());
+//             // //读取
+//             // tokio::spawn(async move {
+//             //     loop {
+//             //         let mold_clone = Arc::clone(&mold_arc);
+//             //         if let Some(data) = read_rx.next().await {
+//             //             trace!("原始数据 data:{:?}",data);
+//             //             match mold_clone.as_str() {
+//             //                 "depth" => {
+//             //                     if data.data != "" {
+//             //                         let result = bitget_spot_handle::handle_special_depth(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "ticker" => {
+//             //                     if data.data != "" {
+//             //                         let result = bitget_spot_handle::handle_special_ticker(data);
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "account" => {
+//             //                     if data.data != "" {
+//             //                         let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 "orders" => {
+//             //                     if data.data != "" {
+//             //                         let result = bitget_spot_handle::handle_order(data, dec!(1));
+//             //                         trace!(?result)
+//             //                     }
+//             //                 }
+//             //                 _ => {
+//             //                     error!("没有该命令!mode={}", mold_clone);
+//             //                     panic!("没有该命令!mode={}", mold_clone)
+//             //                 }
+//             //             }
+//             //         }
+//             //     }
+//             // });
+//             // let t1 = tokio::spawn(async move {
+//             //     //链接
+//             //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//             //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             // });
+//             // try_join!(t1).unwrap();
+//         }
+//         ExchangeEnum::OkxSwap => {
+//             let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+//             trace!(symbol_format);
+//             let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
+//             let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//             let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+//             let write_tx_am = Arc::new(Mutex::new(write_tx));
+//             let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+//
+//             let params = OkxSwapLogin {
+//                 api_key: account_info.okx_access_key,
+//                 secret_key: account_info.okx_secret_key,
+//                 passphrase: account_info.okx_pass,
+//             };
+//
+//             let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
+//                 OkxSwapWs::new_with_tag(name, false, Option::from(params), OkxSwapWsType::Public)
+//             } else if ["account", "orders", "position"].contains(&mold) {
+//                 OkxSwapWs::new_with_tag(name, false, Option::from(params), OkxSwapWsType::Private)
+//             } else {
+//                 OkxSwapWs::new_with_tag(name, false, Option::from(params), OkxSwapWsType::Business)
+//             };
+//
+//             exchange_wss.set_symbols(vec![symbol_format.clone()]);
+//             exchange_wss.set_subscribe(subscriber_type.into());
+//
+//             let mold_arc = Arc::new(mold.to_string());
+//             tokio::spawn(async move {
+//                 let mold_clone = Arc::clone(&mold_arc);
+//                 loop {
+//                     if let Some(data) = read_rx.next().await {
+//                         trace!("原始数据 data:{:?}",data);
+//                         match mold_clone.as_str() {
+//                             "depth" => {
+//                                 if data.data != "" {
+//                                     let result = okx_handle::handle_special_depth(data);
+//                                     trace!(?result)
+//                                 }
+//                             }
+//                             "ticker" => {
+//                                 if data.data != "" {
+//                                     let result = okx_handle::handle_special_ticker(data);
+//                                     trace!(?result)
+//                                 }
+//                             }
+//                             "account" => {
+//                                 if data.data != "" {
+//                                     let result = okx_handle::handle_account_info(data, symbol_format.clone());
+//                                     trace!(?result)
+//                                 }
+//                             }
+//                             "position" => {
+//                                 if data.data != "" {
+//                                     let result = okx_handle::handle_position(data, dec!(10));
+//                                     trace!(?result)
+//                                 }
+//                             }
+//                             "orders" => {
+//                                 if data.data != "" {
+//                                     let result = okx_handle::handle_order(data, dec!(10));
+//                                     trace!(?result)
+//                                 }
+//                             }
+//                             _ => {
+//                                 error!("没有该命令!mode={}", mold_clone);
+//                                 panic!("没有该命令!mode={}", mold_clone)
+//                             }
+//                         }
+//                     }
+//                 }
+//             });
+//
+//             let t1 = tokio::spawn(async move {
+//                 //链接
+//                 let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+//                 exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//             });
+//             try_join!(t1).unwrap();
+//         }
+//         _ => {
+//             error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
+//             panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
+//         }
+//     }
+// }