ソースを参照

修正 Mutex 为 tokio

hl 1 年間 前
コミット
882f3be0ce

+ 13 - 12
exchanges/src/binance_swap_ws_async.rs

@@ -1,8 +1,9 @@
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::json;
+use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
 
@@ -254,17 +255,17 @@ impl BinanceSwapWs {
 
         Ok(())
     }
-    //主动订阅
-    pub fn send_subscribe(self, write_tx: Arc<Mutex<UnboundedSender<Message>>>) {
-        trace!("发起订阅");
-        if self.contains_pr() {
-            //有订阅需要登录
-        }
-        //发送订阅
-        let subscribe = self.get_subscription();
-        let message = Message::Text(subscribe.to_string());
-        AbstractWsMode::send_subscribe(write_tx, message);
-    }
+    // //主动订阅
+    // pub fn send_subscribe(self, write_tx: Arc<Mutex<UnboundedSender<Message>>>) {
+    //     trace!("发起订阅");
+    //     if self.contains_pr() {
+    //         //有订阅需要登录
+    //     }
+    //     //发送订阅
+    //     let subscribe = self.get_subscription();
+    //     let message = Message::Text(subscribe.to_string());
+    //     AbstractWsMode::send_subscribe(write_tx, message);
+    // }
 
 
     //数据解析

+ 5 - 4
exchanges/src/kucoin_swap_ws_async.rs

@@ -1,8 +1,9 @@
 use std::collections::BTreeMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{error, info, trace};
 
@@ -358,8 +359,8 @@ impl KucoinSwapWs {
         let heartbeat_time = self.heartbeat_time.clone();
 
         //心跳-- 方法内部线程启动
-        let write_tx_clone1 = Arc::clone(write_tx_am);
-       tokio::spawn(async move {
+        let write_tx_clone1 = write_tx_am.clone();
+        tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
             AbstractWsMode::ping_or_pong(bool_v1, write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
             trace!("线程-异步心跳-结束");
@@ -367,7 +368,7 @@ impl KucoinSwapWs {
 
 
         //设置订阅
-        let  subscribe_array =  subscription.clone();
+        let subscribe_array = subscription.clone();
         if login_is {
             //登录相关
         }

+ 8 - 8
exchanges/src/socket_tool.rs

@@ -1,15 +1,16 @@
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 
 use chrono::Utc;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-use futures_util::{future, pin_mut,  SinkExt, StreamExt};
+use futures_util::{future, pin_mut, SinkExt, StreamExt};
 use futures_util::stream::{SplitSink, SplitStream};
 use ring::hmac;
 use serde_json::json;
 use tokio::net::TcpStream;
+use tokio::sync::Mutex;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::trace;
@@ -24,8 +25,7 @@ pub enum HeartbeatType {
     Pong,
 }
 
-pub struct AbstractWsMode {
-}
+pub struct AbstractWsMode {}
 
 impl AbstractWsMode {
     // pub fn new(write_rx: UnboundedReceiver<Message>,
@@ -175,7 +175,7 @@ impl AbstractWsMode {
             trace!("---4");
             trace!("重启...");
         }
-       return  Ok(())
+        return Ok(());
     }
 
 // match connect_async(address_url, proxy).await {
@@ -222,7 +222,7 @@ impl AbstractWsMode {
     pub async fn ping_or_pong(_bool_v1: Arc<AtomicBool>, write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
         loop {
             tokio::time::sleep(Duration::from_millis(millis)).await;
-            let  write_tx_clone = write_tx_clone.lock().unwrap();
+            let write_tx_clone = write_tx_clone.lock().await;
             write_tx_clone.unbounded_send(
                 match h_type {
                     HeartbeatType::Ping => {
@@ -238,8 +238,8 @@ impl AbstractWsMode {
     }
 
     //发送数据
-    pub fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
-        let  write_tx_clone = write_tx_clone.lock().unwrap();
+    pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
+        let write_tx_clone = write_tx_clone.lock().await;
         write_tx_clone.unbounded_send(message.clone()).unwrap();
         trace!("发送指令:{:?}",message);
         true

+ 2 - 1
exchanges/tests/binance_swap_test.rs

@@ -1,8 +1,9 @@
 use std::collections::BTreeMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
 use futures_util::StreamExt;
+use tokio::sync::Mutex;
 use tracing::trace;
 
 use exchanges::binance_swap_rest::BinanceSwapRest;

+ 2 - 1
exchanges/tests/kucoin_swap_test_async.rs

@@ -1,7 +1,8 @@
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
 use futures_util::StreamExt;
+use tokio::sync::Mutex;
 use tracing::trace;
 
 use exchanges::kucoin_swap_ws_async::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};