hl před 1 rokem
rodič
revize
68d3e0c820

+ 449 - 0
exchanges/src/bitmart_swap_rest.rs

@@ -0,0 +1,449 @@
+use std::collections::BTreeMap;
+
+use chrono::Utc;
+use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
+use reqwest::Client;
+use reqwest::header::HeaderMap;
+use ring::hmac;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::{json, Value};
+use tracing::{error, info, trace};
+use crate::http_tool::RestTool;
+
+use crate::response_base::ResponseData;
+
+// 定义用于 URL 编码的字符集
+pub const FRAGMENT: &AsciiSet = &CONTROLS
+    .add(b' ')
+    .add(b':')
+    .add(b'=')
+    .add(b'+')
+    .add(b'/').add(b'?').add(b'#')
+    .add(b'[').add(b']').add(b'@').add(b'!').add(b'$').add(b'&')
+    .add(b'\'').add(b'(').add(b')').add(b'*').add(b',')
+    .add(b';').add(b'"').add(b'%')
+;
+
+
+#[derive(Clone)]
+pub struct BitMartSwapRest {
+    label: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl BitMartSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BitMartSwapRest
+    {
+        return BitMartSwapRest::new_label("default-BitMartSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitMartSwapRest
+    {
+        let base_url: String = String::from("https://api-cloud.bitmart.com");
+        info!("走普通通道:{}",base_url);
+
+        if is_colo {} else {}
+        /*****返回结构体*******/
+        BitMartSwapRest {
+            label,
+            base_url: base_url.to_string(),
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+
+    //深度
+    pub async fn get_depth(&mut self, symbol: String) -> ResponseData {
+        let params = json!({
+            "symbol":symbol
+        });
+
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/public/depth"),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    //获取所有合约详情
+    pub async fn get_market(&mut self, symbol: String) -> ResponseData {
+        let params = json!({
+            "symbol":symbol
+        });
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/public/details"),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //查询合约账户 (查询合约资产明细 )
+    pub async fn get_account(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/assets-detail"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //用户仓位列表(查询仓位详情 (KEYED))
+    pub async fn get_user_position(&mut self, symbol: String) -> ResponseData {
+        let params = json!({
+            "symbol":symbol
+        });
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/position"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //查询合约订单列表 (查询合约历史订单 (KEYED))
+    pub async fn get_orders(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/order-history"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //查询单个订单详情 (【全仓】获取订单明细信息)
+    pub async fn get_order_details(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/order"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //合约交易下单 (合约下单 (SIGNED))
+    pub async fn swap_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/submit-order"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    // 全部撤单(批量撤销合约订单 (SIGNED))
+    pub async fn cancel_price_order(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/contract".to_string(),
+                     format!("/private/cancel-orders"),
+                     true,
+                     params,
+        ).await
+    }
+
+    //撤销单个订单 (取消单个合约订单 (SIGNED))
+    pub async fn cancel_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/cancel-order"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    // //设置持仓模式 (【全仓】切换持仓模式)
+    // pub async fn setting_dual_mode(&mut self, params: Value) -> ResponseData {
+    //     let data = self.request("POST".to_string(),
+    //                             "/linear-swap-api/v1".to_string(),
+    //                             format!("/swap_cross_switch_position_mode"),
+    //                             true,
+    //                             params,
+    //     ).await;
+    //     data
+    // }
+
+    //设置杠杆(合约杠杆调整 (SIGNED))
+    pub async fn setting_dual_leverage(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/contract".to_string(),
+                                format!("/private/submit-leverage"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    async fn request(&mut self,
+                     requesst_type: String,
+                     prefix_url: String,
+                     request_url: String,
+                     is_login: bool,
+                     params: Value) -> ResponseData
+    {
+        // trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        let mut api_memo = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            api_memo = self.login_param.get("api_memo").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+        //请求头配置-如果需要登录则存在额外配置
+        let mut body = "".to_string();
+        let mut headers = HeaderMap::new();
+        headers.insert("Content-Type", "application/json".parse().unwrap());
+
+        // let timestamp = Utc::now().timestamp_millis().to_string();
+        let timestamp = "1589793796145".to_string();
+
+        // let mut params_str = params.to_string();
+        let mut params_str =  json!({
+            "count":"100","price":"8600","symbol":"BTC_USDT"
+        }).to_string();
+        if requesst_type == "GET" {
+            body = "{}".to_string();
+        } else {
+            body = params_str.clone();
+        }
+        //是否需要登录-- 组装sing
+        let mut sing = "".to_string();
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {//需要登录-且登录参数齐全
+                //组装sing
+                sing = Self::sign(secret_key.clone(),
+                                  api_memo.clone(),
+                                  params_str.clone(),
+                                  timestamp.clone(),
+                );
+                headers.extend(Self::headers(access_key.clone(), timestamp.clone(), sing.clone()));
+            }
+        }
+
+
+        // trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            base_url.clone(),
+            requesst_type.to_string(),
+            params_str.clone(),
+            body.clone(),
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+    }
+
+    pub fn headers(access_key: String, timestamp: String, sign: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("X-BM-KEY", access_key.clone().parse().unwrap());
+        headers.insert("X-BM-SIGN", sign.clone().parse().unwrap());
+        headers.insert("X-BM-TIMESTAMP", timestamp.clone().parse().unwrap());
+        headers
+    }
+    pub fn sign(secret_key: String, api_memo: String, param_str: String, timestamp: String) -> String
+    {
+
+        // X-BM-SIGN= hmac_sha256(Your_api_secret_key, X-BM-TIMESTAMP + '#' +Your_api_memo + '#' + '{"symbol":"BTC_USDT","price":"8600","count":"100"}')
+        let message = format!("{}#{}#{}", timestamp, api_memo, param_str);
+        trace!("1组装数据:\n{}", message);
+        let signed_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_ref());
+        let sign = hex::encode(hmac::sign(&signed_key, message.as_bytes()).as_ref());
+        sign
+    }
+
+
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String, headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let params_str = RestTool::parse_params_to_str(params.clone());
+        let addrs_url = if params_str.len() > 0 {
+            format!("{}?{}", url.clone(), params_str)
+        } else {
+            format!("{}", url.clone())
+        };
+
+        trace!("url-----:???{}",url.clone());
+        trace!("addrs_url-----:???{}",addrs_url.clone());
+        trace!("param-----:???{}",params.clone());
+        trace!("param_str-----:???{}",params_str.clone());
+        trace!("body-----:???{}",body.clone());
+        trace!("headers-----:???{:?}",headers.clone());
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(body).headers(headers),
+            // "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        trace!("text:???{:?}",text);
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text).unwrap();
+        // return  ResponseData::new(self.label.clone(), 200, "success".to_string(), json_value);
+
+        let code = json_value["code"].as_i64();
+        let message = json_value["message"].as_str();
+        let trace = json_value["trace"].as_str();
+        match code {
+            Some(c) => {
+                if c == 1000 {
+                    let data = json_value.get("data").unwrap();
+                    ResponseData::new(self.label.clone(), 200, "success".to_string(), data.clone())
+                } else {
+                    ResponseData::new(self.label.clone(), 400, "message".to_string(), json_value)
+                }
+            }
+            None => {
+                ResponseData::new(self.label.clone(), 400, "message".to_string(), json_value)
+            }
+        }
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+        match json_value {
+            Ok(d) => {
+                ResponseData::new(self.label.clone(), 400, format!("错误:{}", d["message"].as_str().unwrap()), d)
+            }
+            Err(_) => {
+                ResponseData::new(self.label.clone(), 400, format!("未知错误"), text.parse().unwrap())
+            }
+        }
+    }
+}
+
+
+// 合并两个 JSON 对象的函数
+fn merge_json(a: &mut Value, b: &Value) {
+    match (a, b) {
+        (Value::Object(ref mut a_map), Value::Object(b_map)) => {
+            for (k, v) in b_map {
+                merge_json(a_map.entry(k.clone()).or_insert(Value::Null), v);
+            }
+        }
+        (a, b) => {
+            *a = b.clone();
+        }
+    }
+}
+
+
+// 函数:将 JSON 对象转换为排序后的查询字符串
+fn json_to_query_string(value: Value) -> String {
+    let mut params = BTreeMap::new();
+
+    if let Value::Object(obj) = value {
+        for (k, v) in obj {
+            // 确保只处理字符串值
+            if let Value::String(v_str) = v {
+                params.insert(k, v_str);
+            } else {
+                // 对于非字符串值,我们可以转换为字符串或执行其他处理
+                params.insert(k, v.to_string());
+            }
+        }
+    }
+
+    // 拼接键值对为查询字符串
+    params.iter()
+        .map(|(k, v)| format!("{}={}", utf8_percent_encode(k, FRAGMENT), utf8_percent_encode(v, FRAGMENT)))
+        .collect::<Vec<String>>()
+        .join("&")
+}

+ 465 - 0
exchanges/src/bitmart_swap_ws.rs

@@ -0,0 +1,465 @@
+use std::io::Read;
+use std::str::from_utf8;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use chrono::Utc;
+use flate2::bufread::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use once_cell::sync::Lazy;
+use ring::hmac;
+use serde_json::{json, Value};
+use tokio::sync::Mutex;
+use tokio::task;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::AbstractWsMode;
+
+pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
+    println!("初始化...");
+    // 0: 需要登录, 1:是否已经登录
+    Mutex::new((false, false))
+});
+
+
+pub enum BitMartSwapWsType {
+    Public,
+    Private,
+}
+
+
+//订阅频道
+#[derive(Clone)]
+pub enum BitMartSwapSubscribeType {
+    // 深度
+    PuFuturesDepth,
+
+    // 订单
+    PrFuturesOrders,
+    // 仓位
+    PrFuturesPositions,
+    // 余额
+    PrFuturesBalances,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BitMartSwapLogin {
+    pub api_key: String,
+    pub secret: String,
+    pub api_memo:String,
+}
+
+#[derive(Clone)]
+pub struct BitMartSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号信息
+    login_param: Option<BitMartSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<BitMartSwapSubscribeType>,
+    //心跳间隔
+    _heartbeat_time: u64,
+}
+
+
+impl BitMartSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************实例化一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(login_param: Option<BitMartSwapLogin>, ws_type: BitMartSwapWsType) -> BitMartSwapWs {
+        return BitMartSwapWs::new_label("default-BitMartSwapWs".to_string(), login_param, ws_type);
+    }
+
+    pub fn new_label(label: String, login_param: Option<BitMartSwapLogin>, ws_type: BitMartSwapWsType) -> BitMartSwapWs
+    {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BitMartSwapWsType::Public => {
+                let url = "wss://openapi-ws.bitmart.com/api?protocol=1.1".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+            BitMartSwapWsType::Private => {
+                let url = "wss://openapi-ws.bitmart.com/user?protocol=1.1".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
+
+        BitMartSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            _heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BitMartSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BitMartSwapSubscribeType::PuFuturesDepth => false,
+
+                BitMartSwapSubscribeType::PrFuturesOrders => true,
+                BitMartSwapSubscribeType::PrFuturesPositions => true,
+                BitMartSwapSubscribeType::PrFuturesBalances => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BitMartSwapSubscribeType, _login_param: Option<BitMartSwapLogin>) -> String {
+        // let access_key;
+        // let secret_key;
+        // match login_param {
+        //     None => {
+        //         access_key = "".to_string();
+        //         secret_key = "".to_string();
+        //     }
+        //     Some(param) => {
+        //         access_key = param.api_key.clone();
+        //         secret_key = param.secret.clone();
+        //     }
+        // }
+        // let cid = "";
+
+        match subscribe_type {
+            BitMartSwapSubscribeType::PuFuturesDepth => {
+                format!("futures/depth20:{}", symbol.to_uppercase())
+            }
+
+            BitMartSwapSubscribeType::PrFuturesOrders => {
+                format!("futures/order")
+            }
+            BitMartSwapSubscribeType::PrFuturesPositions => {
+                format!("futures/position")
+            }
+            BitMartSwapSubscribeType::PrFuturesBalances => {
+                format!("futures/asset:USDT")
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Value {
+        let mut args = vec![];
+        // 只获取第一个
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+        json!({
+            "action":"subscribe",
+            "args":args
+         })
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let login_param = self.login_param.clone();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        let write_tx_clone2 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     let ping_str = json!({
+        //         "method": "server.ping",
+        //         "params": {},
+        //         "id": 1
+        //     });
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        subscribe_array.push(subscription.to_string());
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            info!("启动连接");
+            loop {
+                info!("BitMart_usdt_swap socket 连接中……");
+                // 需要登录
+                if login_is {
+                    let mut login_data = LOGIN_DATA.lock().await;
+                    let login_param_real = login_param.clone().unwrap();
+                    login_data.0 = true;
+
+                    let timestamp = Utc::now().timestamp_millis().to_string();
+                    let api_key = login_param_real.api_key.clone();
+                    let secret_key = login_param_real.secret.clone();
+                    let api_memo = login_param_real.api_memo.clone();
+
+
+                    // let timestamp = "1589267764859".to_string();
+                    // let api_key = "80618e45710812162b04892c7ee5ead4a3cc3e56";
+                    // let secret_key = "6c6c98544461bbe71db2bca4c6d7fd0021e0ba9efc215f9c6ad41852df9d9df9";
+                    // let api_memo = "test001";
+
+                    // 当前时间:timestamp=1589267764859
+                    // 申请的API_KEY = "80618e45710812162b04892c7ee5ead4a3cc3e56"
+                    // 申请的API_SECRET = "6c6c98544461bbe71db2bca4c6d7fd0021e0ba9efc215f9c6ad41852df9d9df9"
+                    // 申请的api_memo = "test001";
+                    let sign = {
+                        let message = format!("{}#{}#bitmart.WebSocket", timestamp.clone(),api_memo);
+                        trace!("组装数据:\n{}", message);
+
+                        // let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+                        // let result = hmac::sign(&hmac_key, &message.as_bytes());
+                        // base64::encode(result)
+
+                        let signed_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_ref());
+                        let sign = hex::encode(hmac::sign(&signed_key, message.as_bytes()).as_ref());
+                        sign
+                    };
+
+                    trace!("参考sign-3ceeb7e1b8cb165a975e28a2e2dfaca4d30b358873c0351c1a071d8c83314556",);
+                    trace!("自己的sign-{}",sign.clone());
+
+                    let mut args = vec![];
+                    args.push(api_key.clone());
+                    args.push(timestamp.clone());
+                    args.push(sign.clone());
+                    args.push(String::from("web"));
+                    // {"action":"access","args":["<API_KEY>","<timestamp>","<sign>","<dev>"]}
+                    let login_param = json!({
+                        "action": "access",
+                        "args": [
+                           api_key, timestamp.as_str(),sign.as_str(),"web"
+                        ]
+                    });
+                    let login_str = login_param.to_string();
+                    info!("发起ws登录: {}", login_str);
+                    let write_tx_c = Arc::clone(&write_tx_clone2);
+                    AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
+                }
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync,
+                ).await;
+                let mut login_data = LOGIN_DATA.lock().await;
+                // 断联后 设置为没有登录
+                login_data.1 = false;
+                info!("BitMart_usdt_swap socket 断连,1s以后重连……");
+                error!("BitMart_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub async fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text).await;
+        Option::from(response_data)
+    }
+    pub fn message_text_sync(text: String) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_text(text))
+        })
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = Self::parse_zip_data(binary);
+        let response_data = Self::ok_text(message_str).await;
+        Option::from(response_data)
+    }
+    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
+        })
+    }
+    //数据解析
+    pub async fn ok_text(text: String) -> ResponseData
+    {
+        trace!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        // {"action":"access","success":true}
+        let action = json_value["action"].as_str();
+        match action {
+            None => {}
+            Some(r) => {
+                match r {
+                    "access" => {
+                        /*登录响应*/
+                        let success = json_value["success"].as_bool();
+                        match success {
+                            None => {}
+                            Some(s) => {
+                                if s {
+                                    res_data.code = -200;
+                                    res_data.message = "登录成功".to_string();
+                                } else {
+                                    res_data.code = 400;
+                                    res_data.message = format!("登录失败:{}", json_value["error"].as_str().unwrap());
+                                }
+                                return res_data;
+                            }
+                        }
+                    }
+                    "subscribe" => {
+                        /*订阅响应*/
+                        let success = json_value["success"].as_bool();
+                        match success {
+                            None => {}
+                            Some(s) => {
+                                if s {
+                                    res_data.code = -201;
+                                    res_data.message = format!("订阅成功:{}", json_value["request"].clone().to_string());
+                                } else {
+                                    res_data.code = 400;
+                                    res_data.message = format!("订阅失败:{}", json_value["error"].as_str().unwrap());
+                                }
+                                return res_data;
+                            }
+                        }
+                    }
+                    "unsubscribe" => {
+                        /*取消订阅响应*/
+                        let success = json_value["success"].as_bool();
+                        match success {
+                            None => {}
+                            Some(s) => {
+                                if s {
+                                    res_data.code = -201;
+                                    res_data.message = format!("取消-订阅成功:{}", json_value["request"].clone().to_string());
+                                } else {
+                                    res_data.code = 400;
+                                    res_data.message = format!("取消-订阅失败:{}", json_value["error"].as_str().unwrap());
+                                }
+                                return res_data;
+                            }
+                        }
+                    }
+                    _ => {}
+                }
+            }
+        }
+
+        let group = json_value["group"].as_str();
+        match group {
+            Some(ch) => {
+                let data = json_value["data"].clone();
+                res_data.channel = format!("{}", ch);
+                res_data.code = 200;
+                res_data.data = data.clone();
+                res_data.message = "推送数据".to_string();
+                return res_data;
+                // match data {
+                //     Some(_) => {
+                //
+                //     }
+                //     None => {
+                //         res_data.channel = format!("{}", ch);
+                //         res_data.code = 400;
+                //         res_data.data = data.clone();
+                //         return res_data;
+                //     }
+                // }
+            }
+            None => {}
+        }
+
+
+        res_data.code = 400;
+        res_data.message = format!("未知响应内容");
+        res_data.data = text.parse().unwrap();
+        trace!("--------------------------------");
+        res_data
+    }
+
+    fn parse_zip_data(p0: Vec<u8>) -> String {
+        // 创建一个GzDecoder的实例,将压缩数据作为输入
+        let mut decoder = GzDecoder::new(&p0[..]);
+
+        // 创建一个缓冲区来存放解压缩后的数据
+        let mut decompressed_data = Vec::new();
+
+        // 读取解压缩的数据到缓冲区中
+        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
+        let result = from_utf8(&decompressed_data)
+            .expect("解压缩后的数据不是有效的UTF-8");
+
+        // info!("解压缩数据 {:?}", result);
+        result.to_string()
+    }
+}
+

+ 2 - 0
exchanges/src/lib.rs

@@ -29,4 +29,6 @@ pub mod coinex_swap_rest;
 pub mod coinex_swap_ws;
 pub mod htx_swap_ws;
 pub mod htx_swap_rest;
+pub mod bitmart_swap_ws;
+pub mod bitmart_swap_rest;
 

+ 296 - 0
exchanges/tests/bitmart_swap_test.rs

@@ -0,0 +1,296 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use serde_json::json;
+use tokio::sync::Mutex;
+use tracing::trace;
+use exchanges::bitmart_swap_rest::BitMartSwapRest;
+use exchanges::bitmart_swap_ws::{BitMartSwapLogin, BitMartSwapSubscribeType, BitMartSwapWs, BitMartSwapWsType};
+
+use exchanges::htx_swap_rest::HtxSwapRest;
+use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
+use exchanges::proxy;
+use exchanges::response_base::ResponseData;
+
+
+
+const ACCESS_KEY: &str = "80618e45710812162b04892c7ee5ead4a3cc3e56";
+const SECRET_KEY: &str = "6c6c98544461bbe71db2bca4c6d7fd0021e0ba9efc215f9c6ad41852df9d9df9";
+const API_MEMO: &str = "test001";
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+
+    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+    let param = BitMartSwapLogin {
+        api_key: ACCESS_KEY.to_string(),
+        secret: SECRET_KEY.to_string(),
+        api_memo: API_MEMO.to_string(),
+    };
+    let t1 = tokio::spawn(async move {
+        let mut ws = get_ws(Option::from(param), BitMartSwapWsType::Private);
+        ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
+        ws.set_subscribe(vec![
+            // BitMartSwapSubscribeType::PuFuturesDepth,
+
+            BitMartSwapSubscribeType::PrFuturesOrders,
+            BitMartSwapSubscribeType::PrFuturesPositions,
+            BitMartSwapSubscribeType::PrFuturesBalances,
+        ]);
+        //链接
+        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+fn get_ws(btree_map: Option<BitMartSwapLogin>, ws_type: BitMartSwapWsType) -> BitMartSwapWs {
+    let bitmart_ws = BitMartSwapWs::new(btree_map, ws_type);
+    bitmart_ws
+}
+
+
+/*深度*/
+#[tokio::test]
+async fn rest_get_depth_test() {
+    global::log_utils::init_log_with_trace();
+    proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_depth("BTCUSDT".to_string()).await;
+    trace!("htx--深度--{:?}", req_data);
+    trace!("htx--深度--{}", req_data.data);
+}
+
+
+/*合约信息*/
+#[tokio::test]
+async fn rest_get_market_test() {
+    global::log_utils::init_log_with_trace();
+    proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_market("BTCUSDT".to_string()).await;
+    trace!("htx--合约信息--{:?}", req_data);
+    trace!("htx--合约信息--{}", req_data.data);
+}
+
+
+/*查询合约账户*/
+#[tokio::test]
+async fn rest_get_account_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_account(json!({
+         })
+    ).await;
+    trace!("htx--查询合约账户--{:?}", req_data);
+    trace!("htx--查询合约账户--{}", req_data.data);
+}
+
+/*用户仓位列表*/
+#[tokio::test]
+async fn rest_get_user_position_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_user_position("BTCUSDT".to_string()).await;
+    trace!("htx--用户仓位列表--{:?}", req_data);
+    trace!("htx--用户仓位列表--{}", req_data.data);
+}
+
+/*查询合约订单列表*/
+#[tokio::test]
+async fn rest_get_orders_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_orders(json!({
+        "symbol":"BTCUSDT"
+    })).await;
+    trace!("htx--查询合约订单列表--{:?}", req_data);
+    trace!("htx--查询合约订单列表--{}", req_data.data);
+}
+
+/*查询单个订单详情*/
+#[tokio::test]
+async fn rest_get_order_details_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_order_details(json!({
+        "symbol":"BTCUSDT",
+        "order_id":"123"
+    })).await;
+    trace!("htx--查询单个订单详情--{:?}", req_data);
+    trace!("htx--查询单个订单详情--{}", req_data.data);
+}
+/*合约交易下单*/
+#[tokio::test]
+async fn rest_swap_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.swap_order(json!({
+        "symbol":"SHIB-USDT",
+        "type":"limit",
+        "side":"limit",
+        "leverage":"open",
+        "direction":"buy",
+        "price":"0.000001359",
+        "volume":1,
+        "lever_rate":1,
+
+    })).await;
+    trace!("htx--合约交易下单--{:?}", req_data);
+    trace!("htx--合约交易下单--{}", req_data.data);
+}
+
+/*全部撤单*/
+#[tokio::test]
+async fn rest_cancel_price_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.cancel_price_order(json!({  })).await;
+    trace!("htx--全部撤单--{:?}", req_data);
+    trace!("htx--全部撤单--{}", req_data.data);
+}
+
+/*撤单*/
+#[tokio::test]
+async fn rest_cancel_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.cancel_order(json!({
+        "order_id":"123"
+    })).await;
+    trace!("htx--撤单--{:?}", req_data);
+    trace!("htx--撤单--{}", req_data.data);
+}
+
+
+// /*设置持仓模式*/
+// #[tokio::test]
+// async fn rest_setting_dual_mode_test() {
+//     global::log_utils::init_log_with_trace();
+//     // proxy_handle();
+//
+//     let mut ret = get_rest();
+//
+//     let req_data = ret.setting_dual_mode(json!({
+//         "margin_account":"USDT",
+//         "position_mode":"dual_side",
+//     })).await;
+//     trace!("htx--设置持仓模式--{:?}", req_data);
+//     trace!("htx--设置持仓模式--{}", req_data.data);
+// }
+
+
+/*设置杠杆*/
+#[tokio::test]
+async fn rest_setting_dual_leverage_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.setting_dual_leverage(json!({
+        "symbol":"BTCUSDT",
+        "open_type":"cross",
+    })).await;
+    trace!("htx--设置持仓模式--{:?}", req_data);
+    trace!("htx--设置持仓模式--{}", req_data.data);
+}
+
+
+fn get_rest() -> BitMartSwapRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("api_memo".to_string(), API_MEMO.to_string());
+
+
+    let bitmart_exc = BitMartSwapRest::new(false, btree_map);
+    bitmart_exc
+}
+
+// 检测是否走代理
+pub fn proxy_handle() {
+    if proxy::ParsingDetail::http_enable_proxy() {
+        trace!("检测有代理配置,配置走代理");
+    }
+}

