Bläddra i källkod

主要修复的是断网重连机制。

skyffire 1 år sedan
förälder
incheckning
85e02f8a8a

+ 14 - 14
exchanges/src/binance_swap_ws.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::{json, Value};
@@ -153,7 +154,7 @@ impl BinanceSwapWs {
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
                                              _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-                                             write_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
     where
         F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
         Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
@@ -182,21 +183,20 @@ impl BinanceSwapWs {
 
         //链接
         let t2 = tokio::spawn(async move {
-            trace!("线程-异步链接-开始");
-            match AbstractWsMode::ws_connect_async(is_shutdown_arc,
-                                                   handle_function,
-                                                   address_url.clone(),
-                                                   label.clone(),
-                                                   subscribe_array,
-                                                   write_rx,
-                                                   Self::message_text,
-                                                   Self::message_ping,
-                                                   Self::message_pong,
-            ).await {
-                Ok(_) => { trace!("线程-异步链接-结束"); }
-                Err(e) => { error!("发生异常:币安-期货链接关闭-{:?}",e); }
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("binance_usdt_swap socket 连接中……");
+                // ws层重连
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+
+                error!("binance_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
+
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
 

+ 32 - 32
exchanges/src/bybit_swap_ws.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 
 use chrono::Utc;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -191,7 +192,7 @@ impl BybitSwapWs {
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
                                              write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-                                             write_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
         where
             F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
@@ -220,40 +221,39 @@ impl BybitSwapWs {
             trace!("线程-异步心跳-结束");
         });
 
-        //设置订阅
-        let mut subscribe_array = vec![];
-        if login_is {
-            let expires = timestamp + 1000;
-            let message = format!("GET/realtime{}", expires);
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
-            let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
-            let result = hmac::sign(&hmac_key, &message.as_bytes());
-            let signature = hex::encode(result.as_ref());
+            loop {
+                info!("bybit_usdt_swap socket 连接中……");
 
-            //登录相关
-            let str = json!({
-                "op": "auth",
-                "args": [api_key, expires, signature]
-            });
-            subscribe_array.push(str.to_string());
-        }
-        subscribe_array.push(subscription.to_string());
+                //设置订阅
+                let mut subscribe_array = vec![];
+                if login_is {
+                    let expires = timestamp + 1000;
+                    let message = format!("GET/realtime{}", expires);
 
-        //链接
-        let t2 = tokio::spawn(async move {
-            trace!("线程-异步链接-开始");
-            match AbstractWsMode::ws_connect_async(is_shutdown_arc,
-                                                   handle_function,
-                                                   address_url.clone(),
-                                                   label.clone(),
-                                                   subscribe_array,
-                                                   write_rx,
-                                                   Self::message_text,
-                                                   Self::message_ping,
-                                                   Self::message_pong,
-            ).await {
-                Ok(_) => { trace!("线程-异步链接-结束"); }
-                Err(e) => { error!("发生异常:bybit_usdt_swap 链接关闭-{:?}",e); }
+                    let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+                    let result = hmac::sign(&hmac_key, &message.as_bytes());
+                    let signature = hex::encode(result.as_ref());
+
+                    //登录相关
+                    let str = json!({
+                        "op": "auth",
+                        "args": [api_key, expires, signature]
+                    });
+                    subscribe_array.push(str.to_string());
+                }
+                subscribe_array.push(subscription.to_string());
+
+                // ws网络层重连
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+
+                error!("bybit_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
         tokio::try_join!(t2).unwrap();

+ 13 - 14
exchanges/src/gate_swap_ws.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use chrono::Utc;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -259,7 +260,7 @@ impl GateSwapWs {
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
                                              write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-                                             write_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
         where
             F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
@@ -294,19 +295,17 @@ impl GateSwapWs {
 
         //链接
         let t2 = tokio::spawn(async move {
-            trace!("线程-异步链接-开始");
-            match AbstractWsMode::ws_connect_async(is_shutdown_arc,
-                                                   handle_function,
-                                                   address_url.clone(),
-                                                   label.clone(),
-                                                   subscribe_array,
-                                                   write_rx,
-                                                   Self::message_text,
-                                                   Self::message_ping,
-                                                   Self::message_pong,
-            ).await {
-                Ok(_) => { trace!("线程-异步链接-结束"); }
-                Err(e) => { error!("发生异常:gate-期货链接关闭-{:?}",e); }
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("gate_usdt_swap socket 连接中……");
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+
+                error!("gate_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
         tokio::try_join!(t2).unwrap();

+ 19 - 26
exchanges/src/socket_tool.rs

@@ -14,7 +14,7 @@ use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
-use tracing::{info, trace};
+use tracing::{error, info, trace};
 
 use crate::proxy;
 use crate::proxy::{ProxyEnum, ProxyResponseEnum};
@@ -152,10 +152,10 @@ impl AbstractWsMode {
                                                         address_url: String,
                                                         label: String,
                                                         subscribe_array: Vec<String>,
-                                                        write_to_socket_rx: UnboundedReceiver<Message>,
+                                                        write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                         message_text: T,
                                                         message_ping: PI,
-                                                        message_pong: PO) -> Result<(), Error>
+                                                        message_pong: PO)
         where T: Fn(String) -> Option<ResponseData> + Copy,
               PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
@@ -175,30 +175,23 @@ impl AbstractWsMode {
             }
         };
 
-        let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
-
-        loop {
-            match connect_async(address_url.clone(), proxy).await {
-                Ok((ws_stream, _)) => {
-                    let handle_function_clone = handle_function.clone(); // 显式克隆handle_function
-
-                    Self::ws_connected(write_to_socket_rx_arc.clone(),
-                                       label.clone(),
-                                       is_shutdown_arc.clone(),
-                                       &handle_function_clone,
-                                       subscribe_array.clone(),
-                                       ws_stream,
-                                       message_text,
-                                       message_ping,
-                                       message_pong).await;
-                }
-                Err(e) => {
-                    trace!("WebSocket 握手失败:{:?}",e);
-                }
+        match connect_async(address_url.clone(), proxy).await {
+            Ok((ws_stream, _)) => {
+                info!("socket 链接成功,{}。", address_url);
+
+                Self::ws_connected(write_to_socket_rx_arc,
+                                   label,
+                                   is_shutdown_arc,
+                                   &handle_function,
+                                   subscribe_array.clone(),
+                                   ws_stream,
+                                   message_text,
+                                   message_ping,
+                                   message_pong).await;
+            }
+            Err(e) => {
+                error!("WebSocket 握手失败:{:?}", e);
             }
-            trace!("---5");
-            trace!("---4");
-            trace!("重启...");
         }
     }
 

+ 1 - 1
strategy/src/core.rs

@@ -1684,7 +1684,7 @@ pub fn on_timer(core_arc: Arc<Mutex<Core>>) -> JoinHandle<()> {
             let mut core = core_arc_clone.lock().await;
             {
                 // 检查风控
-                // core.check_risk().await;
+                core.check_risk().await;
 
                 // 线程停止信号
                 if core.mode_signal == 1 {