ソースを参照

修复bitget的心跳包问题。

skyffire 1 年間 前
コミット
80e8f3954d

+ 39 - 33
exchanges/src/bitget_swap_ws.rs

@@ -71,7 +71,7 @@ impl BitgetSwapWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20
+            heartbeat_time: 1000 * 10
         }
     }
 
@@ -246,7 +246,8 @@ impl BitgetSwapWs {
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            let ping_str = json!("ping");
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.as_str().unwrap().to_string()), heartbeat_time).await;
         });
 
         //链接
@@ -290,38 +291,43 @@ impl BitgetSwapWs {
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         let mut res_data = ResponseData::new("".to_string(), "200".to_string(), text.clone(), Value::Null);
-        let json_value: Value = serde_json::from_str(&text).unwrap();
-
-        if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
-            if json_value.get("code").is_some() && json_value["code"] == 0 {
-                res_data.message = "登陆成功".to_string();
-            } else {
-                res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
-            }
-            res_data.channel = "login".to_string();
-            res_data.code = "-200".to_string();
-            res_data.data = json_value;
-
-            res_data
-        } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
-            res_data.code = "-201".to_string();
-            res_data.data = json_value.clone();
-            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
-            res_data.message = "success".to_string();
-            res_data
-        } else if json_value.get("action").is_some() {
-            res_data.data = json_value["data"].clone();
-            if res_data.data == "[]" {
-                res_data.code = "".to_string();
-            } else {
-                res_data.code = "200".to_string();
+        match text.as_str() {
+            "pong" => {
+                res_data.code = "-301".to_string();
+                res_data.channel = "pong".to_string();
+                res_data.message = "success".to_string();
+            },
+            _ => {
+                let json_value: Value = serde_json::from_str(&text).unwrap();
+
+                if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
+                    if json_value.get("code").is_some() && json_value["code"] == 0 {
+                        res_data.message = "登陆成功".to_string();
+                    } else {
+                        res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
+                    }
+                    res_data.channel = "login".to_string();
+                    res_data.code = "-200".to_string();
+                    res_data.data = json_value;
+                } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
+                    res_data.code = "-201".to_string();
+                    res_data.data = json_value.clone();
+                    res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+                    res_data.message = "success".to_string();
+                } else if json_value.get("action").is_some() {
+                    res_data.data = json_value["data"].clone();
+                    if res_data.data == "[]" {
+                        res_data.code = "".to_string();
+                    } else {
+                        res_data.code = "200".to_string();
+                    }
+                    res_data.message = "success".to_string();
+                    res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+                    res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
+                }
             }
-            res_data.message = "success".to_string();
-            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
-            res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
-            res_data
-        } else {
-            res_data
         }
+
+        res_data
     }
 }

+ 2 - 2
exchanges/src/socket_tool.rs

@@ -342,8 +342,8 @@ pub async fn client(add_url: String) {
 
 
 //模拟 业务场景中 发送指令给目标交易所
-async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
-    let _str = serde_json::json!({
+async fn write_sell(tx: UnboundedSender<Message>) {
+    let _str = json!({
                 "op": "subscribe",
                 "args": [
                         {

+ 3 - 1
strategy/src/bitget_usdt_swap.rs

@@ -150,6 +150,7 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
                 core.update_order(order_infos, trace_stack).await;
             }
         },
+        "pong" => {}
         _ => {
             info!("bitget_usdt_swap 113 未知的订阅数据: {:?}", response);
         }
@@ -170,7 +171,8 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
             trace_stack.on_after_format();
 
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
-        }
+        },
+        "pong" => {},
         _ => {
             info!("bitget_usdt_swap 125 未知的订阅数据");
             info!(?response)