浏览代码

优化代码

875428575@qq.com 2 年之前
父节点
当前提交
a2303d04f5

+ 6 - 5
exchanges/src/binance_usdt_swap_ws.rs

@@ -172,6 +172,7 @@ impl BinanceUsdtSwapWs {
                           subscription: &serde_json::Value,
     )
     {
+        let label = self.label.clone();
         /*****消息溜***/
         let mut stdout = io::stdout();
         // let mut stderr = io::stderr();
@@ -191,7 +192,7 @@ impl BinanceUsdtSwapWs {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(label.to_string(), text);
                     self.sender.send(res_data).unwrap();
                     // writeln!(stdout, "Text-响应--{:?}", res_data).unwrap();
                     //
@@ -229,8 +230,8 @@ impl BinanceUsdtSwapWs {
     fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
                     subscription: &serde_json::Value,
     )
-
     {
+        let label = self.label.clone();
         /*****消息溜***/
         // let  stdout = io::stdout();
         // let mut stderr = io::stderr();
@@ -253,7 +254,7 @@ impl BinanceUsdtSwapWs {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(label.to_string(), text);
                     self.sender.send(res_data).unwrap();
                     // writeln!(stdout, "Pong-响应--{:?}", res_data).unwrap();
                     // let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
@@ -282,9 +283,9 @@ impl BinanceUsdtSwapWs {
     }
 
     //数据解析
-    pub fn ok_text(text: String) -> ResponseData
+    pub fn ok_text(label: String, text: String) -> ResponseData
     {
-        let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new(label.to_string(), "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
         if json_value.get("result").is_some() {
             //订阅反馈

+ 7 - 7
exchanges/src/gate_swap_rest.rs

@@ -156,10 +156,10 @@ impl GateSwapRest {
                        text: String,
     ) -> ResponseData {
         if side != "buy" && side != "sell" {
-            ResponseData::error(format!("未知下单方向!{}", side));
+            ResponseData::error(self.label.clone(),format!("未知下单方向!{}", side));
         }
         if pos_side != "long" && pos_side != "short" {
-            ResponseData::error(format!("未知持仓方向!{}", side));
+            ResponseData::error(self.label.clone(),format!("未知持仓方向!{}", side));
         }
         let mut param = serde_json::json!({
             "contract":contract, //合约标识
@@ -357,7 +357,7 @@ impl GateSwapRest {
         //是否需要登陆-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error("登陆参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(),"登陆参数错误!".to_string());
                 return e;
             } else {//需要登陆-且登陆参数齐全
                 //组装sing
@@ -444,7 +444,7 @@ impl GateSwapRest {
             "POST" => self.client.post(addrs_url.clone()).body(params).headers(headers),
             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
             // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error(format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
         };
 
         let response = req.send().await?;
@@ -452,11 +452,11 @@ impl GateSwapRest {
             // 读取响应的内容
             let body = response.text().await?;
             // println!("ok-----{}", body);
-            res_data = ResponseData::new("200".to_string(), "success".to_string(), body);
+            res_data = ResponseData::new(self.label.clone(),"200".to_string(), "success".to_string(), body);
         } else {
             let body = response.text().await?;
             // println!("error-----{}", body);
-            res_data = ResponseData::error(body.to_string())
+            res_data = ResponseData::error(self.label.clone(),body.to_string())
         }
         Ok(res_data)
     }
@@ -472,7 +472,7 @@ impl GateSwapRest {
                 }
             }
             Err(err) => {
-                let error = ResponseData::error(format!("json 解析失败:{}", err));
+                let error = ResponseData::error("".to_string(),format!("json 解析失败:{}", err));
                 error
             }
         }

+ 356 - 15
exchanges/src/gate_swap_ws.rs

@@ -1,15 +1,356 @@
-use std::collections::BTreeMap;
-
-#[derive(Clone)]
-pub struct GateSwapWs {}
-
-impl GateSwapWs {
-    pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSwapWs
-    {
-        return GateSwapWs::new_label("default-GateSwapWs".to_string(), _is_colo, _login_param);
-    }
-    pub fn new_label(label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSwapWs
-    {
-        GateSwapWs {}
-    }
-}
+// use std::collections::{BTreeMap};
+// use std::{io, thread};
+// use std::io::{Write};
+// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+// use std::sync::{mpsc};
+// use std::time::Duration;
+// use chrono::Utc;
+// use serde_json::{json, Value};
+// use ring::hmac;
+// use crate::{proxy};
+// use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+// use tungstenite::{connect, Message, WebSocket};
+// use tungstenite::protocol::WebSocketConfig;
+// use url::Url;
+// use crate::proxy::ParsingDetail;
+// use crate::response_base::ResponseData;
+//
+// pub enum GateWsType {
+//     //频道类型
+//     PublicAndPrivate
+// }
+//
+//
+// #[derive(Clone)]                        //订阅枚举
+// pub enum GateSubscribeType {
+//     PuFuturesOrderBook,
+//     PuFuturesCandlesticks,
+// }
+//
+// #[derive(Clone)]
+// pub struct GateSwapWs {
+//     pub label: String,
+//     request_url: String,
+//     //实际ws 链接地址
+//     proxy: ParsingDetail,
+//     //账号信息
+//     login_param: BTreeMap<String, String>,
+//     //kuconis特殊参数
+//     symbol_s: Vec<String>,
+//     //订阅币对
+//     subscribe_types: Vec<GateSubscribeType>,
+//     //订阅信息
+//     sender: mpsc::Sender<ResponseData>,     //数据通道
+// }
+//
+// impl GateSwapWs {
+//     /*******************************************************************************************************/
+//     /*****************************************获取一个对象****************************************************/
+//     /*******************************************************************************************************/
+//     pub async fn new(is_colo: bool,
+//                      login_param: BTreeMap<String, String>,
+//                      ws_type: GateWsType,
+//                      sender: mpsc::Sender<ResponseData>,
+//     ) -> GateSwapWs
+//     {
+//         return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
+//     }
+//     pub async fn new_label(label: String, is_colo: bool,
+//                            login_param: BTreeMap<String, String>,
+//                            ws_type: GateWsType,
+//                            sender: mpsc::Sender<ResponseData>,
+//     ) -> GateSwapWs
+//     {
+//         if is_colo {
+//             println!("支持高速通道-未配置")
+//         } else {}
+//
+//         /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+//         let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+//
+//         let request_url = "wss://fx-ws.gateio.ws/v4/ws/".to_string();
+//
+//         /*****返回结构体*******/
+//         GateSwapWs {
+//             label,
+//             request_url,
+//             proxy: parsing_detail,
+//             login_param,
+//             symbol_s: vec![],
+//             subscribe_types: vec![],
+//             sender,
+//         }
+//     }
+//
+//     /*******************************************************************************************************/
+//     /*****************************************订阅函数********************************************************/
+//     /*******************************************************************************************************/
+//     //手动添加订阅信息
+//     pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSubscribeType>) {
+//         self.subscribe_types.extend(subscribe_types);
+//     }
+//     //自定义
+//     pub fn custom_subscribe(&mut self, b_array: Vec<String>)
+//     {
+//         let mut symbol_s = b_array.clone();
+//         for symbol in symbol_s.iter_mut() {
+//             // 大写
+//             *symbol = symbol.to_uppercase();
+//             // 字符串替换
+//             *symbol = symbol.replace("-", "_");
+//         }
+//         self.symbol_s = symbol_s;
+//         self.run();
+//     }
+//
+//     /*******************************************************************************************************/
+//     /*****************************************工具函数********************************************************/
+//     /*******************************************************************************************************/
+//     //订阅枚举解析
+//     pub fn enum_to_string(symbol: String, subscribe_type: GateSubscribeType) -> Value {
+//         match subscribe_type {
+//             GateSubscribeType::BuIndexCandle30m => {
+//                 json!({
+//                     "channel":"index-candle30m",
+//                     "instId":symbol
+//                 })
+//             }
+//             GateSubscribeType::PuIndexTickers => {
+//                 json!({
+//                     "channel":"index-tickers",
+//                     "instId":symbol
+//                 })
+//             }
+//             GateSubscribeType::PrAccount(ccy) => {
+//                 json!({
+//                     "channel":"account",
+//                     "ccy":ccy
+//                 })
+//             }
+//         }
+//     }
+//     //组装订阅数据
+//     pub fn get_subscription(&self) -> String {
+//         let mut args = vec![];
+//         for symbol in &self.symbol_s {
+//             for subscribe_type in &self.subscribe_types {
+//                 let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+//                 args.push(ty_str);
+//             }
+//         }
+//         let str = json!({
+//               "time": 1545445847,
+//               "time_ms": 1545445847123,
+//               "channel": "futures.candlesticks",
+//               "event": "subscribe",
+//               "result": {
+//                 "status": "success"
+//               }
+//         });
+//
+//         println!("订阅信息:{}", str.to_string());
+//
+//         str.to_string()
+//     }
+//
+//     //组装登陆数据
+//     fn log_in_to_str(&self) -> String {
+//         let mut login_json_str = "".to_string();
+//
+//         let mut access_key: String = "".to_string();
+//         let mut secret_key: String = "".to_string();
+//         let mut passphrase: String = "".to_string();
+//         for (key, value) in &self.login_param {
+//             if key == "access_key" {
+//                 access_key = value.parse().unwrap();
+//             } else if key == "secret_key" {
+//                 secret_key = value.parse().unwrap();
+//             } else if key == "passphrase" {
+//                 passphrase = value.parse().unwrap();
+//             }
+//         }
+//         if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
+//             let timestamp = Utc::now().timestamp().to_string();
+//             // 时间戳 + 请求类型+ 请求参数字符串
+//             let message = format!("{}GET{}", timestamp, "/users/self/verify");
+//             let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+//             let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+//             let sign = base64::encode(result);
+//
+//             let login_json = json!({
+//                               "op": "login",
+//                                 "args": [{
+//                                 "apiKey": access_key,
+//                                 "passphrase": passphrase,
+//                                 "timestamp": timestamp,
+//                                 "sign": sign  }]
+//                         });
+//
+//             println!("---login_json:{0}", login_json.to_string());
+//             println!("--登陆:{:?}", login_json);
+//             login_json_str = login_json.to_string();
+//         }
+//         login_json_str
+//     }
+//     /*******************************************************************************************************/
+//     /*****************************************socket基本*****************************************************/
+//     /*******************************************************************************************************/
+//     fn run(&mut self)
+//     {
+//         //订阅信息组装
+//         let subscription = self.get_subscription();
+//         loop {
+//             println!("要连接咯~~!!{}", self.request_url);
+//
+//             let request_url = Url::parse(self.request_url.as_str()).unwrap();
+//             //1. 判断是否需要代理,根据代理地址是否存来选择
+//             if self.proxy.ip_address.len() > 0 {
+//                 let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
+//                 let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+//                     ip_array[0].parse().unwrap(),
+//                     ip_array[1].parse().unwrap(),
+//                     ip_array[2].parse().unwrap(),
+//                     ip_array[3].parse().unwrap())
+//                 ), self.proxy.port.parse().unwrap());
+//                 let websocket_config = Some(WebSocketConfig {
+//                     max_send_queue: Some(16),
+//                     max_message_size: Some(16 * 1024 * 1024),
+//                     max_frame_size: Some(16 * 1024 * 1024),
+//                     accept_unmasked_frames: false,
+//                 });
+//                 let max_redirects = 5;
+//                 match connect_with_proxy(request_url.clone(),
+//                                          proxy_address, websocket_config, max_redirects) {
+//                     Ok(ws) => {
+//                         self.proxy_subscription(ws.0, subscription.clone());
+//                     }
+//                     Err(err) => {
+//                         println!("Can't connect(无法连接): {}", err);
+//                     }
+//                 };
+//             } else {
+//                 match connect(request_url.clone()) {
+//                     Ok(ws) => {
+//                         self.subscription(ws.0, subscription.clone());
+//                     }
+//                     Err(err) => {
+//                         // 连接失败时执行的操作
+//                         println!("Can't connect(无法连接): {}", err);
+//                         // 返回一个默认的 WebSocket 对象或其他适当的值
+//                         // 或者根据需要触发 panic 或返回错误信息
+//                     }
+//                 };
+//             }
+//             println!("退出来咯")
+//         }
+//     }
+//
+//     //代理
+//     fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
+//     {
+//         /*****登陆***/
+//         let login_str = self.log_in_to_str();
+//         if login_str != "" {
+//             let _ = web_socket.write_message(Message::Text(login_str));
+//             thread::sleep(Duration::from_secs(3));
+//         }
+//         /*****订阅***/
+//         web_socket.write_message(Message::Text(subscription))
+//             .unwrap();
+//         /*****消息溜***/
+//         let mut stdout = io::stdout();
+//         loop {
+//             let msg = web_socket.read_message();
+//             match msg {
+//                 Ok(Message::Text(text)) => {
+//                     let res_data = Self::ok_text(text);
+//                     if res_data.code == "-200" {
+//                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+//                     } else {
+//                         self.sender.send(res_data).unwrap();
+//                     }
+//                 }
+//                 Ok(Message::Ping(s)) => {
+//                     writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+//                     let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+//                     writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
+//                 }
+//                 Ok(Message::Pong(s)) => {
+//                     // println!("Pong-响应--{:?}", String::from_utf8(s));
+//                     writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+//                 }
+//                 Ok(Message::Close(_)) => {
+//                     // println!("socket 关闭: ");
+//                     writeln!(stdout, "Close-响应").unwrap();
+//                 }
+//                 Err(error) => {
+//                     // println!("Error receiving message: {}", error);
+//                     writeln!(stdout, "Err-响应{}", error).unwrap();
+//                     break;
+//                 }
+//                 _ => {}
+//             }
+//         }
+//         web_socket.close(None).unwrap();
+//     }
+//
+//     //非代理
+//     fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
+//                     subscription: String)
+//     {
+//         /*****订阅***/
+//         web_socket.write_message(Message::Text(subscription))
+//             .unwrap();
+//         /*****消息溜***/
+//         let mut stdout = io::stdout();
+//         loop {
+//             let msg = web_socket.read_message();
+//             match msg {
+//                 Ok(Message::Text(text)) => {
+//                     let res_data = Self::ok_text(text);
+//                     if res_data.code == "-200" {
+//                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+//                     } else {
+//                         self.sender.send(res_data).unwrap();
+//                     }
+//                 }
+//                 Ok(Message::Ping(s)) => {
+//                     println!("Ping-响应--{:?}", String::from_utf8(s));
+//                 }
+//                 Ok(Message::Pong(s)) => {
+//                     println!("Pong-响应--{:?}", String::from_utf8(s));
+//                 }
+//                 Ok(Message::Close(_)) => {
+//                     println!("socket 关闭: ");
+//                 }
+//                 Err(error) => {
+//                     println!("Error receiving message: {}", error);
+//                     break;
+//                 }
+//                 _ => {}
+//             }
+//         }
+//         web_socket.close(None).unwrap();
+//     }
+//
+//     //数据解析
+//     pub fn ok_text(text: String) -> ResponseData
+//     {
+//         let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
+//         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+//         if json_value.get("event").is_some() {//订阅返回
+//             if json_value["event"].as_str() == Option::from("error") {
+//                 res_data.code = json_value["code"].to_string();
+//                 res_data.message = format!("错误:{}", json_value["msg"].to_string());
+//             } else if json_value["event"].as_str() == Option::from("subscribe") {
+//                 res_data.code = "-200".to_string();
+//                 res_data.data = text;
+//             } else {
+//                 res_data.data = text
+//             }
+//         } else {
+//             res_data.data = text
+//         }
+//         res_data
+//     }
+// }

+ 6 - 5
exchanges/src/http_tool.rs

@@ -46,7 +46,7 @@ impl RestTool {
             "POST" => self.client.post(addrs_url.clone()).body(params).headers(headers),
             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
             // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error(format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+            _ => return Ok(ResponseData::error("".to_string(),format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
         };
 
         let response = req.send().await?;
@@ -54,11 +54,11 @@ impl RestTool {
             // 读取响应的内容
             let body = response.text().await?;
             // println!("ok-----{}", body);
-            res_data = ResponseData::new("200".to_string(), "success".to_string(), body);
+            res_data = ResponseData::new("".to_string(),"200".to_string(), "success".to_string(), body);
         } else {
             let body = response.text().await?;
             // println!("error-----{}", body);
-            res_data = ResponseData::error(body.to_string())
+            res_data = ResponseData::error("".to_string(),body.to_string())
         }
         Ok(res_data)
     }
@@ -105,14 +105,15 @@ impl RestTool {
                     // //println!("--解析成功----code:{}",code);
                     // //println!("--解析成功----data:{}",data);
                     // //println!("--解析成功----msg:{}",msg);
-                    let success = ResponseData::new(code.parse().unwrap(),
+                    let success = ResponseData::new("".to_string(),
+                                                    code.parse().unwrap(),
                                                     msg.parse().unwrap(),
                                                     data.parse().unwrap());
                     success
                 }
             }
             Err(err) => {
-                let error = ResponseData::error(format!("json 解析失败:{}", err));
+                let error = ResponseData::error("".to_string(),format!("json 解析失败:{}", err));
                 error
             }
         }

+ 7 - 7
exchanges/src/kucoin_swap_rest.rs

@@ -298,7 +298,7 @@ impl KucoinSwapRest {
         //是否需要登陆-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error("登陆参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(), "登陆参数错误!".to_string());
                 return e;
             } else {//需要登陆-且登陆参数齐全
                 println!("param:{}", params);
@@ -413,7 +413,7 @@ impl KucoinSwapRest {
             "POST" => self.client.post(url.clone()).body(params).headers(headers),
             "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
             // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error(format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
         };
 
         let response = req.send().await?;
@@ -421,11 +421,11 @@ impl KucoinSwapRest {
             // 读取响应的内容
             let body = response.text().await?;
             // println!("ok-----{}", body);
-            res_data = ResponseData::new("200".to_string(), "success".to_string(), body);
+            res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
         } else {
             let body = response.text().await?;
             // println!("error-----{}", body);
-            res_data = ResponseData::error(body.to_string())
+            res_data = ResponseData::error(self.label.clone(), body.to_string())
         }
         Ok(res_data)
     }
@@ -445,17 +445,17 @@ impl KucoinSwapRest {
 
                     if code != "200000" {
                         let msg = json_value["msg"].as_str().unwrap();
-                        let error = ResponseData::new(code.to_string(), msg.to_string(), "".parse().unwrap());
+                        let error = ResponseData::new("".to_string(), code.to_string(), msg.to_string(), "".parse().unwrap());
                         error
                     } else {
                         let data = serde_json::to_string(&json_value["data"]).unwrap();
-                        let success = ResponseData::new("200".to_string(), "success".to_string(), data.parse().unwrap());
+                        let success = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), data.parse().unwrap());
                         success
                     }
                 }
             }
             Err(err) => {
-                let error = ResponseData::error(format!("json 解析失败:{}", err));
+                let error = ResponseData::error("".to_string(), format!("json 解析失败:{}", err));
                 error
             }
         }

+ 7 - 5
exchanges/src/kuconin_swap_ws.rs

@@ -1,5 +1,5 @@
 use std::collections::{BTreeMap, HashSet};
-use std::{io, thread};
+use std::{io};
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::{mpsc};
@@ -269,6 +269,7 @@ impl KuconinSwapWs {
     fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>,
                           subscription: Vec<String>)
     {
+        let label = self.label.clone();
         /*****消息溜***/
         let mut stdout = io::stdout();
         let mut ping_interval = chrono::Utc::now().timestamp_millis();
@@ -278,7 +279,7 @@ impl KuconinSwapWs {
             match msg {
                 Ok(Message::Text(text)) => {
                     // writeln!(stdout, "Text-响应--{:?}", text.clone()).unwrap();
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(label.to_string(), text);
 
                     if res_data.code == "-200" {//表示链接成功
                         for sub in &subscription {
@@ -335,6 +336,7 @@ impl KuconinSwapWs {
     fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
                     subscription: Vec<String>)
     {
+        let label = self.label.clone();
         /*****消息溜***/
         let mut stdout = io::stdout();
         let mut ping_interval = chrono::Utc::now().timestamp_millis();
@@ -344,7 +346,7 @@ impl KuconinSwapWs {
             match msg {
                 Ok(Message::Text(text)) => {
                     // writeln!(stdout, "Text-响应--{:?}", text.clone()).unwrap();
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(label.to_string(), text);
 
                     if res_data.code == "-200" {//表示链接成功
                         for sub in &subscription {
@@ -398,9 +400,9 @@ impl KuconinSwapWs {
     }
 
     //数据解析
-    pub fn ok_text(text: String) -> ResponseData
+    pub fn ok_text(lable: String, text: String) -> ResponseData
     {
-        let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
         if json_value.get("id").is_some() && json_value.get("type").is_some() {
             //订阅 相应

+ 6 - 4
exchanges/src/okx_swap_ws.rs

@@ -259,6 +259,7 @@ impl OkxSwapWs {
     //代理
     fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
     {
+        let lable = self.label.clone();
         /*****登陆***/
         let login_str = self.log_in_to_str();
         if login_str != "" {
@@ -274,7 +275,7 @@ impl OkxSwapWs {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(lable.to_string(), text);
                     if res_data.code == "-200" {
                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
                     } else {
@@ -309,6 +310,7 @@ impl OkxSwapWs {
     fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
                     subscription: String)
     {
+        let lable = self.label.clone();
         /*****订阅***/
         web_socket.write_message(Message::Text(subscription))
             .unwrap();
@@ -318,7 +320,7 @@ impl OkxSwapWs {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    let res_data = Self::ok_text(text);
+                    let res_data = Self::ok_text(lable.to_string(),text);
                     if res_data.code == "-200" {
                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
                     } else {
@@ -345,9 +347,9 @@ impl OkxSwapWs {
     }
 
     //数据解析
-    pub fn ok_text(text: String) -> ResponseData
+    pub fn ok_text(lable: String, text: String) -> ResponseData
     {
-        let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
         if json_value.get("event").is_some() {//订阅返回
             if json_value["event"].as_str() == Option::from("error") {

+ 12 - 6
exchanges/src/response_base.rs

@@ -1,9 +1,9 @@
 /**交易所返回数据处理之后,同意保存格式,为了内部其他接口调用*/
-pub enum ChannelType{
-}
+pub enum ChannelType {}
 
 #[derive(Debug)]
 pub struct ResponseData {
+    pub lable: String,
     pub code: String,
     pub message: String,
     pub channel: String,
@@ -11,11 +11,17 @@ pub struct ResponseData {
 }
 
 impl ResponseData {
-    pub fn new(code: String, message: String, data: String) -> ResponseData {
-        ResponseData { code, message, data, channel: "".to_string() }
+    pub fn new(lable: String, code: String, message: String, data: String) -> ResponseData {
+        ResponseData { lable, code, message, data, channel: "".to_string() }
     }
-    pub fn error(message: String) -> ResponseData {
-        ResponseData { code: "-1".to_string(), message: "请求失败:".to_string() + &message, data: "".to_string(), channel: "".to_string() }
+    pub fn error(lable: String, message: String) -> ResponseData {
+        ResponseData {
+            lable,
+            code: "-1".to_string(),
+            message: "请求失败:".to_string() + &message,
+            data: "".to_string(),
+            channel: "".to_string(),
+        }
     }
 }