Browse Source

新增coinex的支持,主要是有个二进制消息解构的部分,需要其他交易所也支持这个方法

skyffire 1 year ago
parent
commit
2090b95f4a

+ 3 - 1
exchanges/Cargo.toml

@@ -42,4 +42,6 @@ tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 
 ##生成 xlsx
-rust_xlsxwriter = "0.58.0"
+rust_xlsxwriter = "0.58.0"
+once_cell = "1.19.0"
+flate2 = "1.0.28"

+ 7 - 1
exchanges/src/binance_swap_ws.rs

@@ -196,7 +196,7 @@ impl BinanceSwapWs {
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("binance_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -224,6 +224,12 @@ impl BinanceSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         // trace!("原始数据");

+ 7 - 1
exchanges/src/bitget_swap_ws.rs

@@ -250,7 +250,7 @@ impl BitgetSwapWs {
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  login_is, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 warn!("bitget_usdt_swap socket 断连,重连中……");
             }
@@ -277,6 +277,12 @@ impl BitgetSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         let mut res_data = ResponseData::new("".to_string(), 200, text.clone(), Value::Null);

+ 7 - 1
exchanges/src/bybit_swap_ws.rs

@@ -250,7 +250,7 @@ impl BybitSwapWs {
                 // ws网络层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, tag.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("bybit_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -277,6 +277,12 @@ impl BybitSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         // trace!("原始数据");

+ 409 - 414
exchanges/src/coinex_swap_ws.rs

@@ -1,414 +1,409 @@
-// use std::io::Read;
-// use std::str::from_utf8;
-// use std::sync::Arc;
-// use std::sync::atomic::AtomicBool;
-// use std::time::{Duration, SystemTime, UNIX_EPOCH};
-//
-// use flate2::bufread::GzDecoder;
-// use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-//
-// use once_cell::sync::Lazy;  // 使用线程安全的版本
-// use hex::encode;
-// use serde_json::{json, Value};
-// use sha2::{Digest, Sha256};
-// use tokio::sync::Mutex;
-// use tokio::task;
-// use tokio_tungstenite::tungstenite::{Error, Message};
-// use tracing::{error, info, trace};
-// use crate::response_base::ResponseData;
-// use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-//
-// // struct LoginData {
-// //     pub is_need_login: bool,
-// //     pub is_login: bool
-// // }
-//
-// pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
-//     println!("初始化...");
-//     // 0: 需要登录, 1:是否已经登录
-//     Mutex::new((false, false))
-// });
-//
-// //订阅频道
-// #[derive(Clone)]
-// pub enum CoinexSwapSubscribeType {
-//     // 深度
-//     PuFuturesDepth,
-//     // 公开成交
-//     PuFuturesDeals,
-//
-//     // 订单
-//     PrFuturesOrders,
-//     // 仓位
-//     PrFuturesPositions,
-//     // 余额
-//     PrFuturesBalances,
-// }
-//
-// //账号信息
-// #[derive(Clone)]
-// #[allow(dead_code)]
-// pub struct CoinexSwapLogin {
-//     pub api_key: String,
-//     pub secret: String,
-// }
-//
-// #[derive(Clone)]
-// pub struct CoinexSwapWs {
-//     //类型
-//     label: String,
-//     //地址
-//     address_url: String,
-//     //账号信息
-//     login_param: Option<CoinexSwapLogin>,
-//     //币对
-//     symbol_s: Vec<String>,
-//     //订阅
-//     subscribe_types: Vec<CoinexSwapSubscribeType>,
-//     //心跳间隔
-//     heartbeat_time: u64
-// }
-//
-//
-// impl CoinexSwapWs {
-//     /*******************************************************************************************************/
-//     /*****************************************实例化一个对象****************************************************/
-//     /*******************************************************************************************************/
-//     pub fn new(login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs {
-//         return CoinexSwapWs::new_label("default-CoinexSwapWs".to_string(), login_param);
-//     }
-//
-//     pub fn new_label(label: String, login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs
-//     {
-//         /*******公共频道-私有频道数据组装*/
-//         let address_url = "wss://socket.coinex.com/v2/futures".to_string();
-//         info!("走普通通道(不支持colo通道):{}", address_url);
-//         CoinexSwapWs {
-//             label,
-//             address_url,
-//             login_param,
-//             symbol_s: vec![],
-//             subscribe_types: vec![],
-//             heartbeat_time: 1000 * 10
-//         }
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************订阅函数********************************************************/
-//     /*******************************************************************************************************/
-//     //手动添加订阅信息
-//     pub fn set_subscribe(&mut self, subscribe_types: Vec<CoinexSwapSubscribeType>) {
-//         self.subscribe_types.extend(subscribe_types);
-//     }
-//     //手动添加币对
-//     pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
-//         for symbol in b_array.iter_mut() {
-//             // 大写
-//             *symbol = symbol.to_uppercase();
-//             // 字符串替换
-//             *symbol = symbol.replace("-", "_");
-//         }
-//         self.symbol_s = b_array;
-//     }
-//     //频道是否需要登录
-//     fn contains_pr(&self) -> bool {
-//         for t in self.subscribe_types.clone() {
-//             if match t {
-//                 CoinexSwapSubscribeType::PuFuturesDepth => false,
-//                 CoinexSwapSubscribeType::PuFuturesDeals => false,
-//
-//                 CoinexSwapSubscribeType::PrFuturesOrders => true,
-//                 CoinexSwapSubscribeType::PrFuturesPositions => true,
-//                 CoinexSwapSubscribeType::PrFuturesBalances => true,
-//             } {
-//                 return true;
-//             }
-//         }
-//         false
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************工具函数********************************************************/
-//     /*******************************************************************************************************/
-//     //订阅枚举解析
-//     pub fn enum_to_string(symbol: String, subscribe_type: CoinexSwapSubscribeType, _login_param: Option<CoinexSwapLogin>) -> Value {
-//         // let access_key;
-//         // let secret_key;
-//         // match login_param {
-//         //     None => {
-//         //         access_key = "".to_string();
-//         //         secret_key = "".to_string();
-//         //     }
-//         //     Some(param) => {
-//         //         access_key = param.api_key.clone();
-//         //         secret_key = param.secret.clone();
-//         //     }
-//         // }
-//
-//         match subscribe_type {
-//             CoinexSwapSubscribeType::PuFuturesDepth => {
-//                 json!({
-//                     "method": "depth.subscribe",
-//                     "params": {
-//                         "market_list": [
-//                             [symbol, 50, "0.000000001", true]
-//                         ]
-//                     },
-//                     "id": 1
-//                 })
-//             }
-//             CoinexSwapSubscribeType::PuFuturesDeals => {
-//                 json!({
-//                     "method": "deals.subscribe",
-//                     "params": {"market_list": [symbol]},
-//                     "id": 1
-//                 })
-//             }
-//
-//             CoinexSwapSubscribeType::PrFuturesOrders => {
-//                 json!({
-//                   "method": "order.subscribe",
-//                   "params": {"market_list": [symbol]},
-//                   "id": 1
-//                 })
-//             }
-//             CoinexSwapSubscribeType::PrFuturesPositions => {
-//                 json!({
-//                   "method": "position.subscribe",
-//                   "params": {"market_list": [symbol]},
-//                   "id": 1
-//                 })
-//             }
-//             CoinexSwapSubscribeType::PrFuturesBalances => {
-//                 json!({
-//                     "method": "balance.subscribe",
-//                     "params": {"ccy_list": ["USDT"]}, // 目前只用u 所以写死
-//                     "id": 1
-//                 })
-//             }
-//         }
-//     }
-//     //订阅信息生成
-//     pub fn get_subscription(&self) -> Vec<Value> {
-//         let mut args = vec![];
-//         // 只获取第一个
-//         let symbol = self.symbol_s.get(0).unwrap().replace("_", "").to_uppercase();
-//
-//         for subscribe_type in &self.subscribe_types {
-//             let ty_str = Self::enum_to_string(symbol.clone(),
-//                                               subscribe_type.clone(),
-//                                               self.login_param.clone(),
-//             );
-//             args.push(ty_str);
-//         }
-//         args
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************socket基本*****************************************************/
-//     /*******************************************************************************************************/
-//     //链接
-//     pub async fn ws_connect_async<F, Future>(&mut self,
-//                                              is_shutdown_arc: Arc<AtomicBool>,
-//                                              handle_function: F,
-//                                              write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-//                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
-//         where
-//             F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
-//             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
-//     {
-//         let login_is = self.contains_pr();
-//         let login_param_clone = self.login_param.clone();
-//         let subscription = self.get_subscription();
-//         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
-//         let heartbeat_time = self.heartbeat_time.clone();
-//
-//
-//         //心跳-- 方法内部线程启动
-//         let write_tx_clone1 = Arc::clone(write_tx_am);
-//         let write_tx_clone2 = Arc::clone(write_tx_am);
-//         tokio::spawn(async move {
-//             trace!("线程-异步心跳-开始");
-//             let ping_str = json!({
-//                 "method": "server.ping",
-//                 "params": {},
-//                 "id": 1
-//             });
-//             AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
-//             trace!("线程-异步心跳-结束");
-//         });
-//
-//         //设置订阅
-//         let mut subscribe_array = vec![];
-//
-//
-//         for s in subscription {
-//             subscribe_array.push(s.to_string());
-//         }
-//
-//         //链接
-//         let t2 = tokio::spawn(async move {
-//             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
-//
-//             info!("启动连接");
-//             loop {
-//                 info!("coinex_usdt_swap socket 连接中……");
-//                 // 需要登录
-//                 if login_is {
-//                     let login_param = login_param_clone.clone().unwrap();
-//                     let mut login_data = LOGIN_DATA.lock().await;
-//                     login_data.0 = true;
-//                     let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
-//                     //登录相关
-//                     let prepared_str = format!("{}{}", time, login_param.secret);
-//                     // 创建SHA256哈希器实例
-//                     let mut hasher = Sha256::new();
-//                     // 加密字符串
-//                     hasher.update(prepared_str);
-//                     // 计算哈希值
-//                     let result = hasher.finalize();
-//                     // 将哈希值转换为十六进制小写字符串
-//                     let hex_str = encode(result).to_lowercase();
-//
-//                     let login_param = json!({
-//                         "method": "server.sign",
-//                         "params": {
-//                             "access_id": login_param.api_key,
-//                             "signed_str": hex_str,
-//                             "timestamp": time
-//                         },
-//                         "id": 1
-//                     });
-//                     let login_str = login_param.to_string();
-//                     info!("发起ws登录: {}", login_str);
-//                     let write_tx_c = Arc::clone(&write_tx_clone2);
-//                     AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
-//                 } else {
-//                     info!("coinex 不需登录");
-//                 }
-//
-//                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-//                                                  login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-//                                                  Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync).await;
-//                 let mut login_data = LOGIN_DATA.lock().await;
-//                 // 断联后 设置为没有登录
-//                 login_data.1 = false;
-//                 info!("coinex_usdt_swap socket 断连,1s以后重连……");
-//                 error!("coinex_usdt_swap socket 断连,1s以后重连……");
-//                 tokio::time::sleep(Duration::from_secs(1)).await;
-//             }
-//         });
-//         tokio::try_join!(t2).unwrap();
-//         trace!("线程-心跳与链接-结束");
-//
-//         Ok(())
-//     }
-//     /*******************************************************************************************************/
-//     /*****************************************数据解析*****************************************************/
-//     /*******************************************************************************************************/
-//     //数据解析-Text
-//     pub async fn message_text(text: String) -> Option<ResponseData> {
-//         let response_data = Self::ok_text(text).await;
-//         Option::from(response_data)
-//     }
-//     pub fn message_text_sync(text: String) -> Option<ResponseData> {
-//         // 使用 tokio::task::block_in_place 来等待异步函数的结果
-//         task::block_in_place(|| {
-//             tokio::runtime::Handle::current().block_on(Self::message_text(text))
-//         })
-//     }
-//     //数据解析-ping
-//     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-//         return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
-//     }
-//     //数据解析-pong
-//     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-//         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
-//     }
-//     //数据解析-二进制
-//     pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
-//         //二进制WebSocket消息
-//         let message_str = Self::parse_zip_data(binary);
-//         let response_data = Self::ok_text(message_str).await;
-//         Option::from(response_data)
-//     }
-//     pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
-//         // 使用 tokio::task::block_in_place 来等待异步函数的结果
-//         task::block_in_place(|| {
-//             tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
-//         })
-//     }
-//     //数据解析
-//     pub async fn ok_text(text: String) -> ResponseData
-//     {
-//         // trace!("原始数据:{}", text);
-//         let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
-//         let json_value: Value = serde_json::from_str(&text).unwrap();
-//
-//         let obj = json_value["method"].as_str();
-//         match obj {
-//             Some(v)=> {
-//                 res_data.channel = format!("{}", v);
-//                 res_data.code = 200;
-//                 res_data.data = json_value["data"].clone();
-//             },
-//             None => {
-//                 // 认证的响应没有method,只能通过id和code判断
-//                 match json_value["id"].as_i64() {
-//                     Some(1) => {
-//                         match json_value["code"].as_i64() {
-//                             Some(0) =>{
-//                                 match json_value["data"].as_str() {
-//                                     None => {
-//                                         // 登录成功逻辑处理
-//                                         let mut login_data = LOGIN_DATA.lock().await;
-//                                         if login_data.0 { // 需要登录
-//                                             if !login_data.1{
-//                                                 login_data.1 = true;
-//                                                 res_data.channel = "server.sign".to_string();
-//                                                 res_data.code = -200;
-//                                             }else {
-//                                                 res_data.code = 400;
-//                                             }
-//                                         }  else { // 不需要登录
-//                                             res_data.code = 200;
-//                                         }
-//                                     }
-//                                     _ =>{
-//                                         res_data.code = 400;
-//                                     }
-//                                 }
-//                             }
-//                             _ => {
-//                                 res_data.code = 400;
-//                             }
-//                         }
-//                     }
-//                     _ => {
-//                         res_data.code = 400;
-//                     }
-//                 }
-//                 res_data.data = json_value;
-//             }
-//         }
-//         res_data
-//     }
-//
-//     fn parse_zip_data(p0: Vec<u8>) -> String{
-//         // 创建一个GzDecoder的实例,将压缩数据作为输入
-//         let mut decoder = GzDecoder::new(&p0[..]);
-//
-//         // 创建一个缓冲区来存放解压缩后的数据
-//         let mut decompressed_data = Vec::new();
-//
-//         // 读取解压缩的数据到缓冲区中
-//         decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
-//         let result = from_utf8(&decompressed_data)
-//             .expect("解压缩后的数据不是有效的UTF-8");
-//
-//         // info!("解压缩数据 {:?}", result);
-//         result.to_string()
-//     }
-// }
-//
+use std::io::Read;
+use std::str::from_utf8;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use flate2::bufread::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+
+use once_cell::sync::Lazy;  // 使用线程安全的版本
+use hex::encode;
+use serde_json::{json, Value};
+use sha2::{Digest, Sha256};
+use tokio::sync::Mutex;
+use tokio::task;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
+    println!("初始化...");
+    // 0: 需要登录, 1:是否已经登录
+    Mutex::new((false, false))
+});
+
+//订阅频道
+#[derive(Clone)]
+pub enum CoinexSwapSubscribeType {
+    // 深度
+    PuFuturesDepth,
+    // 公开成交
+    PuFuturesDeals,
+
+    // 订单
+    PrFuturesOrders,
+    // 仓位
+    PrFuturesPositions,
+    // 余额
+    PrFuturesBalances,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct CoinexSwapLogin {
+    pub api_key: String,
+    pub secret: String,
+}
+
+#[derive(Clone)]
+pub struct CoinexSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号信息
+    login_param: Option<CoinexSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<CoinexSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64
+}
+
+
+impl CoinexSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************实例化一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs {
+        return CoinexSwapWs::new_label("default-CoinexSwapWs".to_string(), login_param);
+    }
+
+    pub fn new_label(label: String, login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs
+    {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = "wss://socket.coinex.com/v2/futures".to_string();
+        info!("走普通通道(不支持colo通道):{}", address_url);
+        CoinexSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<CoinexSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "_");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                CoinexSwapSubscribeType::PuFuturesDepth => false,
+                CoinexSwapSubscribeType::PuFuturesDeals => false,
+
+                CoinexSwapSubscribeType::PrFuturesOrders => true,
+                CoinexSwapSubscribeType::PrFuturesPositions => true,
+                CoinexSwapSubscribeType::PrFuturesBalances => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: CoinexSwapSubscribeType, _login_param: Option<CoinexSwapLogin>) -> Value {
+        // let access_key;
+        // let secret_key;
+        // match login_param {
+        //     None => {
+        //         access_key = "".to_string();
+        //         secret_key = "".to_string();
+        //     }
+        //     Some(param) => {
+        //         access_key = param.api_key.clone();
+        //         secret_key = param.secret.clone();
+        //     }
+        // }
+
+        match subscribe_type {
+            CoinexSwapSubscribeType::PuFuturesDepth => {
+                json!({
+                    "method": "depth.subscribe",
+                    "params": {
+                        "market_list": [
+                            [symbol, 50, "0.000000001", true]
+                        ]
+                    },
+                    "id": 1
+                })
+            }
+            CoinexSwapSubscribeType::PuFuturesDeals => {
+                json!({
+                    "method": "deals.subscribe",
+                    "params": {"market_list": [symbol]},
+                    "id": 1
+                })
+            }
+
+            CoinexSwapSubscribeType::PrFuturesOrders => {
+                json!({
+                  "method": "order.subscribe",
+                  "params": {"market_list": [symbol]},
+                  "id": 1
+                })
+            }
+            CoinexSwapSubscribeType::PrFuturesPositions => {
+                json!({
+                  "method": "position.subscribe",
+                  "params": {"market_list": [symbol]},
+                  "id": 1
+                })
+            }
+            CoinexSwapSubscribeType::PrFuturesBalances => {
+                json!({
+                    "method": "balance.subscribe",
+                    "params": {"ccy_list": ["USDT"]}, // 目前只用u 所以写死
+                    "id": 1
+                })
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<Value> {
+        let mut args = vec![];
+        // 只获取第一个
+        let symbol = self.symbol_s.get(0).unwrap().replace("_", "").to_uppercase();
+
+        for subscribe_type in &self.subscribe_types {
+            let ty_str = Self::enum_to_string(symbol.clone(),
+                                              subscribe_type.clone(),
+                                              self.login_param.clone(),
+            );
+            args.push(ty_str);
+        }
+        args
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let login_param_clone = self.login_param.clone();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        let write_tx_clone2 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            let ping_str = json!({
+                "method": "server.ping",
+                "params": {},
+                "id": 1
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+
+
+        for s in subscription {
+            subscribe_array.push(s.to_string());
+        }
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            info!("启动连接");
+            loop {
+                info!("coinex_usdt_swap socket 连接中……");
+                // 需要登录
+                if login_is {
+                    let login_param = login_param_clone.clone().unwrap();
+                    let mut login_data = LOGIN_DATA.lock().await;
+                    login_data.0 = true;
+                    let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
+                    //登录相关
+                    let prepared_str = format!("{}{}", time, login_param.secret);
+                    // 创建SHA256哈希器实例
+                    let mut hasher = Sha256::new();
+                    // 加密字符串
+                    hasher.update(prepared_str);
+                    // 计算哈希值
+                    let result = hasher.finalize();
+                    // 将哈希值转换为十六进制小写字符串
+                    let hex_str = encode(result).to_lowercase();
+
+                    let login_param = json!({
+                        "method": "server.sign",
+                        "params": {
+                            "access_id": login_param.api_key,
+                            "signed_str": hex_str,
+                            "timestamp": time
+                        },
+                        "id": 1
+                    });
+                    let login_str = login_param.to_string();
+                    info!("发起ws登录: {}", login_str);
+                    let write_tx_c = Arc::clone(&write_tx_clone2);
+                    AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
+                } else {
+                    info!("coinex 不需登录");
+                }
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync).await;
+                let mut login_data = LOGIN_DATA.lock().await;
+                // 断联后 设置为没有登录
+                login_data.1 = false;
+                info!("coinex_usdt_swap socket 断连,1s以后重连……");
+                error!("coinex_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub async fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text).await;
+        Option::from(response_data)
+    }
+    pub fn message_text_sync(text: String) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_text(text))
+        })
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = Self::parse_zip_data(binary);
+        let response_data = Self::ok_text(message_str).await;
+        Option::from(response_data)
+    }
+    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
+        })
+    }
+    //数据解析
+    pub async fn ok_text(text: String) -> ResponseData
+    {
+        // trace!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        let obj = json_value["method"].as_str();
+        match obj {
+            Some(v)=> {
+                res_data.channel = format!("{}", v);
+                res_data.code = 200;
+                res_data.data = json_value["data"].clone();
+            },
+            None => {
+                // 认证的响应没有method,只能通过id和code判断
+                match json_value["id"].as_i64() {
+                    Some(1) => {
+                        match json_value["code"].as_i64() {
+                            Some(0) =>{
+                                match json_value["data"].as_str() {
+                                    None => {
+                                        // 登录成功逻辑处理
+                                        let mut login_data = LOGIN_DATA.lock().await;
+                                        if login_data.0 { // 需要登录
+                                            if !login_data.1{
+                                                login_data.1 = true;
+                                                res_data.channel = "server.sign".to_string();
+                                                res_data.code = -200;
+                                            }else {
+                                                res_data.code = 400;
+                                            }
+                                        }  else { // 不需要登录
+                                            res_data.code = 200;
+                                        }
+                                    }
+                                    _ =>{
+                                        res_data.code = 400;
+                                    }
+                                }
+                            }
+                            _ => {
+                                res_data.code = 400;
+                            }
+                        }
+                    }
+                    _ => {
+                        res_data.code = 400;
+                    }
+                }
+                res_data.data = json_value;
+            }
+        }
+        res_data
+    }
+
+    fn parse_zip_data(p0: Vec<u8>) -> String{
+        // 创建一个GzDecoder的实例,将压缩数据作为输入
+        let mut decoder = GzDecoder::new(&p0[..]);
+
+        // 创建一个缓冲区来存放解压缩后的数据
+        let mut decompressed_data = Vec::new();
+
+        // 读取解压缩的数据到缓冲区中
+        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
+        let result = from_utf8(&decompressed_data)
+            .expect("解压缩后的数据不是有效的UTF-8");
+
+        // info!("解压缩数据 {:?}", result);
+        result.to_string()
+    }
+}
+

