Explorar o código

Merge remote-tracking branch 'origin/dev' into dev

skyfffire hai 1 ano
pai
achega
6d9d57b193

+ 3 - 4
exchanges/src/binance_spot_ws.rs

@@ -8,8 +8,7 @@ use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
+use crate::socket_tool::AbstractWsMode;
 
 pub enum BinanceSpotWsType {
     //订阅频道类型
@@ -152,7 +151,7 @@ impl BinanceSpotWs {
     //链接
     pub async fn ws_connect_async(&mut self,
                                   bool_v1: Arc<AtomicBool>,
-                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                   write_rx: UnboundedReceiver<Message>,
                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
     {
@@ -160,7 +159,7 @@ impl BinanceSpotWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
 
 
         //心跳-- 方法内部线程启动

+ 3 - 4
exchanges/src/binance_swap_ws.rs

@@ -8,8 +8,7 @@ use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
+use crate::socket_tool::AbstractWsMode;
 
 //类型
 pub enum BinanceSwapWsType {
@@ -152,7 +151,7 @@ impl BinanceSwapWs {
     //链接
     pub async fn ws_connect_async(&mut self,
                                   bool_v1: Arc<AtomicBool>,
-                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                   write_rx: UnboundedReceiver<Message>,
                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
     {
@@ -160,7 +159,7 @@ impl BinanceSwapWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
 
 
         //心跳-- 方法内部线程启动

+ 0 - 1
exchanges/src/bitget_spot_ws.rs

@@ -12,7 +12,6 @@ use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
 
 //类型
 pub enum BitgetSpotWsType {

+ 6 - 7
exchanges/src/crypto_spot_ws.rs

@@ -1,7 +1,7 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use chrono::Utc;
 
+use chrono::Utc;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::json;
 use tokio::sync::Mutex;
@@ -9,8 +9,7 @@ use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
+use crate::socket_tool::AbstractWsMode;
 
 //类型
 pub enum CryptoSpotWsType {
@@ -151,15 +150,15 @@ impl CryptoSpotWs {
     //链接
     pub async fn ws_connect_async(&mut self,
                                   bool_v1: Arc<AtomicBool>,
-                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                   write_rx: UnboundedReceiver<Message>,
                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
     {
         let login_is = self.contains_pr();
-        let subscription = self.get_subscription();
+        // let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
 
 
         //心跳-- 方法内部线程启动
@@ -171,7 +170,7 @@ impl CryptoSpotWs {
         // });
 
         //设置订阅
-        let mut subscribe_array = vec![];
+        let  subscribe_array = vec![];
         if login_is {
             //登录相关
         }

+ 2 - 3
exchanges/src/gate_swap_ws.rs

@@ -12,7 +12,6 @@ use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
 
 //类型
 pub enum GateSwapWsType {
@@ -71,7 +70,7 @@ impl GateSwapWs {
             GateSwapWsType::PublicAndPrivate(name) => {
                 if is_colo {
                     let url = format!("wss://fxws-privategateapi.io/v4/ws/{}", name.to_string());
-                    info!("开启高速(未配置,走普通:{})通道",url);
+                    info!("开启高速通道:{:?}",url);
                     url
                 } else {
                     let url = format!("wss://fx-ws.gateio.ws/v4/ws/{}", name.to_string());
@@ -297,7 +296,7 @@ impl GateSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 0 - 1
exchanges/src/kucoin_spot_ws.rs

@@ -10,7 +10,6 @@ use tracing::{error, info, trace};
 use crate::kucoin_spot_rest::KucoinSpotRest;
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
 
 //类型
 pub enum KucoinSpotWsType {

+ 0 - 1
exchanges/src/kucoin_swap_ws.rs

@@ -10,7 +10,6 @@ use tracing::{error, info, trace};
 use crate::kucoin_swap_rest::KucoinSwapRest;
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
 
 //类型
 pub enum KucoinSwapWsType {

+ 0 - 1
exchanges/src/okx_swap_ws.rs

@@ -12,7 +12,6 @@ use tracing::{info, trace};
 
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-use crate::utils::get_time_microsecond;
 
 //类型
 pub enum OkxSwapWsType {

+ 29 - 25
exchanges/src/socket_tool.rs

@@ -58,9 +58,9 @@ impl AbstractWsMode {
         loop {
             let (ws_stream, _) = connect_async(address_url.clone(), proxy).await?;
             trace!("WebSocket 握手完成。");
-            let (mut write, mut read) = ws_stream.split();
+            let (write, mut read) = ws_stream.split();
 
-            let write_arc =Arc::new(Mutex::new(write));
+            let write_arc = Arc::new(Mutex::new(write));
             let write_clone = Arc::clone(&write_arc);
             //订阅写入(包括订阅信息 )
             trace!("订阅内容:{:?}",subscribe_array.clone());
@@ -82,7 +82,6 @@ impl AbstractWsMode {
             };
             let write_clone3 = Arc::clone(&write_arc);
             let ws_to_stdout = async {
-
                 while let Some(message) = read.next().await {
                     let mut write_lock3 = write_clone3.lock().await;
                     let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
@@ -91,48 +90,53 @@ impl AbstractWsMode {
                         let mut data = response_data.unwrap();
                         data.label = lable.clone();
                         let code = data.code.clone();
-                        /**
-                             200 -正确返回
-                            -200 -登录成功
-                            -201 -订阅成功
-                            -300 -客户端收到服务器心跳ping,需要响应
-                            -301 -客户端收到服务器心跳pong,需要响应
-                            -302 -客户端收到服务器心跳自定义,需要响应自定义
-                         **/
+                        /*
+                            200 -正确返回
+                           -200 -登录成功
+                           -201 -订阅成功
+                           -300 -客户端收到服务器心跳ping,需要响应
+                           -301 -客户端收到服务器心跳pong,需要响应
+                           -302 -客户端收到服务器心跳自定义,需要响应自定义
+                        */
                         match code.as_str() {
                             "200" => {
                                 if bool_v1.load(Ordering::Relaxed) {
                                     read_tx.unbounded_send(data).unwrap();
                                 }
-                            },
-                            "-200" => {//订阅成功
+                            }
+                            "-200" => {
+                                //订阅成功
                                 trace!("登录成功:{:?}", data);
-                            },
-                            "-201" => {//订阅成功
+                            }
+                            "-201" => {
+                                //订阅成功
                                 trace!("订阅成功:{:?}", data);
-                            },
-                            "-300" => {//服务器发送心跳 ping 给客户端,客户端需要pong回应
+                            }
+                            "-300" => {
+                                //服务器发送心跳 ping 给客户端,客户端需要pong回应
                                 trace!("服务器响应-ping");
-                                if data.data.len()>0{
+                                if data.data.len() > 0 {
                                     write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
                                     trace!("客户端回应服务器-pong");
                                 }
-                            },
-                            "-301" => {//服务器发送心跳 pong 给客户端,客户端需要ping回应
+                            }
+                            "-301" => {
+                                //服务器发送心跳 pong 给客户端,客户端需要ping回应
                                 trace!("服务器响应-pong");
-                                if data.data.len()>0{
+                                if data.data.len() > 0 {
                                     write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
                                     trace!("客户端回应服务器-ping");
                                 }
-                            },
-                            "-302" => {//客户端收到服务器心跳自定义,需要响应自定义
+                            }
+                            "-302" => {
+                                //客户端收到服务器心跳自定义,需要响应自定义
                                 trace!("特定字符心跳,特殊响应:{:?}", data);
                                 write_lock3.send(Message::Text(data.data)).await?;
                                 trace!("特殊字符心跳-回应完成");
-                            },
+                            }
                             _ => {
                                 trace!("未知:{:?}",data);
-                            },
+                            }
                         }
                     }
                 }