瀏覽代碼

日志优化,断连流程优化。

skyffire 1 年之前
父節點
當前提交
9be63ee0ef
共有 3 個文件被更改,包括 17 次插入13 次删除
  1. 6 3
      exchanges/src/bitget_swap_ws.rs
  2. 5 3
      exchanges/src/socket_tool.rs
  3. 6 7
      src/bitget_usdt_swap_data_listener.rs

+ 6 - 3
exchanges/src/bitget_swap_ws.rs

@@ -1,5 +1,5 @@
 use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use chrono::{Utc};
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::{json, Value};
@@ -201,7 +201,6 @@ impl BitgetSwapWs {
         // 设置订阅
         let subscription = self.get_subscription();
         let subscribe_array = vec![subscription.to_string()];
-        info!(?subscribe_array);
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = Arc::clone(write_tx_am);
@@ -217,6 +216,10 @@ impl BitgetSwapWs {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
             loop {
+                if !is_shutdown_arc.load(Ordering::Relaxed) {
+                    break;
+                }
+
                 info!("bitget_usdt_swap socket 连接中……");
 
                 // 登录相关
@@ -294,7 +297,7 @@ impl BitgetSwapWs {
                     }
                     res_data.channel = "login".to_string();
                     res_data.code = -200;
-                    res_data.data = json_value;
+                    res_data.data = json_value.clone();
                 } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
                     res_data.code = -201;
                     res_data.data = json_value.clone();

+ 5 - 3
exchanges/src/socket_tool.rs

@@ -63,8 +63,10 @@ impl AbstractWsMode {
 
         // 如果不需要事先登录,则直接订阅消息
         if !is_first_login {
-            info!("订阅内容:{:?}", subscribe_array.clone());
+            info!("订阅长度: {}", subscribe_array.len());
             for s in &subscribe_array {
+                info!("订阅内容:{} ", s);
+
                 let mut write_lock = ws_write_arc.lock().await;
                 write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
             }
@@ -207,7 +209,6 @@ impl AbstractWsMode {
     //心跳包
     pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
         loop {
-            tokio::time::sleep(Duration::from_millis(millis)).await;
             let write_tx_clone = write_tx_clone.lock().await;
             write_tx_clone.unbounded_send(
                 match h_type {
@@ -223,6 +224,7 @@ impl AbstractWsMode {
                 }
             ).expect("发送失败");
             trace!("发送指令-心跳:{:?}",h_type);
+            tokio::time::sleep(Duration::from_millis(millis)).await;
         }
     }
     //数据解析
@@ -257,7 +259,7 @@ impl AbstractWsMode {
             }
             Err(e) => {
                 let message_str = format!("服务器响应:{:?}", e);
-                trace!("{:?}",message_str);
+                error!("socket断连:{}", message_str);
                 Option::from(ResponseData::new("".to_string(), -1, message_str, Value::Null))
             }
         }

+ 6 - 7
src/bitget_usdt_swap_data_listener.rs

@@ -29,7 +29,6 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     let login = BTreeMap::new();
     let mut bitget_rest = BitgetSwapRest::new(false, login);
     let response = bitget_rest.get_all_contracts().await;
-    info!(?response);
     let mut symbols = vec![];
     if response.code == 200 {
         let data = response.data.as_array().unwrap();
@@ -42,7 +41,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         let mut ws = BitgetSwapWs::new_with_tag(name.to_string(), false, None, BitgetSwapWsType::Public);
             ws.set_subscribe(vec![
-                // BitgetSwapSubscribeType::PuTrade,
+                BitgetSwapSubscribeType::PuTrade,
                 BitgetSwapSubscribeType::PuCandle1m
             ]);
 
@@ -77,11 +76,11 @@ pub async fn data_listener(response: ResponseData) {
         },
         // k线数据
         "candle1m" => {
-            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
-
-            for record in records.iter() {
-                info!(?record);
-            }
+            // let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+            //
+            // for record in records.iter() {
+            //     info!(?record);
+            // }
         },
         _ => {
             info!("48 未知的数据类型: {:?}", response)