Explorar el Código

直接挂上去试试。

skyfffire hace 1 año
padre
commit
24f873f406
Se han modificado 1 ficheros con 33 adiciones y 7 borrados
  1. 33 7
      exchanges/src/socket_tool.rs

+ 33 - 7
exchanges/src/socket_tool.rs

@@ -10,7 +10,7 @@ use futures_util::stream::{SplitSink, SplitStream};
 use ring::hmac;
 use serde_json::json;
 use tokio::net::TcpStream;
-use tokio::spawn;
+use tokio::{spawn, time};
 use tokio::sync::Mutex;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
@@ -57,6 +57,9 @@ impl AbstractWsMode {
             }
         };
 
+        let read_arc = Arc::new(Mutex::new(read_tx));
+
+        let read1 = read_arc.clone();
         loop {
             match connect_async(address_url.clone(), proxy).await {
                 Ok((ws_stream, _)) => {
@@ -81,7 +84,7 @@ impl AbstractWsMode {
                             let mut write_lock2 = write_clone2.lock().await;
                             write_lock2.send(message).await?;
                         }
-                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                        Ok::<(), Error>(())
                     };
                     let write_clone3 = Arc::clone(&write_arc);
                     let ws_to_stdout = async {
@@ -127,11 +130,11 @@ impl AbstractWsMode {
                                     //     }
                                     // } else {
                                     //     if read_tx.len() == 0 {
-                                    let r = read_tx.clone();
+                                    let r = read1.clone();
                                     spawn(async move {
-                                        r.unbounded_send(data_c).unwrap();
+                                        r.lock().await.unbounded_send(data_c).unwrap();
                                     });
-                                        // }
+                                    // }
                                     // }
                                 }
 
@@ -183,9 +186,32 @@ impl AbstractWsMode {
                                 }
                             }
                         }
-                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                        Ok::<(), Error>(())
                     };
 
+                    // 防止 cpu 休眠。
+                    let read2 = read_arc.clone();
+                    spawn(async move {
+                        let t_str = "t".to_string();
+                        let response = ResponseData::new(t_str.clone(),
+                                                  t_str.clone(),
+                                                  t_str.clone(),
+                                                  t_str.clone());
+                        loop {
+                            time::sleep(Duration::from_micros(5)).await;
+
+                            let t = response.clone();
+                            let r = read2.clone();
+                            spawn(async move {
+                                let read = r.lock().await;
+
+                                if read.is_empty() {
+                                    read.unbounded_send(t.clone()).unwrap();
+                                }
+                            });
+                        }
+                    });
+
                     //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
                     pin_mut!(stdin_to_ws, ws_to_stdout,);
                     future::select(stdin_to_ws, ws_to_stdout).await;
@@ -404,7 +430,7 @@ pub async fn client(add_url: String) {
     //创建通道 开启线程,向通道写入数据
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
-    tokio::spawn(write_sell(write_tx));
+    spawn(write_sell(write_tx));
 
 
     //创建socket,,并且读写分离