| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548 |
- use std::net::{IpAddr, Ipv4Addr, SocketAddr};
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, Ordering};
- use std::time::Duration;
- use chrono::Utc;
- use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
- use futures_util::{future, pin_mut, SinkExt, StreamExt};
- use futures_util::stream::{SplitSink, SplitStream};
- use ring::hmac;
- use serde_json::json;
- use tokio::net::TcpStream;
- use tokio::{spawn};
- use tokio::sync::Mutex;
- use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
- use tokio_tungstenite::tungstenite::{Error, Message};
- use tracing::{trace};
- use crate::proxy;
- use crate::proxy::{ProxyEnum, ProxyResponseEnum};
- use crate::response_base::ResponseData;
/// Kind of keep-alive message a heartbeat task sends on a fixed interval.
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` so a
/// heartbeat configuration can be reused across connections and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatType {
    /// Send a WebSocket ping frame.
    Ping,
    /// Send a WebSocket pong frame.
    Pong,
    /// Send the contained string as a text message, for servers that use an
    /// application-level heartbeat instead of ping/pong frames.
    Custom(String),
}
/// Stateless namespace type: the WebSocket helpers are associated functions
/// on it, and it carries no data of its own.
#[derive(Debug, Clone, Copy, Default)]
pub struct AbstractWsMode {}
- impl AbstractWsMode {
- //创建链接
- pub async fn ws_connect_async<T, PI, PO>(bool_v1: Arc<AtomicBool>,
- address_url: String,
- lable: String,
- subscribe_array: Vec<String>,
- mut write_rx: UnboundedReceiver<Message>,
- read_tx: UnboundedSender<ResponseData>,
- message_text: T,
- message_ping: PI,
- message_pong: PO) -> Result<(), Error>
- where T: Fn(String) -> Option<ResponseData> + Copy,
- PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
- PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy
- {
- //1.是否走代理
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
- let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
- ProxyResponseEnum::NO => {
- // trace!("非 代理");
- None
- }
- ProxyResponseEnum::YES(proxy) => {
- // trace!("代理");
- Option::from(proxy)
- }
- };
- let read_arc = Arc::new(Mutex::new(read_tx));
- let read1 = read_arc.clone();
- loop {
- match connect_async(address_url.clone(), proxy).await {
- Ok((ws_stream, _)) => {
- trace!("WebSocket 握手完成。");
- let (write, mut read) = ws_stream.split();
- let write_arc = Arc::new(Mutex::new(write));
- let write_clone = Arc::clone(&write_arc);
- //订阅写入(包括订阅信息 )
- trace!("订阅内容:{:?}",subscribe_array.clone());
- for s in &subscribe_array {
- let mut write_lock = write_clone.lock().await;
- write_lock.send(Message::Text(s.parse().unwrap())).await?;
- }
- //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
- // let stdin_to_ws = write_rx.map(Ok).forward(write);
- // Writing task
- let write_clone2 = Arc::clone(&write_arc);
- let stdin_to_ws = async {
- while let Some(message) = write_rx.next().await {
- let mut write_lock2 = write_clone2.lock().await;
- write_lock2.send(message).await?;
- }
- Ok::<(), Error>(())
- };
- let write_clone3 = Arc::clone(&write_arc);
- let ws_to_stdout = async {
- while let Some(message) = read.next().await {
- if !bool_v1.load(Ordering::Relaxed) {
- continue;
- }
- let mut write_lock3 = write_clone3.lock().await;
- let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
- // let response_data = func(message);
- if response_data.is_some() {
- let data = response_data.unwrap();
- if data.code == "200" {
- let mut data_c = data.clone();
- data_c.time = chrono::Utc::now().timestamp_micros();
- data_c.label = lable.clone();
- let r = read1.clone();
- let read = r.lock().await;
- if data_c.label.contains("gate_usdt_swap") {
- if data_c.channel == "futures.order_book" {
- if read.len() == 0 {
- read.unbounded_send(data_c).unwrap();
- }
- } else {
- read.unbounded_send(data_c).unwrap();
- }
- } else if data_c.label.contains("binance_usdt_swap") {
- if data_c.channel == "bookTicker" {
- if read.len() == 0 {
- read.unbounded_send(data_c).unwrap();
- }
- } else {
- read.unbounded_send(data_c).unwrap();
- }
- } else if data_c.label.contains("bybit_usdt_swap") {
- if data_c.channel == "orderbook" {
- if read.len() == 0 {
- read.unbounded_send(data_c).unwrap();
- }
- } else {
- read.unbounded_send(data_c).unwrap();
- }
- } else {
- if read.len() == 0 {
- read.unbounded_send(data_c).unwrap();
- }
- }
- }
- let code = data.code.clone();
- /*
- 200 -正确返回
- -200 -登录成功
- -201 -订阅成功
- -300 -客户端收到服务器心跳ping,需要响应
- -301 -客户端收到服务器心跳pong,需要响应
- -302 -客户端收到服务器心跳自定义,需要响应自定义
- */
- match code.as_str() {
- "200" => {
- }
- "-200" => {
- //登录成功
- trace!("登录成功:{:?}", data);
- }
- "-201" => {
- //订阅成功
- trace!("订阅成功:{:?}", data);
- }
- "-300" => {
- //服务器发送心跳 ping 给客户端,客户端需要pong回应
- trace!("服务器响应-ping");
- if data.data.len() > 0 {
- write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
- trace!("客户端回应服务器-pong");
- }
- }
- "-301" => {
- //服务器发送心跳 pong 给客户端,客户端需要ping回应
- trace!("服务器响应-pong");
- if data.data.len() > 0 {
- write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
- trace!("客户端回应服务器-ping");
- }
- }
- "-302" => {
- //客户端收到服务器心跳自定义,需要响应自定义
- trace!("特定字符心跳,特殊响应:{:?}", data);
- write_lock3.send(Message::Text(data.data)).await?;
- trace!("特殊字符心跳-回应完成");
- }
- _ => {
- trace!("未知:{:?}",data);
- }
- }
- }
- }
- Ok::<(), Error>(())
- };
- // // 防止 cpu 休眠。
- // let read2 = read_arc.clone();
- // spawn(async move {
- // let t_str = "t".to_string();
- // let response = ResponseData::new(t_str.clone(),
- // t_str.clone(),
- // t_str.clone(),
- // t_str.clone());
- // loop {
- // time::sleep(Duration::from_nanos(1)).await;
- //
- // let t = response.clone();
- // let r = read2.clone();
- // spawn(async move {
- // let read = r.lock().await;
- //
- // read.unbounded_send(t.clone()).unwrap();
- // });
- // }
- // });
- //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
- pin_mut!(stdin_to_ws, ws_to_stdout,);
- future::select(stdin_to_ws, ws_to_stdout).await;
- }
- Err(e) => {
- trace!("WebSocket 握手失败:{:?}",e);
- }
- }
- trace!("---5");
- trace!("---4");
- trace!("重启...");
- // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
- // trace!("WebSocket 握手完成。");
- // let (write, mut read) = ws_stream.split();
- //
- // let write_arc = Arc::new(Mutex::new(write));
- // let write_clone = Arc::clone(&write_arc);
- // //订阅写入(包括订阅信息 )
- // trace!("订阅内容:{:?}",subscribe_array.clone());
- // for s in &subscribe_array {
- // let mut write_lock = write_clone.lock().await;
- // write_lock.send(Message::Text(s.parse().unwrap())).await?;
- // }
- //
- // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
- // // let stdin_to_ws = write_rx.map(Ok).forward(write);
- // // Writing task
- // let write_clone2 = Arc::clone(&write_arc);
- // let stdin_to_ws = async {
- // while let Some(message) = write_rx.next().await {
- // let mut write_lock2 = write_clone2.lock().await;
- // write_lock2.send(message).await?;
- // }
- // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
- // };
- // let write_clone3 = Arc::clone(&write_arc);
- // let ws_to_stdout = async {
- // while let Some(message) = read.next().await {
- // let mut write_lock3 = write_clone3.lock().await;
- // let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
- // // let response_data = func(message);
- // if response_data.is_some() {
- // let mut data = response_data.unwrap();
- // data.label = lable.clone();
- // let code = data.code.clone();
- // /*
- // 200 -正确返回
- // -200 -登录成功
- // -201 -订阅成功
- // -300 -客户端收到服务器心跳ping,需要响应
- // -301 -客户端收到服务器心跳pong,需要响应
- // -302 -客户端收到服务器心跳自定义,需要响应自定义
- // */
- // match code.as_str() {
- // "200" => {
- // if bool_v1.load(Ordering::Relaxed) {
- // read_tx.unbounded_send(data).unwrap();
- // }
- // }
- // "-200" => {
- // //登录成功
- // trace!("登录成功:{:?}", data);
- // }
- // "-201" => {
- // //订阅成功
- // trace!("订阅成功:{:?}", data);
- // }
- // "-300" => {
- // //服务器发送心跳 ping 给客户端,客户端需要pong回应
- // trace!("服务器响应-ping");
- // if data.data.len() > 0 {
- // write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
- // trace!("客户端回应服务器-pong");
- // }
- // }
- // "-301" => {
- // //服务器发送心跳 pong 给客户端,客户端需要ping回应
- // trace!("服务器响应-pong");
- // if data.data.len() > 0 {
- // write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
- // trace!("客户端回应服务器-ping");
- // }
- // }
- // "-302" => {
- // //客户端收到服务器心跳自定义,需要响应自定义
- // trace!("特定字符心跳,特殊响应:{:?}", data);
- // write_lock3.send(Message::Text(data.data)).await?;
- // trace!("特殊字符心跳-回应完成");
- // }
- // _ => {
- // trace!("未知:{:?}",data);
- // }
- // }
- // }
- // }
- // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
- // };
- //
- // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
- // pin_mut!(stdin_to_ws, ws_to_stdout,);
- // future::select(stdin_to_ws, ws_to_stdout).await;
- // trace!("---5");
- // trace!("---4");
- // trace!("重启...");
- }
- // return Ok(());
- }
- //心跳包
- pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
- loop {
- tokio::time::sleep(Duration::from_millis(millis)).await;
- let write_tx_clone = write_tx_clone.lock().await;
- write_tx_clone.unbounded_send(
- match h_type {
- HeartbeatType::Ping => {
- Message::Ping(Vec::from("Ping"))
- }
- HeartbeatType::Pong => {
- Message::Pong(Vec::from("Pong"))
- }
- HeartbeatType::Custom(ref str) => {
- Message::Text(str.parse().unwrap())
- }
- }
- ).expect("发送失败");
- trace!("发送指令-心跳:{:?}",h_type);
- }
- }
- //数据解析
- pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
- message_text: T,
- message_ping: PI,
- message_pong: PO) -> Option<ResponseData>
- where T: Fn(String) -> Option<ResponseData>,
- PI: Fn(Vec<u8>) -> Option<ResponseData>,
- PO: Fn(Vec<u8>) -> Option<ResponseData>
- {
- match message {
- Ok(Message::Text(text)) => message_text(text),
- Ok(Message::Ping(pi)) => message_ping(pi),
- Ok(Message::Pong(po)) => message_pong(po),
- Ok(Message::Binary(s)) => {
- //二进制WebSocket消息
- let message_str = format!("Binary:{:?}", s);
- trace!("{:?}",message_str);
- Option::from(ResponseData::new("".to_string(),
- "2".to_string(),
- message_str, "".to_string()))
- }
- Ok(Message::Close(c)) => {
- let message_str = format!("关闭指令:{:?}", c);
- trace!("{:?}",message_str);
- Option::from(ResponseData::new("".to_string(),
- "0".to_string(),
- message_str, "".to_string()))
- }
- Ok(Message::Frame(f)) => {
- //原始帧 正常读取数据不会读取到该 信息类型
- let message_str = format!("意外读取到原始帧:{:?}", f);
- trace!("{:?}",message_str);
- Option::from(ResponseData::new("".to_string(),
- "-2".to_string(),
- message_str, "".to_string()))
- }
- Err(e) => {
- let message_str = format!("服务器响应:{:?}", e);
- trace!("{:?}",message_str);
- Option::from(ResponseData::new("".to_string(),
- "-1".to_string(),
- message_str, "".to_string()))
- }
- }
- }
- //发送数据
- pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
- let write_tx_clone = write_tx_clone.lock().await;
- write_tx_clone.unbounded_send(message.clone()).unwrap();
- trace!("发送指令:{:?}",message);
- true
- }
- }
- //创建链接
- pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
- SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
- //1.是否走代理
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
- let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
- ProxyResponseEnum::NO => {
- trace!("非 代理");
- None
- }
- ProxyResponseEnum::YES(proxy) => {
- trace!("代理");
- Option::from(proxy)
- }
- };
- let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
- trace!("WebSocket 握手完成。");
- ws_stream.split()
- }
- pub async fn client(add_url: String) {
- let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
- 127,
- 0,
- 0,
- 1)
- ), 7890);
- //创建通道 开启线程,向通道写入数据
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
- spawn(write_sell(write_tx));
- //创建socket,,并且读写分离
- let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
- trace!("WebSocket handshake has been successfully completed");
- let (write, read) = ws_stream.split();
- //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
- let stdin_to_ws = write_rx.map(Ok).forward(write);
- let ws_to_stdout = {
- trace!("---1");
- //读,循环读取,然后拿到 message,,然后开启异步处理 message,
- let result = read.for_each(|message| async {
- read_tx.unbounded_send(message.unwrap()).unwrap();
- });
- trace!("---3");
- result
- };
- tokio::spawn(read_sell(read_rx));
- //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
- pin_mut!(stdin_to_ws, ws_to_stdout);
- future::select(stdin_to_ws, ws_to_stdout).await;
- }
- //模拟 业务场景中 发送指令给目标交易所
- async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
- let _str = serde_json::json!({
- "op": "subscribe",
- "args": [
- {
- // "channel":"orders",
- // "instType":"SWAP",
- // "instFamily":"BTC-USDT"
- "channel":"books5",
- "instId":"BTC-USDT"
- }
- ]
- });
- let str_array: Vec<String> = vec![
- // log_in_to_str(),
- // str.to_string(),
- ];
- let i = 0;
- loop {
- if str_array.len() > i {
- let send_str = str_array.get(i).unwrap();
- tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
- }
- tokio::time::sleep(Duration::from_secs(5)).await;
- tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
- tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
- }
- }
- async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
- loop {
- if let Some(message) = rx.next().await {
- match message {
- Message::Text(s) => {
- trace!("Text: {}", s);
- }
- Message::Binary(s) => {
- trace!("Binary: {:?}", s);
- }
- Message::Ping(s) => {
- trace!("Ping: {:?}", s);
- }
- Message::Pong(s) => {
- trace!("Pong: {:?}", s);
- }
- Message::Close(s) => {
- trace!("Close: {:?}", s);
- }
- Message::Frame(s) => {
- trace!("Frame: {:?}", s);
- }
- }
- }
- tokio::time::sleep(Duration::from_millis(1)).await
- }
- }
- pub fn log_in_to_str() -> String {
- let mut login_json_str = "".to_string();
- let access_key: String = "".to_string();
- let secret_key: String = "".to_string();
- let passphrase: String = "".to_string();
- if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
- let timestamp = Utc::now().timestamp().to_string();
- // 时间戳 + 请求类型+ 请求参数字符串
- let message = format!("{}GET{}", timestamp, "/users/self/verify");
- let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
- let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
- let sign = base64::encode(result);
- let login_json = json!({
- "op": "login",
- "args": [{
- "apiKey": access_key,
- "passphrase": passphrase,
- "timestamp": timestamp,
- "sign": sign }]
- });
- // trace!("---login_json:{0}", login_json.to_string());
- // trace!("--登陆:{}", login_json.to_string());
- login_json_str = login_json.to_string();
- }
- login_json_str
- }
|