Browse Source

新增 cointr 交易所

hl 1 năm trước cách đây
mục cha
commit
31a558656a

+ 305 - 0
exchanges/src/cointr_swap_rest.rs

@@ -0,0 +1,305 @@
+use std::collections::BTreeMap;
+// use chrono::Utc;
+use reqwest::header::HeaderMap;
+use reqwest::{Client};
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use tracing::{error, info, trace};
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+use ring::hmac;
+use serde_json::{json, Value};
+
+#[derive(Clone, Debug)]
+pub struct CointrSwapRest {
+    pub tag: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //是否需要登录
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+
+}
+
+impl CointrSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> CointrSwapRest
+    {
+        return CointrSwapRest::new_with_tag("default-CointrSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> CointrSwapRest {
+        let base_url = if is_colo {
+            "https://api.cointr.pro".to_string()
+        } else {
+            "https://api.cointr.pro".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",base_url);
+        } else {
+            info!("走普通通道:{}",base_url);
+        }
+        /*****返回结构体*******/
+        CointrSwapRest {
+            tag,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    // //查询服务器时间是(获取系统维护状态(公共))
+    // pub async fn get_server_time(&mut self) -> ResponseData {
+    //     let params = json!({});
+    //     let data = self.request("GET".to_string(),
+    //                             "/v1".to_string(),
+    //                             "/public/system_info".to_string(),
+    //                             false,
+    //                             params,
+    //     ).await;
+    //     data
+    // }
+    //获取合约信息( 合同规格)
+    pub async fn get_market(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/v1".to_string(),
+                                "/futures/public/instruments".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params: Value) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        let mut passphrase = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("pass_key") {
+            passphrase = self.login_param.get("pass_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" || passphrase == "" {
+            is_login_param = false
+        }
+
+        //每个接口都有的参数
+        // let timestamp = Utc::now().timestamp_millis();
+
+        //请求类型不同,可能请求头body 不同
+        let mut body = "{}".to_string();
+        let mut headers = HeaderMap::new();
+        if method == "POST" {
+            headers.insert("Content-Type", "application/json".parse().unwrap());
+            body = params.to_string();
+        }
+
+        //是否需要登录-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {
+                // //需要登录-且登录参数齐全
+                // trace!("param:{}", params);
+                // trace!("body:{}", body);
+                // //组装sing
+                // let sing = Self::sign(secret_key.clone(),
+                //                       method.clone(),
+                //                       prefix_url.clone(),
+                //                       request_url.clone(),
+                //                       params.clone(),
+                //                       body.clone(),
+                //                       timestamp.clone(),
+                // );
+                // //组装header
+                // headers.extend(Self::headers(sing, timestamp, passphrase, access_key));
+            }
+        }
+
+
+        // trace!("headers:{:?}", headers);
+        // let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.to_string(),
+            body,
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+        //
+        // let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        // self.delays.push(time_array);
+        // self.get_delay_info();
+        // let res_data = Self::res_data_analysis(get_response, base_url, params.to_string());
+        // res_data
+    }
+
+    // pub fn headers(_: String, _timestamp: String, passphrase: String, access_key: String) -> HeaderMap {
+    //     let mut headers = HeaderMap::new();
+    //     // headers.insert("OK-ACCESS-KEY", access_key.parse().unwrap());
+    //     // headers.insert("OK-ACCESS-SIGN", sign.parse().unwrap());
+    //     // headers.insert("OK-ACCESS-TIMESTAMP", timestamp.parse().unwrap());
+    //     // headers.insert("OK-ACCESS-PASSPHRASE", passphrase.parse().unwrap());
+    //     headers
+    // }
+    pub fn sign(secret_key: String,
+                method: String, prefix_url: String, request_url: String,
+                params: String, body: String, timestamp: String) -> String
+    {
+        /*签名生成*/
+        let url_param_str = RestTool::parse_params_to_str(params);
+        let base_url = if method == "GET" {
+            format!("{}{}?{}", prefix_url, request_url, url_param_str)
+        } else {
+            format!("{}{}", prefix_url, request_url)
+        };
+
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}{}{}{}", timestamp, method, base_url, body);
+        trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = base64::encode(result);
+        sign
+    }
+
+    // async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String,
+                       headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if RestTool::parse_params_to_str(params.clone()) == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()))
+        };
+
+        trace!("url-----:???{}",url.clone());
+        trace!("addrs_url-----:???{}",addrs_url.clone());
+        trace!("params-----:???{}",params.clone());
+        trace!("body-----:???{}",body.clone());
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(body).headers(headers),
+            // "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        // trace!("text:???{:?}",text);
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text).unwrap();
+        return ResponseData::new(self.tag.clone(), 200, "success".to_string(), json_value.clone());
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+        match json_value {
+            Ok(data) => {
+                let message;
+
+                if !data["message"].is_null() {
+                    message = format!("{}:{}", data["tag"].as_str().unwrap(), data["message"].as_str().unwrap());
+                } else {
+                    message = data["tag"].to_string();
+                }
+
+                let mut error = ResponseData::error(self.tag.clone(), message);
+                error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
+                error
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                let error = ResponseData::error("".to_string(),
+                                                format!("json 解析失败:{},相关参数:{}", e, text));
+                error
+            }
+        }
+    }
+}

