hl 1 år sedan
förälder
incheckning
bd7684d5a6

+ 2 - 2
exchanges/src/binance_swap_ws_async.rs

@@ -226,7 +226,7 @@ impl BinanceSwapWs {
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(bool_v1, write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -243,7 +243,7 @@ impl BinanceSwapWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(address_url.clone(),
+            AbstractWsMode::ws_connect_async(bool_v1,address_url.clone(),
                                              label.clone(), subscribe_array,
                                              write_rx, read_tx, BinanceSwapWs::analysis_message,
             ).await.expect("币安");

+ 2 - 2
exchanges/src/kucoin_swap_ws_async.rs

@@ -362,7 +362,7 @@ impl KucoinSwapWs {
         let write_tx_clone1 = write_tx_am.clone();
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(bool_v1, write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -378,7 +378,7 @@ impl KucoinSwapWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(address_url.clone(),
+            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
                                              label.clone(), subscribe_array,
                                              write_rx, read_tx, KucoinSwapWs::analysis_message,
             ).await.expect("kucoin");

+ 7 - 5
exchanges/src/socket_tool.rs

@@ -1,6 +1,6 @@
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use chrono::Utc;
@@ -88,7 +88,8 @@ impl AbstractWsMode {
 
 
     //创建链接
-    pub async fn ws_connect_async<F>(address_url: String,
+    pub async fn ws_connect_async<F>(bool_v1: Arc<AtomicBool>,
+                                     address_url: String,
                                      lable: String,
                                      subscribe_array: Vec<String>,
                                      mut write_rx: UnboundedReceiver<Message>,
@@ -119,7 +120,6 @@ impl AbstractWsMode {
                 write.send(Message::Text(s.parse().unwrap())).await?;
             }
 
-
             //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
             // let stdin_to_ws = write_rx.map(Ok).forward(write);
             // Writing task
@@ -137,7 +137,9 @@ impl AbstractWsMode {
                         data.label = lable.clone();
                         let code = data.code.clone();
                         if code.as_str() == "-1" {} else if code.as_str() == "200" {
-                            read_tx.unbounded_send(data).unwrap();
+                            if bool_v1.load(Ordering::Relaxed) {
+                                read_tx.unbounded_send(data).unwrap();
+                            }
                         }
                     }
                 }
@@ -219,7 +221,7 @@ impl AbstractWsMode {
 
 
     //心跳包
-    pub async fn ping_or_pong(_bool_v1: Arc<AtomicBool>, write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
+    pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
         loop {
             tokio::time::sleep(Duration::from_millis(millis)).await;
             let write_tx_clone = write_tx_clone.lock().await;