+ 7 - 1
exchanges/src/gate_swap_ws.rs

@@ -303,7 +303,7 @@ impl GateSwapWs {
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("gate_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -330,6 +330,12 @@ impl GateSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {

+ 7 - 1
exchanges/src/kucoin_swap_ws.rs

@@ -330,7 +330,7 @@ impl KucoinSwapWs {
                 info!("kucoin_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
                                                  false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 error!("kucoin_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
@@ -358,6 +358,12 @@ impl KucoinSwapWs {
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {

+ 40 - 36
exchanges/src/socket_tool.rs

@@ -30,20 +30,22 @@ pub enum HeartbeatType {
 pub struct AbstractWsMode {}
 
 impl AbstractWsMode {
-    pub async fn ws_connected<T, PI, PO, F, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
-                                                    is_first_login: bool,
-                                                    tag: String,
-                                                    is_shutdown_arc: Arc<AtomicBool>,
-                                                    handle_function: &F,
-                                                    subscribe_array: Vec<String>,
-                                                    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
-                                                    message_text: T,
-                                                    message_ping: PI,
-                                                    message_pong: PO)
+    pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                       is_first_login: bool,
+                                                       tag: String,
+                                                       is_shutdown_arc: Arc<AtomicBool>,
+                                                       handle_function: &F,
+                                                       subscribe_array: Vec<String>,
+                                                       ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
+                                                       message_text: T,
+                                                       message_ping: PI,
+                                                       message_pong: PO,
+                                                       message_binary: B)
         where T: Fn(String) -> Option<ResponseData> + Copy,
               PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               F: Fn(ResponseData) -> Future + Clone,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               Future: future::Future<Output=()> + Send + 'static,
     {
         let (ws_write, mut ws_read) = ws_stream.split();
@@ -79,7 +81,7 @@ impl AbstractWsMode {
                     continue;
                 }
 
-                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
                 // let response_data = func(message);
                 if response_data.is_some() {
                     let mut data = response_data.unwrap();
@@ -90,6 +92,7 @@ impl AbstractWsMode {
                     if code == 200 {
                         let mut data_c = data.clone();
                         data_c.ins = Instant::now();
+                        data_c.time = Utc::now().timestamp_millis();
 
                         handle_function(data_c).await;
                     }
@@ -111,6 +114,7 @@ impl AbstractWsMode {
                                 let mut write_lock = ws_write_arc.lock().await;
                                 write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
                             }
+                            info!("订阅完成!");
                         }
                         -201 => {
                             //订阅成功
@@ -156,19 +160,21 @@ impl AbstractWsMode {
     }
 
     //创建链接
-    pub async fn ws_connect_async<T, PI, PO, F, Future>(is_shutdown_arc: Arc<AtomicBool>,
-                                                        handle_function: F,
-                                                        address_url: String,
-                                                        is_first_login: bool,
-                                                        tag: String,
-                                                        subscribe_array: Vec<String>,
-                                                        write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
-                                                        message_text: T,
-                                                        message_ping: PI,
-                                                        message_pong: PO)
+    pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
+                                                           handle_function: F,
+                                                           address_url: String,
+                                                           is_first_login: bool,
+                                                           label: String,
+                                                           subscribe_array: Vec<String>,
+                                                           write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                           message_text: T,
+                                                           message_ping: PI,
+                                                           message_pong: PO,
+                                                           message_binary: B)
         where T: Fn(String) -> Option<ResponseData> + Copy,
               PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
               F: Fn(ResponseData) -> Future + Clone,
               Future: future::Future<Output=()> + Send + 'static,
     {
@@ -191,14 +197,15 @@ impl AbstractWsMode {
 
                 Self::ws_connected(write_to_socket_rx_arc,
                                    is_first_login,
-                                   tag,
+                                   label,
                                    is_shutdown_arc,
                                    &handle_function,
                                    subscribe_array.clone(),
                                    ws_stream,
                                    message_text,
                                    message_ping,
-                                   message_pong).await;
+                                   message_pong,
+                                   message_binary).await;
             }
             Err(e) => {
                 error!("WebSocket 握手失败:{:?}", e);
@@ -209,6 +216,7 @@ impl AbstractWsMode {
     //心跳包
     pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
         loop {
+            tokio::time::sleep(Duration::from_millis(millis)).await;
             let write_tx_clone = write_tx_clone.lock().await;
             write_tx_clone.unbounded_send(
                 match h_type {
@@ -224,28 +232,24 @@ impl AbstractWsMode {
                 }
             ).expect("发送失败");
             trace!("发送指令-心跳:{:?}",h_type);
-            tokio::time::sleep(Duration::from_millis(millis)).await;
         }
     }
     //数据解析
-    pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
-                                       message_text: T,
-                                       message_ping: PI,
-                                       message_pong: PO) -> Option<ResponseData>
+    pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
+                                          message_text: T,
+                                          message_ping: PI,
+                                          message_pong: PO,
+                                          message_binary: B) -> Option<ResponseData>
         where T: Fn(String) -> Option<ResponseData>,
               PI: Fn(Vec<u8>) -> Option<ResponseData>,
-              PO: Fn(Vec<u8>) -> Option<ResponseData>
+              PO: Fn(Vec<u8>) -> Option<ResponseData>,
+              B: Fn(Vec<u8>) -> Option<ResponseData>
     {
         match message {
             Ok(Message::Text(text)) => message_text(text),
             Ok(Message::Ping(pi)) => message_ping(pi),
             Ok(Message::Pong(po)) => message_pong(po),
-            Ok(Message::Binary(s)) => {
-                //二进制WebSocket消息
-                let message_str = format!("Binary:{:?}", s);
-                trace!("{:?}",message_str);
-                Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
-            }
+            Ok(Message::Binary(s)) => message_binary(s), //二进制WebSocket消息
             Ok(Message::Close(c)) => {
                 let message_str = format!("关闭指令:{:?}", c);
                 trace!("{:?}",message_str);
@@ -259,7 +263,7 @@ impl AbstractWsMode {
             }
             Err(e) => {
                 let message_str = format!("服务器响应:{:?}", e);
-                error!("socket断连:{}", message_str);
+                trace!("{:?}",message_str);
                 Option::from(ResponseData::new("".to_string(), -1, message_str, Value::Null))
             }
         }

+ 1 - 0
src/server.rs

@@ -171,6 +171,7 @@ async fn get_exchanges() -> impl Responder {
         "gate_usdt_swap",
         "bitget_usdt_swap",
         "binance_usdt_swap",
+        "coinex_usdt_swap"
     ];
     let response_data = json!(exchanges);