+ 429 - 0
exchanges/src/cointr_swap_ws.rs

@@ -0,0 +1,429 @@
+use std::io::Read;
+use std::str::from_utf8;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use chrono::Utc;
+use flate2::bufread::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use once_cell::sync::Lazy;
+use ring::hmac;
+use serde_json::{json, Value};
+use tokio::sync::Mutex;
+use tokio::task;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
+    println!("初始化...");
+    // 0: 需要登录, 1:是否已经登录
+    Mutex::new((false, false))
+});
+
+
+pub enum CointrSwapWsType {
+    PublicAndPrivate
+}
+
+
+//订阅频道
+#[derive(Clone)]
+pub enum CointrSwapSubscribeType {
+    // 深度
+    PuFuturesDepth,
+    // 公开成交
+    PuFuturesTrades,
+    // K线数据
+    PuFuturesRecords,
+
+    // // 深度
+    // PuFuturesDepth,
+    // // 公开成交
+    // PuFuturesTrades,
+    // // K线数据
+    // PuFuturesRecords,
+    //
+    // // 订单
+    // PrFuturesOrders,
+    // // 仓位
+    // PrFuturesPositions,
+    // // 余额
+    // PrFuturesBalances,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct CointrSwapLogin {
+    pub api_key: String,
+    pub secret: String,
+    pub api_memo: String,
+}
+
+#[derive(Clone)]
+pub struct CointrSwapWs {
+    tag: String,
+    // 类型
+    address_url: String,
+    // 地址
+    login_param: Option<CointrSwapLogin>,
+    // 账号
+    symbol_s: Vec<String>,
+    // 币对
+    subscribe_types: Vec<CointrSwapSubscribeType>,
+    // 订阅
+    heartbeat_time: u64,                                        // 心跳间隔
+}
+
+
+impl CointrSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************实例化一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<CointrSwapLogin>, ws_type: CointrSwapWsType) -> CointrSwapWs {
+        return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+
+    pub fn new_with_tag(tag: String, _is_colo: bool, login_param: Option<CointrSwapLogin>, ws_type: CointrSwapWsType) -> CointrSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            CointrSwapWsType::PublicAndPrivate => {
+                let url = "wss://stream.cointr.pro/ws".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
+
+        CointrSwapWs {
+            tag,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 5,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<CointrSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                CointrSwapSubscribeType::PuFuturesDepth => false,
+                CointrSwapSubscribeType::PuFuturesTrades => false,
+                CointrSwapSubscribeType::PuFuturesRecords => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: CointrSwapSubscribeType, _login_param: Option<CointrSwapLogin>) -> String {
+        match subscribe_type {
+            CointrSwapSubscribeType::PuFuturesDepth => {//深度
+                json!({
+                    "op":"subscribe",
+                    "channel":"books_perp",
+                    "args":{
+                        "instId":symbol,
+                        "":"0.001"
+                    },
+                }).to_string()
+            }
+            CointrSwapSubscribeType::PuFuturesTrades => {//公开成交
+                json!({
+                    "op":"subscribe",
+                    "channel":"trades_perp",
+                    "args":{
+                        "instId":symbol
+                    },
+                }).to_string()
+            }
+            CointrSwapSubscribeType::PuFuturesRecords => {//k线数据
+                json!({
+                    "op":"subscribe",
+                    "channel":"kline_perp",
+                    "args":{
+                        "instId":symbol,
+                        "bar":"1M"
+                    },
+                }).to_string()
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut args = vec![];
+        // 只获取第一个
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+
+        return args;
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let login_param = self.login_param.clone();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.tag.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        let write_tx_clone2 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        for su in subscription {
+            subscribe_array.push(su);
+        }
+
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            info!("启动连接");
+            loop {
+                info!("Cointr_usdt_swap socket 连接中……");
+                // 需要登录
+                if login_is {
+                    let mut login_data = LOGIN_DATA.lock().await;
+                    let login_param_real = login_param.clone().unwrap();
+                    login_data.0 = true;
+
+                    let timestamp = Utc::now().timestamp_millis().to_string();
+                    let api_key = login_param_real.api_key.clone();
+                    let secret_key = login_param_real.secret.clone();
+                    let api_memo = login_param_real.api_memo.clone();
+
+
+                    // let timestamp = "1589267764859".to_string();
+                    // let api_key = "80618e45710812162b04892c7ee5ead4a3cc3e56".to_string();
+                    // let secret_key = "6c6c98544461bbe71db2bca4c6d7fd0021e0ba9efc215f9c6ad41852df9d9df9".to_string();
+                    // let api_memo = "test001".to_string();
+
+                    let sign = {
+                        let message = format!("{}#{}#Cointr.WebSocket", timestamp.clone(), api_memo);
+                        trace!("组装数据:\n{}", message);
+
+                        let signed_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_ref());
+                        let sign = hex::encode(hmac::sign(&signed_key, message.as_bytes()).as_ref());
+                        sign
+                    };
+
+
+                    let mut args = vec![];
+                    args.push(api_key.clone());
+                    args.push(timestamp.clone());
+                    args.push(sign.clone());
+                    args.push(String::from("web"));
+                    // {"action":"access","args":["<API_KEY>","<timestamp>","<sign>","<dev>"]}
+                    let login_param = json!({
+                        "action": "access",
+                        "args": [
+                           api_key, timestamp.as_str(),sign.as_str(),"web"
+                        ]
+                    });
+                    let login_str = login_param.to_string();
+                    info!("发起ws登录: {}", login_str);
+                    let write_tx_c = Arc::clone(&write_tx_clone2);
+                    AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
+                }
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync,
+                ).await;
+                let mut login_data = LOGIN_DATA.lock().await;
+                // 断联后 设置为没有登录
+                login_data.1 = false;
+                info!("Cointr_usdt_swap socket 断连,1s以后重连……");
+                error!("Cointr_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub async fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text).await;
+        Option::from(response_data)
+    }
+    pub fn message_text_sync(text: String) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_text(text))
+        })
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = Self::parse_zip_data(binary);
+        let response_data = Self::ok_text(message_str).await;
+        Option::from(response_data)
+    }
+    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
+        })
+    }
+    //数据解析
+    pub async fn ok_text(text: String) -> ResponseData
+    {
+        info!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        let code = json_value["code"].as_i64();
+        match code {
+            None => {}
+            Some(c) => {
+                let msg = json_value["msg"].as_str().unwrap();
+                match msg {
+                    "connect.success" =>{
+                        res_data.code = -201;
+                        res_data.message = format!("连接成功:");
+                        return res_data;
+                    }
+                    "subscribe.faild" =>{
+                        res_data.code = 400;
+                        res_data.message = format!("订阅失败:{}", msg);
+                        return res_data;
+                    }
+                    _ => {}
+                }
+            }
+        }
+        let event = json_value["event"].as_str();
+        match event {
+            None => {}
+            Some(v) => {
+                match v {
+                    "subscribe" => {
+                        res_data.code = -201;
+                        res_data.message = format!("订阅成功:{}", json_value.clone().to_string());
+                        return res_data;
+                    }
+                    "error" => {
+                        res_data.code = 400;
+                        res_data.message = format!("订阅失败:{}", json_value["msg"].clone().to_string());
+                        return res_data;
+                    }
+                    _ => {}
+                }
+            }
+        }
+
+
+        let channel = json_value["channel"].as_str();
+        let instId = json_value["instId"].as_str();
+        match channel {
+            None => {}
+            Some(c) => {
+                res_data.code = 200;
+                res_data.data = json_value.clone();
+                if c.contains("books_perp") {
+                    res_data.channel = "futures.order_book".to_string();
+                } else if c.contains("trades_perp") {
+                    res_data.channel = "futures.trades".to_string();
+                } else if c.contains("kline_perp") {
+                    res_data.channel = "futures.candlesticks".to_string();
+                } else {
+                    res_data.code = 400;
+                    res_data.channel = "未知推送数据".to_string();
+                }
+                return res_data;
+            }
+        }
+
+        res_data.code = 400;
+        res_data.message = format!("未知响应内容");
+        res_data.data = text.parse().unwrap();
+        trace!("--------------------------------");
+        res_data
+    }
+
+    fn parse_zip_data(p0: Vec<u8>) -> String {
+        // 创建一个GzDecoder的实例,将压缩数据作为输入
+        let mut decoder = GzDecoder::new(&p0[..]);
+
+        // 创建一个缓冲区来存放解压缩后的数据
+        let mut decompressed_data = Vec::new();
+
+        // 读取解压缩的数据到缓冲区中
+        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
+        let result = from_utf8(&decompressed_data)
+            .expect("解压缩后的数据不是有效的UTF-8");
+
+        // info!("解压缩数据 {:?}", result);
+        result.to_string()
+    }
+}
+

