Browse Source

websocket增加二进制binary解析方法

1. coinex二进制解压本地化
2. 未知推送类型解决
待解决:
 1. 资金占用率   挂单
 2. 断线重连
JiahengHe 1 năm trước cách đây
mục cha
commit
5573384b7c

+ 7 - 1
exchanges/src/binance_swap_ws.rs

@@ -191,7 +191,7 @@ impl BinanceSwapWs {
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("binance_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -219,6 +219,12 @@ impl BinanceSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         // trace!("原始数据");

+ 7 - 1
exchanges/src/bitget_swap_ws.rs

@@ -238,7 +238,7 @@ impl BitgetSwapWs {
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("bitget_usdt_swap socket 断连,重连中……");
             }
@@ -265,6 +265,12 @@ impl BitgetSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         let mut res_data = ResponseData::new("".to_string(), 200, text.clone(), Value::Null);

+ 7 - 1
exchanges/src/bybit_swap_ws.rs

@@ -250,7 +250,7 @@ impl BybitSwapWs {
                 // ws网络层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("bybit_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -277,6 +277,12 @@ impl BybitSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         // trace!("原始数据");

+ 3 - 4
exchanges/src/coinex_swap_rest.rs

@@ -2,9 +2,7 @@ use std::collections::BTreeMap;
 use std::error::Error;
 use std::time::{SystemTime, UNIX_EPOCH};
 use reqwest::header::{HeaderMap, HeaderValue};
-use ring::{digest};
 use hex;
-use hmac::{Hmac, Mac, NewMac};
 use reqwest::Client;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
@@ -12,8 +10,8 @@ use rust_decimal_macros::dec;
 use serde_json::Value;
 use crate::http_tool::RestTool;
 use crate::response_base::ResponseData;
-use sha2::{Digest, Sha256, Sha512};
-use tracing::{error, info, warn};
+use sha2::{Digest, Sha256};
+use tracing::{error};
 
 #[derive(Clone)]
 pub struct CoinexSwapRest {
@@ -93,6 +91,7 @@ impl CoinexSwapRest {
     //指定币对仓位列表
     pub async fn get_position(&mut self, market: String) -> ResponseData {
         let params = serde_json::json!({
+            "market": market,
             "market_type": "FUTURES"
         });
         let data = self.request("GET".to_string(),

+ 59 - 6
exchanges/src/coinex_swap_ws.rs

@@ -1,7 +1,10 @@
+use std::io::Read;
+use std::str::from_utf8;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use chrono::Utc;
+use flate2::bufread::GzDecoder;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 
 use once_cell::sync::Lazy;  // 使用线程安全的版本
@@ -9,14 +12,21 @@ use hex::encode;
 use serde_json::{json, Value};
 use sha2::{Digest, Sha256};
 use tokio::sync::Mutex;
+use tokio::task;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{error, info, trace};
 use crate::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
+// struct LoginData {
+//     pub is_need_login: bool,
+//     pub is_login: bool
+// }
+
 pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
     println!("初始化...");
+    // 0: 需要登录, 1:是否已经登录
     Mutex::new((false, false))
 });
 
@@ -275,7 +285,7 @@ impl CoinexSwapWs {
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync).await;
                 let mut login_data = LOGIN_DATA.lock().await;
                 // 断联后 设置为没有登录
                 login_data.1 = false;
@@ -293,10 +303,16 @@ impl CoinexSwapWs {
     /*****************************************数据解析*****************************************************/
     /*******************************************************************************************************/
     //数据解析-Text
-    pub fn message_text(text: String) -> Option<ResponseData> {
-        let response_data = Self::ok_text(text);
+    pub async fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text).await;
         Option::from(response_data)
     }
+    pub fn message_text_sync(text: String) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_text(text))
+        })
+    }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
@@ -305,8 +321,21 @@ impl CoinexSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = Self::parse_zip_data(binary);
+        let response_data = Self::ok_text(message_str).await;
+        Option::from(response_data)
+    }
+    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
+        })
+    }
     //数据解析