+ 5 - 5
exchanges/tests/htx_swap_test.rs

@@ -11,8 +11,8 @@ use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxS
 use exchanges::proxy;
 use exchanges::response_base::ResponseData;
 
-const ACCESS_KEY: &str = "984b0506-a251aad4-e16133ec-ed2htwf5tf";
-const SECRET_KEY: &str = "6baacfd0-ebeb0de5-9aeb40ed-d2dbd";
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
 
 
 //ws-订阅公共频道信息
@@ -79,11 +79,11 @@ async fn ws_custom_subscribe() {
         secret: SECRET_KEY.to_string(),
     };
     let t1 = tokio::spawn(async move {
-        let mut ws = get_ws(Option::from(param), HtxSwapWsType::Private);
-        ws.set_symbols(vec!["BTC_USDT".to_string()]);
+        let mut ws = get_ws(Option::from(param), HtxSwapWsType::Public);
+        ws.set_symbols(vec!["BTC_USDT".to_string(),"ETC_USDT".to_string()]);
         ws.set_subscribe(vec![
             HtxSwapSubscribeType::PuFuturesDepth,
-            HtxSwapSubscribeType::PrFuturesOrders
+            // HtxSwapSubscribeType::PrFuturesOrders
 
             // HtxSwapSubscribeType::PrFuturesOrders,
             // HtxSwapSubscribeType::PrFuturesPositions,