Kaynağa Gözat

修改coinex订阅,修改coinex获取币对逻辑

DESKTOP-NE65RNK\Citrus_limon 1 yıl önce
ebeveyn
işleme
588a9fb07e

+ 140 - 230
exchanges/src/coinex_swap_ws.rs

@@ -1,28 +1,33 @@
 use std::io::Read;
-use std::str::from_utf8;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::Duration;
 
-use flate2::bufread::GzDecoder;
+use flate2::read::GzDecoder;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-
-use once_cell::sync::Lazy;  // 使用线程安全的版本
-use hex::encode;
-use serde_json::{json, Value};
-use sha2::{Digest, Sha256};
+use serde_json::json;
+use serde_json::Value;
 use tokio::sync::Mutex;
-use tokio::task;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{error, info, trace};
+
 use crate::response_base::ResponseData;
 use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
-pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
-    println!("初始化...");
-    // 0: 需要登录, 1:是否已经登录
-    Mutex::new((false, false))
-});
+//类型
+pub enum CoinexSwapWsType {
+    PublicAndPrivate,
+}
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct CoinexSwapWsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+    pub is_ok_subscribe: bool,
+}
 
 //订阅频道
 #[derive(Clone)]
@@ -31,31 +36,27 @@ pub enum CoinexSwapSubscribeType {
     PuFuturesDepth,
     // 公开成交
     PuFuturesDeals,
-
-    // 订单
-    PrFuturesOrders,
-    // 仓位
-    PrFuturesPositions,
-    // 余额
-    PrFuturesBalances,
 }
 
 //账号信息
-#[derive(Clone)]
-#[allow(dead_code)]
+#[derive(Clone, Debug)]
 pub struct CoinexSwapLogin {
-    pub api_key: String,
-    pub secret: String,
+    pub access_key: String,
+    pub secret_key: String,
+    pub pass_key: String,
 }
 
 #[derive(Clone)]
