|
|
@@ -1,7 +1,7 @@
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
|
use std::sync::Arc;
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
-use std::time::{Duration};
|
|
|
+use std::time::Duration;
|
|
|
|
|
|
use chrono::Utc;
|
|
|
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
|
@@ -31,16 +31,16 @@ pub struct AbstractWsMode {}
|
|
|
|
|
|
impl AbstractWsMode {
|
|
|
pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
|
|
|
- is_first_login: bool,
|
|
|
- label: String,
|
|
|
- is_shutdown_arc: Arc<AtomicBool>,
|
|
|
- handle_function: &F,
|
|
|
- subscribe_array: Vec<String>,
|
|
|
- ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
|
- message_text: T,
|
|
|
- message_ping: PI,
|
|
|
- message_pong: PO,
|
|
|
- message_binary: B)
|
|
|
+ is_first_login: bool,
|
|
|
+ label: String,
|
|
|
+ is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+ handle_function: &F,
|
|
|
+ subscribe_array: Vec<String>,
|
|
|
+ ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
|
+ message_text: T,
|
|
|
+ message_ping: PI,
|
|
|
+ message_pong: PO,
|
|
|
+ message_binary: B)
|
|
|
where T: Fn(String) -> Option<ResponseData> + Copy,
|
|
|
PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
|
|
|
PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
|
|
|
@@ -158,16 +158,16 @@ impl AbstractWsMode {
|
|
|
|
|
|
//创建链接
|
|
|
pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
- handle_function: F,
|
|
|
- address_url: String,
|
|
|
- is_first_login: bool,
|
|
|
- label: String,
|
|
|
- subscribe_array: Vec<String>,
|
|
|
- write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
|
|
|
- message_text: T,
|
|
|
- message_ping: PI,
|
|
|
- message_pong: PO,
|
|
|
- message_binary: B)
|
|
|
+ handle_function: F,
|
|
|
+ address_url: String,
|
|
|
+ is_first_login: bool,
|
|
|
+ label: String,
|
|
|
+ subscribe_array: Vec<String>,
|
|
|
+ write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
|
|
|
+ message_text: T,
|
|
|
+ message_ping: PI,
|
|
|
+ message_pong: PO,
|
|
|
+ message_binary: B)
|
|
|
where T: Fn(String) -> Option<ResponseData> + Copy,
|
|
|
PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
|
|
|
PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
|
|
|
@@ -215,7 +215,7 @@ impl AbstractWsMode {
|
|
|
loop {
|
|
|
tokio::time::sleep(Duration::from_millis(millis)).await;
|
|
|
let write_tx_clone = write_tx_clone.lock().await;
|
|
|
- write_tx_clone.unbounded_send(
|
|
|
+ match write_tx_clone.unbounded_send(
|
|
|
match h_type {
|
|
|
HeartbeatType::Ping => {
|
|
|
Message::Ping(Vec::from("Ping"))
|
|
|
@@ -227,16 +227,35 @@ impl AbstractWsMode {
|
|
|
Message::Text(str.parse().unwrap())
|
|
|
}
|
|
|
}
|
|
|
- ).expect("发送失败");
|
|
|
- trace!("发送指令-心跳:{:?}",h_type);
|
|
|
+ ) {
|
|
|
+ Ok(o) => {
|
|
|
+ trace!("发送指令-心跳:{:?}",h_type);
|
|
|
+ }
|
|
|
+ Err(k) => {
|
|
|
+ error!("发送失败:原因{:?}",k)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // write_tx_clone.unbounded_send(
|
|
|
+ // match h_type {
|
|
|
+ // HeartbeatType::Ping => {
|
|
|
+ // Message::Ping(Vec::from("Ping"))
|
|
|
+ // }
|
|
|
+ // HeartbeatType::Pong => {
|
|
|
+ // Message::Pong(Vec::from("Pong"))
|
|
|
+ // }
|
|
|
+ // HeartbeatType::Custom(ref str) => {
|
|
|
+ // Message::Text(str.parse().unwrap())
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // ).expect("发送失败");
|
|
|
}
|
|
|
}
|
|
|
//数据解析
|
|
|
pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
|
|
|
- message_text: T,
|
|
|
- message_ping: PI,
|
|
|
- message_pong: PO,
|
|
|
- message_binary: B) -> Option<ResponseData>
|
|
|
+ message_text: T,
|
|
|
+ message_ping: PI,
|
|
|
+ message_pong: PO,
|
|
|
+ message_binary: B) -> Option<ResponseData>
|
|
|
where T: Fn(String) -> Option<ResponseData>,
|
|
|
PI: Fn(Vec<u8>) -> Option<ResponseData>,
|
|
|
PO: Fn(Vec<u8>) -> Option<ResponseData>,
|