|
|
@@ -187,8 +187,6 @@ impl BitgetSpotWs {
|
|
|
"args": args
|
|
|
});
|
|
|
|
|
|
- trace!("订阅信息:{}", str.to_string());
|
|
|
-
|
|
|
str.to_string()
|
|
|
}
|
|
|
|
|
|
@@ -314,6 +312,12 @@ impl BitgetSpotWs {
|
|
|
let _ = web_socket.write_message(Message::Text(login_str));
|
|
|
thread::sleep(Duration::from_secs(3));
|
|
|
}
|
|
|
+ /*****订阅***/
|
|
|
+ if subscription.len() > 0 {
|
|
|
+ trace!("订阅信息:{}", subscription);
|
|
|
+ web_socket.write_message(Message::Text(subscription.clone()))
|
|
|
+ .unwrap();
|
|
|
+ }
|
|
|
/*****消息溜***/
|
|
|
let mut ping_timeout = chrono::Utc::now().timestamp_millis();
|
|
|
loop {
|
|
|
@@ -334,12 +338,6 @@ impl BitgetSpotWs {
|
|
|
trace!("订阅成功:{:?}", res_data.data);
|
|
|
} else if res_data.code == "-201" {
|
|
|
trace!("登陆:{:?}", res_data);
|
|
|
- /*****订阅***/
|
|
|
- if subscription.len() > 0 {
|
|
|
- // trace!("订阅信息:{}", subscription);
|
|
|
- web_socket.write_message(Message::Text(subscription.clone()))
|
|
|
- .unwrap();
|
|
|
- }
|
|
|
} else {
|
|
|
let sender = self.sender.clone();
|
|
|
tokio::spawn(async move {
|
|
|
@@ -391,37 +389,31 @@ impl BitgetSpotWs {
|
|
|
.unwrap();
|
|
|
}
|
|
|
/*****消息溜***/
|
|
|
- let mut start_ping = chrono::Utc::now().timestamp_millis();
|
|
|
+ let mut ping_timeout = chrono::Utc::now().timestamp_millis();
|
|
|
loop {
|
|
|
tokio::time::sleep(Duration::from_millis(1)).await;
|
|
|
let msg = web_socket.read_message();
|
|
|
match msg {
|
|
|
Ok(Message::Text(text)) => {
|
|
|
- // trace!("获取推送:{}",text.clone());
|
|
|
+ let get_time = chrono::Utc::now().timestamp_millis();
|
|
|
+ if (get_time - ping_timeout) >= (1000 * 30) {
|
|
|
+ trace!("30s 一次主动发送心跳包!");
|
|
|
+ let _ = web_socket.write_message(Message::Ping(Vec::from("ping")));
|
|
|
+ ping_timeout = get_time;
|
|
|
+ }
|
|
|
+
|
|
|
let mut res_data = Self::ok_text(label.to_string(), text);
|
|
|
res_data.time = get_time_microsecond();
|
|
|
if res_data.code == "-200" {
|
|
|
trace!("订阅成功:{:?}", res_data.data);
|
|
|
+ } else if res_data.code == "-201" {
|
|
|
+ trace!("登陆:{:?}", res_data);
|
|
|
} else {
|
|
|
- // self.sender.send(res_data).await.unwrap();
|
|
|
let sender = self.sender.clone();
|
|
|
tokio::spawn(async move {
|
|
|
sender.send(res_data).await.unwrap();
|
|
|
});
|
|
|
tokio::spawn(async move {});
|
|
|
-
|
|
|
- //主动ping 服务器
|
|
|
- let new_time = chrono::Utc::now().timestamp_millis();
|
|
|
- // let tr = format!("判断-ping {}--{},{},{}",new_time,start_ping,(new_time - start_ping), (new_time - start_ping) > 10000);
|
|
|
- // trace!(tr);
|
|
|
- if (new_time - start_ping) > 10000 {
|
|
|
- let t = chrono::Utc::now().timestamp();
|
|
|
- let ping_str = serde_json::json!({
|
|
|
- "time" :t, "channel" : "futures.ping"
|
|
|
- });
|
|
|
- let _ = web_socket.write_message(Message::Ping(Vec::from(ping_str.to_string())));
|
|
|
- start_ping = new_time;
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
Ok(Message::Ping(s)) => {
|
|
|
@@ -486,22 +478,5 @@ impl BitgetSpotWs {
|
|
|
} else {
|
|
|
res_data
|
|
|
}
|
|
|
-
|
|
|
- // if json_value.get("error").is_some() {
|
|
|
- // let message = json_value["error"]["message"].as_str().unwrap().to_string();
|
|
|
- // let mes = message.trim_end_matches('\n');
|
|
|
- //
|
|
|
- // // let mes_json_value: serde_json::Value = serde_json::from_str(mes).unwrap();
|
|
|
- // // // trace!("错误信息:{}", mes_json_value.to_string());
|
|
|
- // res_data.code = json_value["error"]["code"].to_string();
|
|
|
- // res_data.message = mes.clone().to_string();
|
|
|
- // } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
|
|
|
- // res_data.code = "-200".to_string();
|
|
|
- // res_data.data = text;
|
|
|
- // } else {
|
|
|
- // res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
|
|
|
- // res_data.code = "200".to_string();
|
|
|
- // res_data.data = json_value["result"].to_string();
|
|
|
- // }
|
|
|
}
|
|
|
}
|