+ 2 - 0
exchanges/src/lib.rs

@@ -40,3 +40,5 @@ pub mod phemex_swap_rest;
 pub mod phemex_swap_ws;
 pub mod woo_swap_ws;
 pub mod woo_swap_rest;
+pub mod cointr_swap_rest;
+pub mod cointr_swap_ws;

+ 129 - 0
exchanges/tests/cointr_swap_test.rs

@@ -0,0 +1,129 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use serde_json::json;
+use tokio::sync::Mutex;
+use tracing::trace;
+use exchanges::cointr_swap_rest::CointrSwapRest;
+use exchanges::response_base::ResponseData;
+use exchanges::cointr_swap_ws::{CointrSwapLogin, CointrSwapSubscribeType, CointrSwapWs, CointrSwapWsType};
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+const PASS_KEY: &str = "";
+
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+
+    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            // trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+    let param = CointrSwapLogin {
+        api_key: "".to_string(),
+        secret: "".to_string(),
+        api_memo: "".to_string(),
+    };
+    let t1 = tokio::spawn(async move {
+        let mut ws = get_ws(Option::from(param), CointrSwapWsType::PublicAndPrivate);
+        ws.set_symbols(vec!["ETH_USDT".to_string()]);
+        ws.set_subscribe(vec![
+            // CointrSwapSubscribeType::PuFuturesTrades,
+            // CointrSwapSubscribeType::PuFuturesDepth,
+            // CointrSwapSubscribeType::PuFuturesRecords,
+        ]);
+        //链接
+        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+fn get_ws(btree_map: Option<CointrSwapLogin>, ws_type: CointrSwapWsType) -> CointrSwapWs {
+    let cointr_ws = CointrSwapWs::new(false, btree_map, ws_type);
+    cointr_ws
+}
+
+
+
+//查詢合約基礎信息
+#[tokio::test]
+async fn rest_get_market_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_market(json!({
+
+    })).await;
+    println!("Cointr--查詢合約基礎信息--{:?}", req_data);
+}
+
+
+fn get_rest() -> CointrSwapRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
+
+    let cointr_exc = CointrSwapRest::new(false, btree_map.clone());
+    cointr_exc
+}
+