Browse Source

优化代码

875428575@qq.com 2 years ago
parent
commit
ba07f53655

+ 4 - 55
exchanges/src/binance_swap_ws.rs

@@ -1,11 +1,7 @@
 use std::collections::{BTreeMap};
-use std::{io, thread};
+use std::{io};
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::time::Duration;
-use chrono::Utc;
-use serde_json::{json, Value};
-use ring::hmac;
 use tokio::sync::mpsc::Sender;
 use tracing::trace;
 use crate::{proxy};
@@ -108,6 +104,8 @@ impl BinanceSwapWs {
             *symbol = symbol.replace("-", "");
         }
         self.symbol_s = symbol_s;
+        let log_in = self.login_param.clone();
+        trace!(?log_in);
         self.run().await;
     }
 
@@ -161,45 +159,6 @@ impl BinanceSwapWs {
         str.to_string()
     }
 
-    //组装登陆数据
-    fn log_in_to_str(&self) -> String {
-        let mut login_json_str = "".to_string();
-
-        let mut access_key: String = "".to_string();
-        let mut secret_key: String = "".to_string();
-        let mut passphrase: String = "".to_string();
-        for (key, value) in &self.login_param {
-            if key == "access_key" {
-                access_key = value.parse().unwrap();
-            } else if key == "secret_key" {
-                secret_key = value.parse().unwrap();
-            } else if key == "passphrase" {
-                passphrase = value.parse().unwrap();
-            }
-        }
-        if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
-            let timestamp = Utc::now().timestamp().to_string();
-            // 时间戳 + 请求类型+ 请求参数字符串
-            let message = format!("{}GET{}", timestamp, "/users/self/verify");
-            let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
-            let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
-            let sign = base64::encode(result);
-
-            let login_json = json!({
-                              "op": "login",
-                                "args": [{
-                                "apiKey": access_key,
-                                "passphrase": passphrase,
-                                "timestamp": timestamp,
-                                "sign": sign  }]
-                        });
-
-            trace!("---login_json:{0}", login_json.to_string());
-            trace!("--登陆:{:?}", login_json);
-            login_json_str = login_json.to_string();
-        }
-        login_json_str
-    }
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
     /*******************************************************************************************************/
@@ -260,19 +219,9 @@ impl BinanceSwapWs {
     }
 
     //代理
-    async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
+    async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, _subscription: String)
     {
         let lable = self.label.clone();
-        /*****登陆***/
-        // let login_str = self.log_in_to_str();
-        // if login_str != "" {
-        //     trace!("登陆:");
-        //     let _ = web_socket.write_message(Message::Text(login_str));
-        //     thread::sleep(Duration::from_secs(3));
-        // }
-        // /*****订阅***/
-        // web_socket.write_message(Message::Text(subscription))
-        //     .unwrap();
         /*****消息溜***/
         let mut stdout = io::stdout();
         loop {

+ 12 - 0
exchanges/src/gate_swap_rest.rs

@@ -60,6 +60,18 @@ impl GateSwapRest {
         ).await;
         data
     }
+    //查询个人交易费率
+    pub async fn wallet_fee(&self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v4".to_string(),
+                                format!("/wallet/fee"),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
     //查询合约账户
     pub async fn get_account(&self, settle: String) -> ResponseData {
         let params = serde_json::json!({

+ 25 - 12
exchanges/src/gate_swap_ws.rs

@@ -1,6 +1,4 @@
 use std::collections::{BTreeMap};
-use std::{io};
-use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use tokio::sync::mpsc::Sender;
 use serde_json::{json, Value};
@@ -293,34 +291,49 @@ impl GateSwapWs {
                 .unwrap();
         }
         /*****消息溜***/
-        let mut stdout = io::stdout();
+        let mut  start_ping = chrono::Utc::now().timestamp_millis();
         loop {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
                     let res_data = Self::ok_text(label.to_string(), text);
                     if res_data.code == "-200" {
-                        writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+                        trace!("订阅成功:{:?}", res_data.data);
                     } else {
                         self.sender.send(res_data).await.unwrap();
+
+                        //主动ping 服务器
+                        let new_time = chrono::Utc::now().timestamp_millis();
+                        // let tr = format!("判断-ping {}--{},{},{}",new_time,start_ping,(new_time - start_ping), (new_time - start_ping) > 10000);
+                        // trace!(tr);
+                        if (new_time - start_ping) > 10000 {
+                            let t = chrono::Utc::now().timestamp();
+                            let ping_str = serde_json::json!({
+                                "time" :t, "channel" : "futures.ping"
+                            });
+                            let _ = web_socket.write_message(Message::Ping(Vec::from(ping_str.to_string())));
+                            start_ping = new_time;
+                        }
+
                     }
                 }
                 Ok(Message::Ping(s)) => {
-                    writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                    trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
                     let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-                    writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
+                    trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
                 }
                 Ok(Message::Pong(s)) => {
                     // trace!("Pong-响应--{:?}", String::from_utf8(s));
-                    writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                    trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
                 }
                 Ok(Message::Close(_)) => {
                     // trace!("socket 关闭: ");
-                    writeln!(stdout, "Close-响应").unwrap();
+                    trace!( "Close-响应");
+                    break;
                 }
                 Err(error) => {
                     // trace!("Error receiving message: {}", error);
-                    writeln!(stdout, "Err-响应{}", error).unwrap();
+                    trace!( "Err-响应{}", error);
                     break;
                 }
                 _ => {}
@@ -331,7 +344,7 @@ impl GateSwapWs {
 
     //非代理
     async fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
-                    subscription: Vec<Value>)
+                          subscription: Vec<Value>)
     {
         let label = self.label.clone();
         /*****订阅***/
@@ -340,14 +353,13 @@ impl GateSwapWs {
                 .unwrap();
         }
         /*****消息溜***/
-        let mut stdout = io::stdout();
         loop {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
                     let res_data = Self::ok_text(label.to_string(), text);
                     if res_data.code == "-200" {
-                        writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+                        trace!( "订阅成功:{:?}", res_data.data);
                     } else {
                         self.sender.send(res_data).await.unwrap();
                     }
@@ -360,6 +372,7 @@ impl GateSwapWs {
                 }
                 Ok(Message::Close(_)) => {
                     trace!("socket 关闭: ");
+                    break;
                 }
                 Err(error) => {
                     trace!("Error receiving message: {}", error);

+ 2 - 1
exchanges/src/kucoin_swap_rest.rs

@@ -301,7 +301,8 @@ impl KucoinSwapRest {
             if !is_login_param {
                 let e = ResponseData::error(self.label.clone(), "登陆参数错误!".to_string());
                 return e;
-            } else {//需要登陆-且登陆参数齐全
+            } else {
+                //需要登陆-且登陆参数齐全
                 trace!("param:{}", params);
                 trace!("body:{}", body);
                 //组装sing