Bläddra i källkod

时效过期,发起重启

skyfffire 1 år sedan
förälder
incheckning
bfd86c1a2e
2 ändrade filer med 50 tillägg och 16 borttagningar
  1. 45 11
      exchanges/src/binance_swap_ws.rs
  2. 5 5
      exchanges/tests/binance_swap_test.rs

+ 45 - 11
exchanges/src/binance_swap_ws.rs

@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -8,11 +9,13 @@ use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::{json, Value};
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
+use tokio_tungstenite::tungstenite::protocol::CloseFrame;
+use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
 use tracing::{error, info, trace};
 use crate::binance_swap_rest::BinanceSwapRest;
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::AbstractWsMode;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
 //类型
 pub enum BinanceSwapWsType {
@@ -230,11 +233,11 @@ impl BinanceSwapWs {
     pub async fn ws_connect_async<F, Future>(&mut self,
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
-                                             _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
-        where
-            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
-            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    where
+        F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+        Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
         let login_is = self.contains_pr();
         let subscription_pu = self.get_subscription_pu();
@@ -252,6 +255,32 @@ impl BinanceSwapWs {
         //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
         //     trace!("线程-异步心跳-结束");
         // });
+        //60分钟效果 50之后分钟重连
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步时效记录-开始");
+            // let time_5m = 1000 * 60 * 50;
+            let time_5m = 1000 * 60 * 50;
+            loop {
+                tokio::time::sleep(Duration::from_millis(time_5m)).await;
+                let write_tx_clone = write_tx_clone1.lock().await;
+                let close_frame = CloseFrame {
+                    code: CloseCode::Normal,
+                    reason: Cow::Borrowed("Bye bye"),
+                };
+                let message = Message::Close(Some(close_frame));
+                match write_tx_clone.unbounded_send(message) {
+                    Ok(_o) => {
+                        info!("listen_key 时效不足60,更新listen_key,发起重启指令..");
+                    }
+                    Err(k) => {
+                        error!("listen_key 时效不足60,更新listen_key,发起关闭指令..发起失败:原因{:?}",k)
+                    }
+                }
+            }
+            // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(), time_5m).await;
+            trace!("线程-异步时效记录-结束");
+        });
 
         //设置订阅
         // let mut subscribe_array = vec![];
@@ -378,17 +407,22 @@ impl BinanceSwapWs {
         {
             //私有订阅
             let result = json_value["result"].as_array().unwrap();
-            let mut channel = "".to_string();
-            for dto in result {
+            let mut message = "".to_string();
+            for dto in result.clone() {
                 let req = dto["req"].as_str().unwrap();
                 if req.contains("@account") {
-                    channel += "@aggTrade";
+                    message += "@aggTrade";
                 } else if req.contains("@balance") {
-                    channel += "@balance";
+                    message += "@balance";
+                }else if req.contains("@position") {
+                    message += "@position";
                 }
             }
-            res_data.code = -201;
-            res_data.message = format!("订阅成功:{}", channel)
+
+            res_data.data = Value::from(result.clone());
+            res_data.code = 200;
+            res_data.message = format!("私有订阅反馈:{}", message);
+            res_data.channel = "info".to_string();
         } else if json_value.get("error").is_some() {
             //订阅失败返回
             res_data.code = i16::from_str(json_value["error"]["code"].as_str().unwrap()).unwrap();

+ 5 - 5
exchanges/tests/binance_swap_test.rs

@@ -79,16 +79,16 @@ async fn ws_custom_subscribe() {
             api_secret: SECRET_KEY.to_string(),
         };
         let t1 = tokio::spawn(async move {
-            let mut ws = get_ws(Option::from(param), BinanceSwapWsType::Public).await;
+            let mut ws = get_ws(Option::from(param), BinanceSwapWsType::Private).await;
             ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
             ws.set_subscribe(vec![
                 // BinanceSwapSubscribeType::PuBookTicker,
-                BinanceSwapSubscribeType::PuAggTrade,
+                // BinanceSwapSubscribeType::PuAggTrade,
                 // BinanceSwapSubscribeType::PuDepth20levels100ms,
 
-                // BinanceSwapSubscribeType::PrAccount,
-                // BinanceSwapSubscribeType::PrBalance,
-                // BinanceSwapSubscribeType::PrPosition
+                BinanceSwapSubscribeType::PrAccount,
+                BinanceSwapSubscribeType::PrBalance,
+                BinanceSwapSubscribeType::PrPosition
             ]);
             //链接
             let bool_v3_clone = Arc::clone(&is_shutdown_arc);