use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use chrono::Utc; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_util::{future, pin_mut, SinkExt, StreamExt}; use futures_util::stream::{SplitSink, SplitStream}; use ring::hmac; use serde_json::json; use tokio::net::TcpStream; use tokio::{spawn, time}; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::tungstenite::{Error, Message}; use tracing::{trace}; use crate::proxy; use crate::proxy::{ProxyEnum, ProxyResponseEnum}; use crate::response_base::ResponseData; #[derive(Debug)] pub enum HeartbeatType { Ping, Pong, Custom(String), } pub struct AbstractWsMode {} impl AbstractWsMode { //创建链接 pub async fn ws_connect_async(bool_v1: Arc, address_url: String, lable: String, subscribe_array: Vec, mut write_rx: UnboundedReceiver, read_tx: UnboundedSender, message_text: T, message_ping: PI, message_pong: PO) -> Result<(), Error> where T: Fn(String) -> Option + Copy, PI: Fn(Vec) -> Option + Copy, PO: Fn(Vec) -> Option + Copy { //1.是否走代理 /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/ let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) { ProxyResponseEnum::NO => { // trace!("非 代理"); None } ProxyResponseEnum::YES(proxy) => { // trace!("代理"); Option::from(proxy) } }; let read_arc = Arc::new(Mutex::new(read_tx)); let read1 = read_arc.clone(); loop { match connect_async(address_url.clone(), proxy).await { Ok((ws_stream, _)) => { trace!("WebSocket 握手完成。"); let (write, mut read) = ws_stream.split(); let write_arc = Arc::new(Mutex::new(write)); let write_clone = Arc::clone(&write_arc); //订阅写入(包括订阅信息 ) trace!("订阅内容:{:?}",subscribe_array.clone()); for s in &subscribe_array { let mut write_lock = write_clone.lock().await; write_lock.send(Message::Text(s.parse().unwrap())).await?; } //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递 // let stdin_to_ws = write_rx.map(Ok).forward(write); // Writing task let write_clone2 = Arc::clone(&write_arc); let stdin_to_ws = async { while let Some(message) = write_rx.next().await { let mut write_lock2 = write_clone2.lock().await; write_lock2.send(message).await?; } Ok::<(), Error>(()) }; let write_clone3 = Arc::clone(&write_arc); let ws_to_stdout = async { while let Some(message) = read.next().await { if !bool_v1.load(Ordering::Relaxed) { continue; } let mut write_lock3 = write_clone3.lock().await; let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong); // let response_data = func(message); if response_data.is_some() { let data = response_data.unwrap(); if data.code == "200" { let mut data_c = data.clone(); data_c.time = chrono::Utc::now().timestamp_micros(); data_c.label = lable.clone(); // if data_c.label.contains("gate_usdt_swap") { // if data_c.channel == "futures.order_book" { // if read_tx.len() == 0 { // read_tx.unbounded_send(data_c).unwrap(); // } // } else { // read_tx.unbounded_send(data_c).unwrap(); // } // } else if data_c.label.contains("binance_usdt_swap") { // if data_c.channel == "bookTicker" { // if read_tx.len() == 0 { // read_tx.unbounded_send(data_c).unwrap(); // } // } else { // read_tx.unbounded_send(data_c).unwrap(); // } // } else if data_c.label.contains("bybit_usdt_swap") { // if data_c.channel == "orderbook" { // if read_tx.len() == 0 { // read_tx.unbounded_send(data_c).unwrap(); // } // } else { // read_tx.unbounded_send(data_c).unwrap(); // } // } else { // if read_tx.len() == 0 { let r = read1.clone(); spawn(async move { r.lock().await.unbounded_send(data_c).unwrap(); }); // } // } } let code = data.code.clone(); /* 200 -正确返回 -200 -登录成功 -201 -订阅成功 -300 -客户端收到服务器心跳ping,需要响应 -301 -客户端收到服务器心跳pong,需要响应 -302 -客户端收到服务器心跳自定义,需要响应自定义 */ match code.as_str() { "200" => { } "-200" => { //登录成功 trace!("登录成功:{:?}", data); } "-201" => { //订阅成功 trace!("订阅成功:{:?}", data); } "-300" => { //服务器发送心跳 ping 给客户端,客户端需要pong回应 trace!("服务器响应-ping"); if data.data.len() > 0 { write_lock3.send(Message::Pong(Vec::from(data.data))).await?; trace!("客户端回应服务器-pong"); } } "-301" => { //服务器发送心跳 pong 给客户端,客户端需要ping回应 trace!("服务器响应-pong"); if data.data.len() > 0 { write_lock3.send(Message::Ping(Vec::from(data.data))).await?; trace!("客户端回应服务器-ping"); } } "-302" => { //客户端收到服务器心跳自定义,需要响应自定义 trace!("特定字符心跳,特殊响应:{:?}", data); write_lock3.send(Message::Text(data.data)).await?; trace!("特殊字符心跳-回应完成"); } _ => { trace!("未知:{:?}",data); } } } } Ok::<(), Error>(()) }; // 防止 cpu 休眠。 let read2 = read_arc.clone(); spawn(async move { let t_str = "t".to_string(); let response = ResponseData::new(t_str.clone(), t_str.clone(), t_str.clone(), t_str.clone()); loop { time::sleep(Duration::from_micros(5)).await; let t = response.clone(); let r = read2.clone(); spawn(async move { let read = r.lock().await; if read.is_empty() { read.unbounded_send(t.clone()).unwrap(); } }); } }); //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理 pin_mut!(stdin_to_ws, ws_to_stdout,); future::select(stdin_to_ws, ws_to_stdout).await; } Err(e) => { trace!("WebSocket 握手失败:{:?}",e); } } trace!("---5"); trace!("---4"); trace!("重启..."); // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap(); // trace!("WebSocket 握手完成。"); // let (write, mut read) = ws_stream.split(); // // let write_arc = Arc::new(Mutex::new(write)); // let write_clone = Arc::clone(&write_arc); // //订阅写入(包括订阅信息 ) // trace!("订阅内容:{:?}",subscribe_array.clone()); // for s in &subscribe_array { // let mut write_lock = write_clone.lock().await; // write_lock.send(Message::Text(s.parse().unwrap())).await?; // } // // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递 // // let stdin_to_ws = write_rx.map(Ok).forward(write); // // Writing task // let write_clone2 = Arc::clone(&write_arc); // let stdin_to_ws = async { // while let Some(message) = write_rx.next().await { // let mut write_lock2 = write_clone2.lock().await; // write_lock2.send(message).await?; // } // Ok::<(), tokio_tungstenite::tungstenite::Error>(()) // }; // let write_clone3 = Arc::clone(&write_arc); // let ws_to_stdout = async { // while let Some(message) = read.next().await { // let mut write_lock3 = write_clone3.lock().await; // let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong); // // let response_data = func(message); // if response_data.is_some() { // let mut data = response_data.unwrap(); // data.label = lable.clone(); // let code = data.code.clone(); // /* // 200 -正确返回 // -200 -登录成功 // -201 -订阅成功 // -300 -客户端收到服务器心跳ping,需要响应 // -301 -客户端收到服务器心跳pong,需要响应 // -302 -客户端收到服务器心跳自定义,需要响应自定义 // */ // match code.as_str() { // "200" => { // if bool_v1.load(Ordering::Relaxed) { // read_tx.unbounded_send(data).unwrap(); // } // } // "-200" => { // //登录成功 // trace!("登录成功:{:?}", data); // } // "-201" => { // //订阅成功 // trace!("订阅成功:{:?}", data); // } // "-300" => { // //服务器发送心跳 ping 给客户端,客户端需要pong回应 // trace!("服务器响应-ping"); // if data.data.len() > 0 { // write_lock3.send(Message::Pong(Vec::from(data.data))).await?; // trace!("客户端回应服务器-pong"); // } // } // "-301" => { // //服务器发送心跳 pong 给客户端,客户端需要ping回应 // trace!("服务器响应-pong"); // if data.data.len() > 0 { // write_lock3.send(Message::Ping(Vec::from(data.data))).await?; // trace!("客户端回应服务器-ping"); // } // } // "-302" => { // //客户端收到服务器心跳自定义,需要响应自定义 // trace!("特定字符心跳,特殊响应:{:?}", data); // write_lock3.send(Message::Text(data.data)).await?; // trace!("特殊字符心跳-回应完成"); // } // _ => { // trace!("未知:{:?}",data); // } // } // } // } // Ok::<(), tokio_tungstenite::tungstenite::Error>(()) // }; // // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理 // pin_mut!(stdin_to_ws, ws_to_stdout,); // future::select(stdin_to_ws, ws_to_stdout).await; // trace!("---5"); // trace!("---4"); // trace!("重启..."); } // return Ok(()); } //心跳包 pub async fn ping_or_pong(write_tx_clone: Arc>>, h_type: HeartbeatType, millis: u64) { loop { tokio::time::sleep(Duration::from_millis(millis)).await; let write_tx_clone = write_tx_clone.lock().await; write_tx_clone.unbounded_send( match h_type { HeartbeatType::Ping => { Message::Ping(Vec::from("Ping")) } HeartbeatType::Pong => { Message::Pong(Vec::from("Pong")) } HeartbeatType::Custom(ref str) => { Message::Text(str.parse().unwrap()) } } ).expect("发送失败"); trace!("发送指令-心跳:{:?}",h_type); } } //数据解析 pub fn analysis_message(message: Result, message_text: T, message_ping: PI, message_pong: PO) -> Option where T: Fn(String) -> Option, PI: Fn(Vec) -> Option, PO: Fn(Vec) -> Option { match message { Ok(Message::Text(text)) => message_text(text), Ok(Message::Ping(pi)) => message_ping(pi), Ok(Message::Pong(po)) => message_pong(po), Ok(Message::Binary(s)) => { //二进制WebSocket消息 let message_str = format!("Binary:{:?}", s); trace!("{:?}",message_str); Option::from(ResponseData::new("".to_string(), "2".to_string(), message_str, "".to_string())) } Ok(Message::Close(c)) => { let message_str = format!("关闭指令:{:?}", c); trace!("{:?}",message_str); Option::from(ResponseData::new("".to_string(), "0".to_string(), message_str, "".to_string())) } Ok(Message::Frame(f)) => { //原始帧 正常读取数据不会读取到该 信息类型 let message_str = format!("意外读取到原始帧:{:?}", f); trace!("{:?}",message_str); Option::from(ResponseData::new("".to_string(), "-2".to_string(), message_str, "".to_string())) } Err(e) => { let message_str = format!("服务器响应:{:?}", e); trace!("{:?}",message_str); Option::from(ResponseData::new("".to_string(), "-1".to_string(), message_str, "".to_string())) } } } //发送数据 pub async fn send_subscribe(write_tx_clone: Arc>>, message: Message) -> bool { let write_tx_clone = write_tx_clone.lock().await; write_tx_clone.unbounded_send(message.clone()).unwrap(); trace!("发送指令:{:?}",message); true } } //创建链接 pub async fn ws_connect_async(address_url: String) -> (SplitSink>, Message>, SplitStream>>) { //1.是否走代理 /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/ let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) { ProxyResponseEnum::NO => { trace!("非 代理"); None } ProxyResponseEnum::YES(proxy) => { trace!("代理"); Option::from(proxy) } }; let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!"); trace!("WebSocket 握手完成。"); ws_stream.split() } pub async fn client(add_url: String) { let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new( 127, 0, 0, 1) ), 7890); //创建通道 开启线程,向通道写入数据 let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); let (read_tx, read_rx) = futures_channel::mpsc::unbounded(); spawn(write_sell(write_tx)); //创建socket,,并且读写分离 let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect"); trace!("WebSocket handshake has been successfully completed"); let (write, read) = ws_stream.split(); //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递 let stdin_to_ws = write_rx.map(Ok).forward(write); let ws_to_stdout = { trace!("---1"); //读,循环读取,然后拿到 message,,然后开启异步处理 message, let result = read.for_each(|message| async { read_tx.unbounded_send(message.unwrap()).unwrap(); }); trace!("---3"); result }; tokio::spawn(read_sell(read_rx)); //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理 pin_mut!(stdin_to_ws, ws_to_stdout); future::select(stdin_to_ws, ws_to_stdout).await; } //模拟 业务场景中 发送指令给目标交易所 async fn write_sell(tx: futures_channel::mpsc::UnboundedSender) { let _str = serde_json::json!({ "op": "subscribe", "args": [ { // "channel":"orders", // "instType":"SWAP", // "instFamily":"BTC-USDT" "channel":"books5", "instId":"BTC-USDT" } ] }); let str_array: Vec = vec![ // log_in_to_str(), // str.to_string(), ]; let i = 0; loop { if str_array.len() > i { let send_str = str_array.get(i).unwrap(); tx.unbounded_send(Message::Text(send_str.to_string())).unwrap(); } tokio::time::sleep(Duration::from_secs(5)).await; tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap(); tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap(); } } async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver) { loop { if let Some(message) = rx.next().await { match message { Message::Text(s) => { trace!("Text: {}", s); } Message::Binary(s) => { trace!("Binary: {:?}", s); } Message::Ping(s) => { trace!("Ping: {:?}", s); } Message::Pong(s) => { trace!("Pong: {:?}", s); } Message::Close(s) => { trace!("Close: {:?}", s); } Message::Frame(s) => { trace!("Frame: {:?}", s); } } } tokio::time::sleep(Duration::from_millis(1)).await } } pub fn log_in_to_str() -> String { let mut login_json_str = "".to_string(); let access_key: String = "".to_string(); let secret_key: String = "".to_string(); let passphrase: String = "".to_string(); if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 { let timestamp = Utc::now().timestamp().to_string(); // 时间戳 + 请求类型+ 请求参数字符串 let message = format!("{}GET{}", timestamp, "/users/self/verify"); let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes()); let result = ring::hmac::sign(&hmac_key, &message.as_bytes()); let sign = base64::encode(result); let login_json = json!({ "op": "login", "args": [{ "apiKey": access_key, "passphrase": passphrase, "timestamp": timestamp, "sign": sign }] }); // trace!("---login_json:{0}", login_json.to_string()); // trace!("--登陆:{}", login_json.to_string()); login_json_str = login_json.to_string(); } login_json_str }