+#[allow(dead_code)]
 pub struct CoinexSwapWs {
     //类型
     tag: String,
     //地址
     address_url: String,
-    //账号信息
+    //账号
     login_param: Option<CoinexSwapLogin>,
+    //登录数据
+    ws_param: CoinexSwapWsParam,
     //币对
     symbol_s: Vec<String>,
     //订阅
@@ -64,27 +65,46 @@ pub struct CoinexSwapWs {
     heartbeat_time: u64,
 }
 
-
 impl CoinexSwapWs {
     /*******************************************************************************************************/
-    /*****************************************实例化一个对象****************************************************/
+    /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
-    pub fn new(login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs {
-        return CoinexSwapWs::new_with_tag("default-CoinexSwapWs".to_string(), login_param);
+    pub fn new(is_colo: bool, login_param: Option<CoinexSwapLogin>, ws_type: CoinexSwapWsType) -> CoinexSwapWs {
+        return Self::new_with_tag("default-CoinexSwapWs".to_string(), is_colo, login_param, ws_type);
     }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<CoinexSwapLogin>, ws_type: CoinexSwapWsType) -> CoinexSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            CoinexSwapWsType::PublicAndPrivate => {
+                let url = "wss://socket.coinex.com/v2/futures".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
 
-    pub fn new_with_tag(tag: String, login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs
-    {
         /*******公共频道-私有频道数据组装*/
-        let address_url = "wss://socket.coinex.com/v2/futures".to_string();
-        info!("走普通通道(不支持colo通道):{}", address_url);
+        let ws_param = CoinexSwapWsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 15 * 1000,
+            ws_ping_timeout: 0,
+            is_ok_subscribe: false,
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
         CoinexSwapWs {
             tag,
             address_url,
             login_param,
+            ws_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 10,
+            heartbeat_time: 1000 * 18,
         }
     }
 
@@ -99,47 +119,28 @@ impl CoinexSwapWs {
     pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
         for symbol in b_array.iter_mut() {
             // 大写
-            *symbol = symbol.to_uppercase();
+            *symbol = symbol.to_string();
             // 字符串替换
-            *symbol = symbol.replace("-", "_");
+            *symbol = symbol.replace("_", "").to_uppercase();
         }
         self.symbol_s = b_array;
     }
-    //频道是否需要登录
     fn contains_pr(&self) -> bool {
         for t in self.subscribe_types.clone() {
             if match t {
                 CoinexSwapSubscribeType::PuFuturesDepth => false,
                 CoinexSwapSubscribeType::PuFuturesDeals => false,
-
-                CoinexSwapSubscribeType::PrFuturesOrders => true,
-                CoinexSwapSubscribeType::PrFuturesPositions => true,
-                CoinexSwapSubscribeType::PrFuturesBalances => true,
             } {
                 return true;
             }
         }
         false
     }
-
     /*******************************************************************************************************/
     /*****************************************工具函数********************************************************/
     /*******************************************************************************************************/
     //订阅枚举解析
-    pub fn enum_to_string(symbol: String, subscribe_type: CoinexSwapSubscribeType, _login_param: Option<CoinexSwapLogin>) -> Value {
-        // let access_key;
-        // let secret_key;
-        // match login_param {
-        //     None => {
-        //         access_key = "".to_string();
-        //         secret_key = "".to_string();
-        //     }
-        //     Some(param) => {
-        //         access_key = param.api_key.clone();
-        //         secret_key = param.secret.clone();
-        //     }
-        // }
-
+    pub fn enum_to_string(symbol: String, subscribe_type: CoinexSwapSubscribeType) -> Value {
         match subscribe_type {
             CoinexSwapSubscribeType::PuFuturesDepth => {
                 json!({
@@ -159,48 +160,19 @@ impl CoinexSwapWs {
                     "id": 1
                 })
             }
-
-            CoinexSwapSubscribeType::PrFuturesOrders => {
-                json!({
-                  "method": "order.subscribe",
-                  "params": {"market_list": [symbol]},
-                  "id": 1
-                })
-            }
-            CoinexSwapSubscribeType::PrFuturesPositions => {
-                json!({
-                  "method": "position.subscribe",
-                  "params": {"market_list": [symbol]},
-                  "id": 1
-                })
-            }
-            CoinexSwapSubscribeType::PrFuturesBalances => {
-                json!({
-                    "method": "balance.subscribe",
-                    "params": {"ccy_list": ["USDT"]}, // 目前只用u 所以写死
-                    "id": 1
-                })
-            }
         }
     }
     //订阅信息生成
-    pub fn get_subscription(&self) -> Vec<Value> {
-        let mut args = vec![];
-        // 只获取第一个
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
         for symbol in &self.symbol_s {
-            let symbol_final = symbol.replace("_", "").to_uppercase();
-
             for subscribe_type in &self.subscribe_types {
-                let ty_str = Self::enum_to_string(symbol_final.clone(),
-                                                  subscribe_type.clone(),
-                                                  self.login_param.clone(),
-                );
-                args.push(ty_str);
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                array.push(ty_str.to_string());
             }
         }
-        args
+        array
     }
-
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
     /*******************************************************************************************************/
@@ -210,92 +182,52 @@ impl CoinexSwapWs {
                                              handle_function: F,
                                              write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
-        where
-            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
-            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    where
+        F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+        Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
         let login_is = self.contains_pr();
-        let login_param_clone = self.login_param.clone();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let tag = self.tag.clone();
-        let heartbeat_time = self.heartbeat_time.clone();
-
+        let heartbeat_time = self.ws_param.ws_ping_interval.clone();
 
-        //心跳-- 方法内部线程启动
-        let write_tx_clone1 = Arc::clone(write_tx_am);
-        let write_tx_clone2 = Arc::clone(write_tx_am);
+        // 心跳-- 方法内部线程启动
+        let write_tx_clone1 = write_tx_am.clone();
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            let ping_str = json!({
+            let info = json!({
                 "method": "server.ping",
                 "params": {},
                 "id": 1
             });
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(info.to_string()), heartbeat_time as u64).await;
             trace!("线程-异步心跳-结束");
         });
 
-        //设置订阅
-        let mut subscribe_array = vec![];
-
 
-        for s in subscription {
-            subscribe_array.push(s.to_string());
+        //设置订阅
+        let subscribe_array = subscription.clone();
+        if login_is {
+            //登录相关
         }
 
-        //链接
+        //1 链接
         let t2 = tokio::spawn(async move {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
-            info!("启动连接");
+            println!("-----------------");
             loop {
-                info!("coinex_usdt_swap socket 连接中……");
-                // 需要登录
-                if login_is {
-                    let login_param = login_param_clone.clone().unwrap();
-                    let mut login_data = LOGIN_DATA.lock().await;
-                    login_data.0 = true;
-                    let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
-                    //登录相关
-                    let prepared_str = format!("{}{}", time, login_param.secret);
-                    // 创建SHA256哈希器实例
-                    let mut hasher = Sha256::new();
-                    // 加密字符串
-                    hasher.update(prepared_str);
-                    // 计算哈希值
-                    let result = hasher.finalize();
-                    // 将哈希值转换为十六进制小写字符串
-                    let hex_str = encode(result).to_lowercase();
-
-                    let login_param = json!({
-                        "method": "server.sign",
-                        "params": {
-                            "access_id": login_param.api_key,
-                            "signed_str": hex_str,
-                            "timestamp": time
-                        },
-                        "id": 1
-                    });
-                    let login_str = login_param.to_string();
-                    info!("发起ws登录: {}", login_str);
-                    let write_tx_c = Arc::clone(&write_tx_clone2);
-                    AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
-                } else {
-                    info!("coinex 不需登录");
-                }
-
+                info!("Coinex_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 login_is, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync).await;
-                let mut login_data = LOGIN_DATA.lock().await;
-                // 断联后 设置为没有登录
-                login_data.1 = false;
-                info!("coinex_usdt_swap socket 断连,1s以后重连……");
-                error!("coinex_usdt_swap socket 断连,1s以后重连……");
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("Coinex_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
+
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
 
@@ -305,16 +237,10 @@ impl CoinexSwapWs {
     /*****************************************数据解析*****************************************************/
     /*******************************************************************************************************/
     //数据解析-Text
-    pub async fn message_text(text: String) -> Option<ResponseData> {
-        let response_data = Self::ok_text(text).await;
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
-    pub fn message_text_sync(text: String) -> Option<ResponseData> {
-        // 使用 tokio::task::block_in_place 来等待异步函数的结果
-        task::block_in_place(|| {
-            tokio::runtime::Handle::current().block_on(Self::message_text(text))
-        })
-    }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
         return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
@@ -324,88 +250,72 @@ impl CoinexSwapWs {
         return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
     }
     //数据解析-二进制
-    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+    pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
         //二进制WebSocket消息
-        let message_str = Self::parse_zip_data(binary);
-        let response_data = Self::ok_text(message_str).await;
-        Option::from(response_data)
-    }
-    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
-        // 使用 tokio::task::block_in_place 来等待异步函数的结果
-        task::block_in_place(|| {
-            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
-        })
+        // let message_str = format!("Binary:{:?}", _po);
+        // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+        // let result = String::from_utf8(bytes);
+        // let result = String::from_utf8(po);
+
+        let mut gz_decoder = GzDecoder::new(&po[..]);
+        let mut decompressed_data = Vec::new();
+
+        // 尝试解压数据
+        if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
+            // 将解压后的字节向量转换为 UTF-8 字符串
+            match String::from_utf8(decompressed_data) {
+                Ok(text) => {
+                    let response_data = Self::ok_text(text);
+                    return Option::from(response_data);
+                }
+                Err(_) => {
+                    return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+                }
+            }
+        } else {
+            return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+        }
     }
     //数据解析
-    pub async fn ok_text(text: String) -> ResponseData
+    pub fn ok_text(text: String) -> ResponseData
     {
-        // trace!("原始数据:{}", text);
+        // println!("原始数据:{:?}", text);
+
         let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
-
-        let obj = json_value["method"].as_str();
-        match obj {
-            Some(v) => {
-                res_data.channel = format!("{}", v);
-                res_data.code = 200;
-                res_data.data = json_value["data"].clone();
+        res_data.data = json_value.clone();
+        // { "id": "id1", "code": 0, "msg": "" }
+        if json_value["data"]["id"].as_i64() == Option::from(1) {
+            //订阅
+            if json_value["result"].as_str() == Option::from("pong") {
+                res_data.code = -301;
+                res_data.message = "Pong".to_string();
             }
-            None => {
-                // 认证的响应没有method,只能通过id和code判断
-                match json_value["id"].as_i64() {
-                    Some(1) => {
-                        match json_value["code"].as_i64() {
-                            Some(0) => {
-                                match json_value["data"].as_str() {
-                                    None => {
-                                        // 登录成功逻辑处理
-                                        let mut login_data = LOGIN_DATA.lock().await;
-                                        if login_data.0 { // 需要登录
-                                            if !login_data.1 {
-                                                login_data.1 = true;
-                                                res_data.channel = "server.sign".to_string();
-                                                res_data.code = -200;
-                                            } else {
-                                                res_data.code = 400;
-                                            }
-                                        } else { // 不需要登录
-                                            res_data.code = 200;
-                                        }
-                                    }
-                                    _ => {
-                                        res_data.code = 400;
-                                    }
-                                }
-                            }
-                            _ => {
-                                res_data.code = 400;
-                            }
-                        }
-                    }
-                    _ => {
-                        res_data.code = 400;
-                    }
-                }
-                res_data.data = json_value;
+        } else if json_value["data"]["id"].as_i64() == Option::from(999) {
+            //订阅
+            if json_value["result"]["status"].as_str() == Option::from("success") {
+                res_data.code = -201;
+                res_data.message = "订阅成功".to_string();
+            } else {
+                res_data.code = 400;
+                res_data.message = "订阅失败".to_string();
             }
+        } else if json_value["method"].as_str() == Option::from("deals.update") {
+            res_data.code = 200;
+            res_data.data = json_value["data"].clone();
+            res_data.channel = "futures.trades".to_string();
+        } else if json_value["method"].as_str() == Option::from("depth.update") {
+            res_data.code = 200;
+            res_data.data = json_value["data"].clone();
+            res_data.channel = "futures.candlesticks".to_string();
+        } else {
+            res_data.code = -1;
+            res_data.message = "未知解析".to_string();
         }
+
         res_data
     }
+}
 
-    fn parse_zip_data(p0: Vec<u8>) -> String {
-        // 创建一个GzDecoder的实例,将压缩数据作为输入
-        let mut decoder = GzDecoder::new(&p0[..]);
-
-        // 创建一个缓冲区来存放解压缩后的数据
-        let mut decompressed_data = Vec::new();
-
-        // 读取解压缩的数据到缓冲区中
-        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
-        let result = from_utf8(&decompressed_data)
-            .expect("解压缩后的数据不是有效的UTF-8");
 
-        // info!("解压缩数据 {:?}", result);
-        result.to_string()
-    }
-}
 

+ 16 - 12
src/coinex_usdt_swap_data_listener.rs

@@ -1,4 +1,3 @@
-
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
@@ -8,7 +7,9 @@ use lazy_static::lazy_static;
 use tokio::sync::{Mutex};
 use tracing::info;
 use exchanges::coinex_swap_rest::CoinexSwapRest;
-use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
+use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs, CoinexSwapWsType};
+// use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
+use exchanges::phemex_swap_ws::{PhemexSwapWs, PhemexSwapWsType};
 use exchanges::response_base::ResponseData;
 use rust_decimal_macros::dec;
 use standard::exchange::ExchangeEnum;
@@ -31,16 +32,19 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     // 订阅所有币种
     let login = BTreeMap::new();
     let mut coinex_rest = CoinexSwapRest::new(login);
-    let response = coinex_rest.get_market_details("usdt".to_string()).await;
     let mut symbols = vec![];
-    if response.code == 200 {
-        let data = response.data.as_array().unwrap();
-        for info in data {
-            let s = info["market"].as_str().unwrap();
-            if !s.ends_with("USDT") { continue; }
-            let symbol = s.to_string().replace("USDT", "_USDT");
-            symbols.push(symbol)
+    loop {
+        let response = coinex_rest.get_market_details("usdt".to_string()).await;
+        if response.code == 200 {
+            let data = response.data.as_array().unwrap();
+            for info in data {
+                let s = info["market"].as_str().unwrap();
+                if !s.ends_with("USDT") { continue; }
+                let symbol = s.to_string().replace("USDT", "_USDT");
+                symbols.push(symbol)
+            }
         }
+        if symbols.len() > 0 { break; } else { tokio::time::sleep(Duration::from_secs(2)).await; }
     }
 
     for chunk in symbols.chunks(20) {
@@ -51,7 +55,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
 
         tokio::spawn(async move {
-            let mut ws = CoinexSwapWs::new_with_tag(ws_name, None);
+            let mut ws = CoinexSwapWs::new_with_tag(ws_name, false, None, CoinexSwapWsType::PublicAndPrivate).clone();
             ws.set_subscribe(vec![
                 CoinexSwapSubscribeType::PuFuturesDeals,
                 // GateSwapSubscribeType::PuFuturesOrderBook
@@ -94,7 +98,7 @@ pub async fn data_listener(response: ResponseData) {
 
     match response.channel.as_str() {
         // 订单流数据
-        "deals.update" => {
+        "futures.trades" => {
             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::CoinexSwap, &response);
 
             for trade in trades.iter_mut() {

+ 1 - 0
standard/src/coinex_swap_handle.rs

@@ -5,6 +5,7 @@ use exchanges::response_base::ResponseData;
 use crate::{Trade};
 
 pub fn format_trade_items(response: &ResponseData) -> Vec<Trade> {
+    // let res_data = response.data;
     let symbol = response.data["market"].as_str().unwrap().to_string().replace("USDT", "_USDT");
     let result = response.data["deal_list"].as_array().unwrap();
     let mut trades = vec![];