-    pub fn ok_text(text: String) -> ResponseData
+    pub async fn ok_text(text: String) -> ResponseData
     {
         // trace!("原始数据:{}", text);
         let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
@@ -327,8 +356,16 @@ impl CoinexSwapWs {
                             Some(0) =>{
                                 match json_value["data"].as_str() {
                                     None => {
-                                        res_data.channel = "server.sign".to_string();
-                                        res_data.code = -200;
+                                        // 登录成功逻辑处理
+                                        let mut login_data = LOGIN_DATA.lock().await;
+
+                                        if login_data.0 && !login_data.1 {
+                                            login_data.1 = true;
+                                            res_data.channel = "server.sign".to_string();
+                                            res_data.code = -200;
+                                        } else {
+                                            res_data.code = 400;
+                                        }
                                     }
                                     _ =>{
                                         res_data.code = 400;
@@ -349,5 +386,21 @@ impl CoinexSwapWs {
         }
         res_data
     }
+
+    fn parse_zip_data(p0: Vec<u8>) -> String{
+        // 创建一个GzDecoder的实例,将压缩数据作为输入
+        let mut decoder = GzDecoder::new(&p0[..]);
+
+        // 创建一个缓冲区来存放解压缩后的数据
+        let mut decompressed_data = Vec::new();
+
+        // 读取解压缩的数据到缓冲区中
+        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
+        let result = from_utf8(&decompressed_data)
+            .expect("解压缩后的数据不是有效的UTF-8");
+
+        // info!("解压缩数据 {:?}", result);
+        result.to_string()
+    }
 }
 

+ 7 - 1
exchanges/src/gate_swap_ws.rs

@@ -303,7 +303,7 @@ impl GateSwapWs {
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("gate_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -330,6 +330,12 @@ impl GateSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {

+ 7 - 1
exchanges/src/kucoin_swap_ws.rs

@@ -330,7 +330,7 @@ impl KucoinSwapWs {
                 info!("kucoin_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("kucoin_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -358,6 +358,12 @@ impl KucoinSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {

+ 24 - 51
exchanges/src/socket_tool.rs

@@ -33,7 +33,7 @@ pub enum HeartbeatType {
 pub struct AbstractWsMode {}
 
 impl AbstractWsMode {
-    pub async fn ws_connected<T, PI, PO, F, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+    pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                     is_first_login: bool,
                                                     label: String,
                                                     is_shutdown_arc: Arc<AtomicBool>,
@@ -42,11 +42,13 @@ impl AbstractWsMode {
                                                     ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
                                                     message_text: T,
                                                     message_ping: PI,
-                                                    message_pong: PO)
+                                                    message_pong: PO,
+                                                    message_binary: B)
         where T: Fn(String) -> Option<ResponseData> + Copy,
               PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               F: Fn(ResponseData) -> Future + Clone,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               Future: future::Future<Output=()> + Send + 'static,
     {
         let (ws_write, mut ws_read) = ws_stream.split();
@@ -80,7 +82,7 @@ impl AbstractWsMode {
                     continue;
                 }
 
-                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
                 // let response_data = func(message);
                 if response_data.is_some() {
                     let mut data = response_data.unwrap();
@@ -106,32 +108,14 @@ impl AbstractWsMode {
                     */
                     match code {
                         -200 => {
-                            let mut is_sub = false;
-                            match data.channel.as_str() {
-                                "server.sign" => { // coinex
-                                    let mut login_data = crate::coinex_swap_ws::LOGIN_DATA.lock().await;
-                                    if login_data.0 && !login_data.1 {
-                                        login_data.1 = true;
-                                        is_sub = true;
-                                    }
-                                },
-                                "login" => { // bitget
-                                    is_sub = true;
-                                },
-                                _ => {
-                                    is_sub = false;
-                                }
-                            }
-                            if is_sub {
-                                //登录成功
-                                info!("ws登录成功:{:?}", data);
-                                info!("订阅内容:{:?}", subscribe_array.clone());
-                                for s in &subscribe_array {
-                                    let mut write_lock = ws_write_arc.lock().await;
-                                    write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
-                                }
-                                info!("订阅完成!");
+                            //登录成功
+                            info!("ws登录成功:{:?}", data);
+                            info!("订阅内容:{:?}", subscribe_array.clone());
+                            for s in &subscribe_array {
+                                let mut write_lock = ws_write_arc.lock().await;
+                                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
                             }
+                            info!("订阅完成!");
                         }
                         -201 => {
                             //订阅成功
@@ -177,7 +161,7 @@ impl AbstractWsMode {
     }
 
     //创建链接
-    pub async fn ws_connect_async<T, PI, PO, F, Future>(is_shutdown_arc: Arc<AtomicBool>,
+    pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
                                                         handle_function: F,
                                                         address_url: String,
                                                         is_first_login: bool,
@@ -186,10 +170,12 @@ impl AbstractWsMode {
                                                         write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                         message_text: T,
                                                         message_ping: PI,
-                                                        message_pong: PO)
+                                                        message_pong: PO,
+                                                        message_binary: B)
         where T: Fn(String) -> Option<ResponseData> + Copy,
               PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               F: Fn(ResponseData) -> Future + Clone,
               Future: future::Future<Output=()> + Send + 'static,
     {
@@ -219,7 +205,8 @@ impl AbstractWsMode {
                                    ws_stream,
                                    message_text,
                                    message_ping,
-                                   message_pong).await;
+                                   message_pong,
+                                   message_binary).await;
             }
             Err(e) => {
                 error!("WebSocket 握手失败:{:?}", e);
@@ -249,19 +236,21 @@ impl AbstractWsMode {
         }
     }
     //数据解析
-    pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
+    pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
                                        message_text: T,
                                        message_ping: PI,
-                                       message_pong: PO) -> Option<ResponseData>
+                                       message_pong: PO,
+                                       message_binary: B) -> Option<ResponseData>
         where T: Fn(String) -> Option<ResponseData>,
               PI: Fn(Vec<u8>) -> Option<ResponseData>,
-              PO: Fn(Vec<u8>) -> Option<ResponseData>
+              PO: Fn(Vec<u8>) -> Option<ResponseData>,
+              B: Fn(Vec<u8>) -> Option<ResponseData>
     {
         match message {
             Ok(Message::Text(text)) => message_text(text),
             Ok(Message::Ping(pi)) => message_ping(pi),
             Ok(Message::Pong(po)) => message_pong(po),
-            Ok(Message::Binary(s)) => message_text(parse_zip_data(s.clone())), // 二进制压缩  解析
+            Ok(Message::Binary(s)) => message_binary(s), //二进制WebSocket消息
             Ok(Message::Close(c)) => {
                 let message_str = format!("关闭指令:{:?}", c);
                 trace!("{:?}",message_str);
@@ -289,22 +278,6 @@ impl AbstractWsMode {
     }
 }
 
-fn parse_zip_data(p0: Vec<u8>) -> String{
-    // 创建一个GzDecoder的实例,将压缩数据作为输入
-    let mut decoder = GzDecoder::new(&p0[..]);
-
-    // 创建一个缓冲区来存放解压缩后的数据
-    let mut decompressed_data = Vec::new();
-
-    // 读取解压缩的数据到缓冲区中
-    decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
-    let result = from_utf8(&decompressed_data)
-        .expect("解压缩后的数据不是有效的UTF-8");
-
-    // info!("解压缩数据 {:?}", result);
-    result.to_string()
-}
-
 //创建链接
 pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
                                                        SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {