JiahengHe 1 vuosi sitten
vanhempi
commit
7288ffcb2d

+ 4 - 1
exchanges/Cargo.toml

@@ -46,4 +46,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 ##生成 xlsx
 rust_xlsxwriter = "0.58.0"
 
-once_cell = "1.18.0"
+once_cell = "1.18.0"
+
+##url编码
+percent-encoding = "2.1.0"

+ 1 - 1
exchanges/src/coinex_swap_rest.rs

@@ -417,7 +417,7 @@ impl CoinexSwapRest {
 
     //查询子账号列表
     pub async fn account_get(&mut self) -> ResponseData {
-        let mut params = serde_json::json!({
+        let params = serde_json::json!({
             "is_frozen":false,
             "page":1,
             "limit":100,

+ 508 - 0
exchanges/src/htx_swap_rest.rs

@@ -0,0 +1,508 @@
+use std::collections::BTreeMap;
+
+use chrono::Utc;
+use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
+use reqwest::Client;
+use reqwest::header::HeaderMap;
+use ring::hmac;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::Value;
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+
+// 定义用于 URL 编码的字符集
+pub const FRAGMENT: &AsciiSet = &CONTROLS
+    .add(b' ')
+    .add(b':')
+    .add(b'=')
+    .add(b'+')
+    .add(b'/').add(b'?').add(b'#')
+    .add(b'[').add(b']').add(b'@').add(b'!').add(b'$').add(b'&')
+    .add(b'\'').add(b'(').add(b')').add(b'*').add(b',')
+    .add(b';').add(b'"').add(b'%')
+;
+
+
+#[derive(Clone)]
+pub struct HtxSwapRest {
+    label: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl HtxSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> HtxSwapRest
+    {
+        return HtxSwapRest::new_label("default-HtxSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> HtxSwapRest
+    {
+        let base_url: String = String::from("https://api.hbdm.vn");
+        info!("走普通通道:{}",base_url);
+
+        if is_colo {} else {}
+        /*****返回结构体*******/
+        HtxSwapRest {
+            label,
+            base_url: base_url.to_string(),
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //获取服务器当前时间
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/timestamp"),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //行情 (【通用】获取聚合行情)
+    pub async fn get_ticker(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/linear-swap-ex/market".to_string(),
+                                format!("/detail/merged"),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    //合约信息 (【通用】获取合约信息)
+    pub async fn get_market(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_contract_info"),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    //查询合约账户 (【全仓】获取用户账户信息)
+    pub async fn get_account(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_account_info"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //用户仓位列表(【全仓】获取用户持仓信息)
+    pub async fn get_user_position(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_position_info"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //查询合约订单列表 (【全仓】获取合约订单信息)
+    pub async fn get_orders(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_order_info"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //查询单个订单详情 (【全仓】获取订单明细信息)
+    pub async fn get_order_details(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_order_info"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //合约交易下单 (【全仓】合约下单)
+    pub async fn swap_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_order"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    // 全部撤单(【全仓】全部撤单)
+    pub async fn cancel_price_order(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/linear-swap-api/v1".to_string(),
+                     format!("/swap_cross_cancelall"),
+                     true,
+                     params,
+        ).await
+    }
+
+    //撤销单个订单 (【全仓】撤销订单)
+    pub async fn cancel_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_cancel"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //设置持仓模式 (【全仓】切换持仓模式)
+    pub async fn setting_dual_mode(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_switch_position_mode"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //设置杠杆(【全仓】切换杠杆)
+    pub async fn setting_dual_leverage(&mut self, params: Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/linear-swap-api/v1".to_string(),
+                                format!("/swap_cross_switch_lever_rate"),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    async fn request(&mut self,
+                     requesst_type: String,
+                     prefix_url: String,
+                     request_url: String,
+                     is_login: bool,
+                     params: Value) -> ResponseData
+    {
+        // trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+        //请求头配置-如果需要登录则存在额外配置
+        let mut body = "".to_string();
+
+        let mut headers = HeaderMap::new();
+        if requesst_type == "GET" {
+            headers.insert("Content-type", "application/x-www-form-urlencoded".parse().unwrap());
+            headers.insert("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) ".parse().unwrap());
+        } else {
+            headers.insert("Accept", "application/json".parse().unwrap());
+            headers.insert("Content-Type", "application/json".parse().unwrap());
+        }
+
+        let utc_now = Utc::now();
+        let timestamp = utc_now.format("%Y-%m-%dT%H:%M:%S").to_string();
+        let mut params_str = "".to_string();
+        //是否需要登录-- 组装sing
+        let mut sing = "".to_string();
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {//需要登录-且登录参数齐全
+                let mut login_param = serde_json::json!({});
+                login_param["AccessKeyId"] = Value::from(access_key);
+                login_param["SignatureMethod"] = Value::from("HmacSHA256");
+                login_param["SignatureVersion"] = Value::from("2");
+                login_param["Timestamp"] = Value::from(timestamp);
+
+                //如果是get 所有参所也要参与校验,如果是post 只有校验参数需要校验
+                let mut verify_param = serde_json::json!({});
+                if requesst_type == "GET" {
+                    merge_json(&mut verify_param, &params);
+                    merge_json(&mut verify_param, &login_param);
+                    body = "{}".to_string();
+                } else if requesst_type == "POST" {
+                    merge_json(&mut verify_param, &login_param);
+                    body = params.to_string();
+                }
+                trace!("需要校验参数:\n{}", verify_param.to_string());
+
+
+                //4. ASCII码的顺序对参数名进行排序
+                let verify_param_str = json_to_query_string(verify_param);
+                trace!("排序后的拼接参数:\n{}", verify_param_str);
+                params_str = verify_param_str.to_string().clone();
+
+                //组装sing
+                sing = Self::sign(secret_key.clone(),
+                                  requesst_type.clone(),
+                                  prefix_url.clone(),
+                                  request_url.clone(),
+                                  verify_param_str.clone(),
+                );
+                // sing = utf8_percent_encode(sing.as_str(), FRAGMENT).to_string();
+                // sing = sing.to_uppercase();
+                sing = utf8_percent_encode(&sing, FRAGMENT).to_string();
+                trace!("sing:{}", sing);
+                // ?AccessKeyId=e2xxxxxx-99xxxxxx-84xxxxxx-7xxxx&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2017-05-11T15%3A19%3A30&Signature=    4F65x5A2bLyMWVQj3Aqp%2BB4w%2BivaA7n5Oi2SuYtCJ9o%3D
+                // ?AccessKeyId=58edb6bb-qz5c4v5b6n-24498508-c5919&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2024-04-29T07%3A26%3A39&Signature=  R0SAEWSEC+A6HS5URSHZIFOZBYQBRCLH0DTDSAL0HLS=
+                //                                                                                                                                             4F65x5A2bLyMWVQj3Aqp+B4w+ivaA7n5Oi2SuYtCJ9o=
+            }
+        } else {
+            if requesst_type == "GET" {
+                let verify_param_str = json_to_query_string(params);
+                params_str = verify_param_str.to_string().clone();
+                body = "{}".to_string();
+            } else if requesst_type == "POST" {
+                body = params.to_string();
+            }
+        }
+
+        // trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            base_url.clone(),
+            requesst_type.to_string(),
+            params_str,
+            body.clone(),
+            sing.clone(),
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+    }
+
+    // pub fn headers(access_key: String, timestamp: String, sign: String) -> HeaderMap {
+    //     let mut headers = HeaderMap::new();
+    //     // headers.insert("KEY", access_key.clone().parse().unwrap());
+    //     // headers.insert("Timestamp", timestamp.clone().parse().unwrap());
+    //     // headers.insert("SIGN", sign.clone().parse().unwrap());
+    //     headers
+    // }
+    pub fn sign(secret_key: String, requesst_type: String,
+                prefix_url: String, request_url: String,
+                verify_param: String) -> String
+    {
+        let message = format!("{}\napi.hbdm.vn\n{}{}\n{}",
+                              requesst_type,
+                              prefix_url, request_url,
+                              verify_param
+        );
+        // trace!("**********", );
+        trace!("组装数据:\n{}", message);
+        // trace!("**********", );
+
+        let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = base64::encode(result);
+        sign
+    }
+
+
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String,
+                       sing: String, headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let params_str = if sing.len() > 0 {
+            format!("{}&Signature={}", params.clone(), sing)
+        } else {
+            format!("{}", params.clone())
+        };
+        let addrs_url = if params_str.len() > 0 {
+            format!("{}?{}", url.clone(), params_str)
+        } else {
+            format!("{}", url.clone())
+        };
+
+        trace!("url-----:???{}",url.clone());
+        trace!("addrs_url-----:???{}",addrs_url.clone());
+        trace!("param-----:???{}",params.clone());
+        trace!("param_str-----:???{}",params_str.clone());
+        trace!("body-----:???{}",body.clone());
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(addrs_url.clone()).body(body).headers(headers),
+            // "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        // trace!("text:???{:?}",text);
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text).unwrap();
+        // return  ResponseData::new(self.label.clone(), 200, "success".to_string(), json_value);
+
+        let status = json_value["status"].as_str().unwrap();
+        match status {
+            "ok" => {
+                //判断是否有code ,没有表示特殊接口,直接返回
+                if json_value.get("data").is_some() {
+                    let data = json_value.get("data").unwrap();
+                    ResponseData::new(self.label.clone(), 200, "success".to_string(), data.clone())
+                } else {
+                    ResponseData::new(self.label.clone(), 200, "success".to_string(), json_value)
+                }
+            }
+            _ => {
+                ResponseData::new(self.label.clone(), 400, "error".to_string(), json_value)
+            }
+        }
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+        match json_value {
+            Ok(data) => {
+                let message;
+
+                if !data["message"].is_null() {
+                    message = format!("{}:{}", data["label"].as_str().unwrap(), data["message"].as_str().unwrap());
+                } else {
+                    message = data["label"].to_string();
+                }
+
+                let mut error = ResponseData::error(self.label.clone(), message);
+                error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
+                error
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                let error = ResponseData::error("".to_string(),
+                                                format!("json 解析失败:{},相关参数:{}", e, text));
+                error
+            }
+        }
+    }
+}
+
+
+// 合并两个 JSON 对象的函数
+fn merge_json(a: &mut Value, b: &Value) {
+    match (a, b) {
+        (Value::Object(ref mut a_map), Value::Object(b_map)) => {
+            for (k, v) in b_map {
+                merge_json(a_map.entry(k.clone()).or_insert(Value::Null), v);
+            }
+        }
+        (a, b) => {
+            *a = b.clone();
+        }
+    }
+}
+
+
+// 函数:将 JSON 对象转换为排序后的查询字符串
+fn json_to_query_string(value: Value) -> String {
+    let mut params = BTreeMap::new();
+
+    if let Value::Object(obj) = value {
+        for (k, v) in obj {
+            // 确保只处理字符串值
+            if let Value::String(v_str) = v {
+                params.insert(k, v_str);
+            } else {
+                // 对于非字符串值,我们可以转换为字符串或执行其他处理
+                params.insert(k, v.to_string());
+            }
+        }
+    }
+
+    // 拼接键值对为查询字符串
+    params.iter()
+        .map(|(k, v)| format!("{}={}", utf8_percent_encode(k, FRAGMENT), utf8_percent_encode(v, FRAGMENT)))
+        .collect::<Vec<String>>()
+        .join("&")
+}

+ 467 - 0
exchanges/src/htx_swap_ws.rs

@@ -0,0 +1,467 @@
+use std::io::Read;
+use std::str::from_utf8;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use chrono::Utc;
+use flate2::bufread::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use once_cell::sync::Lazy;
+use ring::hmac;
+use serde_json::{json, Value};
+use tokio::sync::Mutex;
+use tokio::task;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::AbstractWsMode;
+
+pub(crate) static LOGIN_DATA: Lazy<Mutex<(bool, bool)>> = Lazy::new(|| {
+    println!("初始化...");
+    // 0: 需要登录, 1:是否已经登录
+    Mutex::new((false, false))
+});
+
+
+pub enum HtxSwapWsType {
+    Public,
+    Private,
+}
+
+
+//订阅频道
+#[derive(Clone)]
+pub enum HtxSwapSubscribeType {
+    // 深度
+    PuFuturesDepth,
+    // // 公开成交
+    // PuFuturesDeals,
+
+    // 订单
+    PrFuturesOrders,
+    // 仓位
+    PrFuturesPositions,
+    // 余额
+    PrFuturesBalances,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct HtxSwapLogin {
+    pub api_key: String,
+    pub secret: String,
+}
+
+#[derive(Clone)]
+pub struct HtxSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号信息
+    login_param: Option<HtxSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<HtxSwapSubscribeType>,
+    //心跳间隔
+    _heartbeat_time: u64,
+}
+
+
+impl HtxSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************实例化一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(login_param: Option<HtxSwapLogin>, ws_type: HtxSwapWsType) -> HtxSwapWs {
+        return HtxSwapWs::new_label("default-HtxSwapWs".to_string(), login_param, ws_type);
+    }
+
+    pub fn new_label(label: String, login_param: Option<HtxSwapLogin>, ws_type: HtxSwapWsType) -> HtxSwapWs
+    {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            HtxSwapWsType::Public => {
+                let url = "wss://api.hbdm.vn/linear-swap-ws".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+            HtxSwapWsType::Private => {
+                let url = "wss://api.hbdm.vn/linear-swap-notification".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
+
+        HtxSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            _heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<HtxSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "-");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                HtxSwapSubscribeType::PuFuturesDepth => false,
+                // HtxSwapSubscribeType::PuFuturesDeals => false,
+                //
+                HtxSwapSubscribeType::PrFuturesOrders => true,
+                HtxSwapSubscribeType::PrFuturesPositions => true,
+                HtxSwapSubscribeType::PrFuturesBalances => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: HtxSwapSubscribeType, _login_param: Option<HtxSwapLogin>) -> Value {
+        // let access_key;
+        // let secret_key;
+        // match login_param {
+        //     None => {
+        //         access_key = "".to_string();
+        //         secret_key = "".to_string();
+        //     }
+        //     Some(param) => {
+        //         access_key = param.api_key.clone();
+        //         secret_key = param.secret.clone();
+        //     }
+        // }
+        // let cid = "";
+
+        match subscribe_type {
+            HtxSwapSubscribeType::PuFuturesDepth => {
+                json!({
+                      "sub":format!("market.{}.depth.step0", symbol.to_uppercase()),
+                      "id": "id5"
+                })
+            }
+
+            HtxSwapSubscribeType::PrFuturesOrders => {
+                json!({
+                        "op":"sub",
+                        "topic":format!("orders_cross.{}", symbol.to_lowercase())
+                })
+            }
+            HtxSwapSubscribeType::PrFuturesPositions => {
+                json!({
+                        "op":"sub",
+                        "topic":format!("positions_cross.{}", symbol.to_uppercase())
+                })
+            }
+            HtxSwapSubscribeType::PrFuturesBalances => {
+                json!({
+                        "op":"sub",
+                        "topic":format!("accounts_cross.USDT")
+                })
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<Value> {
+        let mut args = vec![];
+        // 只获取第一个
+        // let symbol = self.symbol_s.get(0).unwrap().replace("_", "-");
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+        args
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let login_param = self.login_param.clone();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        // let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        // let write_tx_clone1 = Arc::clone(write_tx_am);
+        let write_tx_clone2 = Arc::clone(write_tx_am);
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     let ping_str = json!({
+        //         "method": "server.ping",
+        //         "params": {},
+        //         "id": 1
+        //     });
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+
+
+        for s in subscription {
+            subscribe_array.push(s.to_string());
+        }
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            info!("启动连接");
+            loop {
+                info!("htx_usdt_swap socket 连接中……");
+                // 需要登录
+                if login_is {
+                    let mut login_data = LOGIN_DATA.lock().await;
+                    let login_param_real = login_param.clone().unwrap();
+                    login_data.0 = true;
+                    let utc_now = Utc::now();
+                    let timestamp = utc_now.format("%Y-%m-%dT%H:%M:%S").to_string();
+                    let timestamp_str = percent_encoding::utf8_percent_encode(timestamp.clone().as_str(), crate::htx_swap_rest::FRAGMENT).to_string();
+
+                    let access_key = login_param_real.api_key.clone();
+                    let secret_key = login_param_real.secret.clone();
+                    let param_str = format!("AccessKeyId={}&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp={}", access_key, timestamp_str);
+
+                    let signature = {
+                        let message = format!("GET\napi.hbdm.vn\n/linear-swap-notification\n{}", param_str);
+                        trace!("组装数据:\n{}", message);
+
+                        let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+                        let result = hmac::sign(&hmac_key, &message.as_bytes());
+                        let sign = base64::encode(result);
+                        sign
+                    };
+                    let login_param = json!({
+                        "op": "auth",
+                        "type": "api",
+                        "AccessKeyId": access_key,
+                        "SignatureMethod": "HmacSHA256",
+                        "SignatureVersion": "2",
+                        "Timestamp": timestamp,
+                        "Signature": signature,
+                    });
+                    let login_str = login_param.to_string();
+                    info!("发起ws登录: {}", login_str);
+                    let write_tx_c = Arc::clone(&write_tx_clone2);
+                    AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
+                }
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync,
+                ).await;
+                let mut login_data = LOGIN_DATA.lock().await;
+                // 断联后 设置为没有登录
+                login_data.1 = false;
+                info!("htx_usdt_swap socket 断连,1s以后重连……");
+                error!("htx_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub async fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text).await;
+        Option::from(response_data)
+    }
+    pub fn message_text_sync(text: String) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_text(text))
+        })
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub async fn message_binary(binary: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = Self::parse_zip_data(binary);
+        let response_data = Self::ok_text(message_str).await;
+        Option::from(response_data)
+    }
+    pub fn message_binary_sync(binary: Vec<u8>) -> Option<ResponseData> {
+        // 使用 tokio::task::block_in_place 来等待异步函数的结果
+        task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(Self::message_binary(binary))
+        })
+    }
+    //数据解析
+    pub async fn ok_text(text: String) -> ResponseData
+    {
+        trace!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        /*公共:响应*/
+        //心跳包
+        let ping = json_value["ping"].as_i64();
+        match ping {
+            Some(ping_ts) => {
+                let pong = json!({
+                    "pong": ping_ts
+                });
+                return ResponseData::new("".to_string(), -302, "success".to_string(), pong);
+            }
+            None => {}
+        }
+        //推送数据
+        let status = json_value["status"].as_str();
+        match status {
+            Some(v) => {
+                match v {
+                    "ok" => {
+                        res_data.channel = format!("{}", v);
+                        res_data.code = -201;
+                        res_data.data = json_value["data"].clone();
+                    }
+                    "error" => {
+                        res_data.code = 400;
+                        res_data.message = format!("{}", json_value["err-msg"].as_str().unwrap());
+                        res_data.channel = format!("{}", json_value["id"].as_str().unwrap());
+                    }
+                    &_ => {}
+                }
+                return res_data;
+            }
+            None => {}
+        }
+        let ch = json_value["ch"].as_str();
+        match ch {
+            Some(channel) => {
+                res_data.channel = channel.parse().unwrap();
+                res_data.code = 200;
+                res_data.data = json_value["tick"].clone();
+                return res_data;
+            }
+            None => {}
+        }
+
+
+        /*私有:响应*/
+        let op = json_value["op"].as_str().unwrap();
+        match op {
+            "auth" => {
+                let op = json_value["err-code"].as_i64().unwrap();
+                res_data.channel = "auth".to_string();
+                if op == 0 {
+                    res_data.code = -200;
+                    res_data.message = "登录成功".to_string();
+                    res_data.data = json_value["data"].clone();
+                } else {
+                    res_data.code = 400;
+                    res_data.message = format!("登录失败:{}", json_value["err-msg"].as_str().unwrap());
+                }
+                return res_data;
+            }
+            "ping" => {
+                let ts = json_value["ts"].as_str().unwrap();
+                let pong = json!({
+                    "op": "pong",
+                    "ts": ts
+                });
+                return ResponseData::new("".to_string(), -302, "success".to_string(), pong);
+            }
+            "sub" => {
+                res_data.channel = json_value["topic"].as_str().unwrap().to_string();
+                res_data.code = -201;
+                res_data.message = "订阅成功".to_string();
+                return res_data;
+            }
+            "notify" => {
+                res_data.channel = json_value["topic"].as_str().unwrap().to_string();
+                res_data.code = 200;
+                res_data.message = "推送数据".to_string();
+                if json_value.get("data").is_some() {
+                    res_data.data = json_value["data"].clone();
+                } else {
+                    res_data.data = json_value.clone();
+                }
+
+                return res_data;
+            }
+            _ => {}
+        }
+
+
+        res_data.code = 400;
+        res_data.message = format!("未知响应内容");
+        res_data.data = text.parse().unwrap();
+        trace!("--------------------------------");
+        res_data
+    }
+
+    fn parse_zip_data(p0: Vec<u8>) -> String {
+        // 创建一个GzDecoder的实例,将压缩数据作为输入
+        let mut decoder = GzDecoder::new(&p0[..]);
+
+        // 创建一个缓冲区来存放解压缩后的数据
+        let mut decompressed_data = Vec::new();
+
+        // 读取解压缩的数据到缓冲区中
+        decoder.read_to_end(&mut decompressed_data).expect("解压缩失败");
+        let result = from_utf8(&decompressed_data)
+            .expect("解压缩后的数据不是有效的UTF-8");
+
+        // info!("解压缩数据 {:?}", result);
+        result.to_string()
+    }
+}
+

+ 2 - 0
exchanges/src/lib.rs

@@ -27,4 +27,6 @@ pub mod xlsx_utils;
 pub mod bybit_swap_ws;
 pub mod coinex_swap_rest;
 pub mod coinex_swap_ws;
+pub mod htx_swap_ws;
+pub mod htx_swap_rest;
 

+ 7 - 0
exchanges/src/socket_tool.rs

@@ -103,6 +103,13 @@ impl AbstractWsMode {
                        -302 -客户端收到服务器心跳自定义,需要响应自定义
                     */
                     match code {
+                        200 => {
+                            let mut data_c = data.clone();
+                            data_c.ins = Instant::now();
+                            data_c.time = Utc::now().timestamp_millis();
+
+                            handle_function(data_c).await;
+                        }
                         -200 => {
                             //登录成功
                             info!("ws登录成功:{:?}", data);

+ 314 - 0
exchanges/tests/htx_swap_test.rs

@@ -0,0 +1,314 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use serde_json::json;
+use tokio::sync::Mutex;
+use tracing::trace;
+
+use exchanges::htx_swap_rest::HtxSwapRest;
+use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
+use exchanges::proxy;
+use exchanges::response_base::ResponseData;
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+
+    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+    let param = HtxSwapLogin {
+        api_key: ACCESS_KEY.to_string(),
+        secret: SECRET_KEY.to_string(),
+    };
+    let t1 = tokio::spawn(async move {
+        let mut ws = get_ws(Option::from(param), HtxSwapWsType::Public);
+        ws.set_symbols(vec!["BTC_USDT".to_string(),"ETC_USDT".to_string()]);
+        ws.set_subscribe(vec![
+            HtxSwapSubscribeType::PuFuturesDepth,
+            // HtxSwapSubscribeType::PrFuturesOrders
+
+            // HtxSwapSubscribeType::PrFuturesOrders,
+            // HtxSwapSubscribeType::PrFuturesPositions,
+            // HtxSwapSubscribeType::PrFuturesBalances,
+        ]);
+        //链接
+        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+fn get_ws(btree_map: Option<HtxSwapLogin>, ws_type: HtxSwapWsType) -> HtxSwapWs {
+    let htx_ws = HtxSwapWs::new(btree_map, ws_type);
+    htx_ws
+}
+
+
+/*服务器时间戳*/
+#[tokio::test]
+async fn rest_get_server_time_test() {
+    global::log_utils::init_log_with_trace();
+    proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_server_time().await;
+    trace!("htx--服务器时间戳--{:?}", req_data);
+    trace!("htx--服务器时间戳--{}", req_data.data);
+}
+
+
+/*获取聚合行情*/
+#[tokio::test]
+async fn rest_get_ticker_test() {
+    global::log_utils::init_log_with_trace();
+    proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_ticker(json!({
+            "contract_type":"swap"
+         })
+    ).await;
+    trace!("htx--获取聚合行情--{:?}", req_data);
+    trace!("htx--获取聚合行情--{}", req_data.data);
+}
+
+
+
+/*合约信息*/
+#[tokio::test]
+async fn rest_get_market_test() {
+    global::log_utils::init_log_with_trace();
+    proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_market(json!({
+            "contract_code":"BTC-USDT"
+         })
+    ).await;
+    trace!("htx--合约信息--{:?}", req_data);
+    trace!("htx--合约信息--{}", req_data.data);
+}
+
+
+/*查询合约账户*/
+#[tokio::test]
+async fn rest_get_account_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_account(json!({
+            "valuation_asset":"USDT"
+         })
+    ).await;
+    trace!("htx--查询合约账户--{:?}", req_data);
+    trace!("htx--查询合约账户--{}", req_data.data);
+}
+
+/*用户仓位列表*/
+#[tokio::test]
+async fn rest_get_user_position_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_user_position(
+   serde_json::json!({
+            "contract_type":"swap"
+         })
+    ).await;
+    trace!("htx--用户仓位列表--{:?}", req_data);
+    trace!("htx--用户仓位列表--{}", req_data.data);
+}
+
+/*查询合约订单列表*/
+#[tokio::test]
+async fn rest_get_orders_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_orders(json!({
+    })).await;
+    trace!("htx--查询合约订单列表--{:?}", req_data);
+    trace!("htx--查询合约订单列表--{}", req_data.data);
+}
+
+/*查询单个订单详情*/
+#[tokio::test]
+async fn rest_get_order_details_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_order_details(json!({
+        "order_id":"123"
+    })).await;
+    trace!("htx--查询单个订单详情--{:?}", req_data);
+    trace!("htx--查询单个订单详情--{}", req_data.data);
+}
+/*合约交易下单*/
+#[tokio::test]
+async fn rest_swap_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.swap_order(json!({
+        "pair":"SHIB-USDT",
+        "contract_type":"swap",
+        "order_price_type":"limit",
+        "offset":"open",
+        "direction":"buy",
+        "price":"0.000001359",
+        "volume":1,
+        "lever_rate":1,
+
+    })).await;
+    trace!("htx--合约交易下单--{:?}", req_data);
+    trace!("htx--合约交易下单--{}", req_data.data);
+}
+
+/*全部撤单*/
+#[tokio::test]
+async fn rest_cancel_price_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.cancel_price_order(json!({  })).await;
+    trace!("htx--全部撤单--{:?}", req_data);
+    trace!("htx--全部撤单--{}", req_data.data);
+}
+
+/*撤单*/
+#[tokio::test]
+async fn rest_cancel_order_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.cancel_order(json!({
+        "order_id":"123"
+    })).await;
+    trace!("htx--撤单--{:?}", req_data);
+    trace!("htx--撤单--{}", req_data.data);
+}
+
+
+/*设置持仓模式*/
+#[tokio::test]
+async fn rest_setting_dual_mode_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.setting_dual_mode(json!({
+        "margin_account":"USDT",
+        "position_mode":"dual_side",
+    })).await;
+    trace!("htx--设置持仓模式--{:?}", req_data);
+    trace!("htx--设置持仓模式--{}", req_data.data);
+}
+
+
+/*设置杠杆*/
+#[tokio::test]
+async fn rest_setting_dual_leverage_test() {
+    global::log_utils::init_log_with_trace();
+    // proxy_handle();
+
+    let mut ret = get_rest();
+
+    let req_data = ret.setting_dual_leverage(json!({
+        "pair":"BTC-USDT",
+        "contract_type":"swap",
+        "lever_rate":1,
+    })).await;
+    trace!("htx--设置持仓模式--{:?}", req_data);
+    trace!("htx--设置持仓模式--{}", req_data.data);
+}
+
+
+fn get_rest() -> HtxSwapRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+
+    let htx_exc = HtxSwapRest::new(false, btree_map);
+    htx_exc
+}
+
+// 检测是否走代理
+pub fn proxy_handle() {
+    if proxy::ParsingDetail::http_enable_proxy() {
+        trace!("检测有代理配置,配置走代理");
+    }
+}

+ 6 - 0
global/src/account_info.rs

@@ -19,6 +19,9 @@ pub struct AccountInfo {
     pub bitget_access_key: String,
     pub bitget_secret_key: String,
     pub bitget_pass: String,
+    pub htx_access_key: String,
+    pub htx_secret_key: String,
+    pub htx_pass: String,
 }
 
 impl AccountInfo {
@@ -37,6 +40,9 @@ impl AccountInfo {
             bitget_access_key: "".to_string(),
             bitget_secret_key: "".to_string(),
             bitget_pass: "".to_string(),
+            htx_access_key: "".to_string(),
+            htx_secret_key: "".to_string(),
+            htx_pass: "".to_string(),
         }
     }
 }

+ 1 - 0
global/src/public_params.rs

@@ -30,5 +30,6 @@ pub const OKEX_USDT_SWAP_LIMIT:i64 = 30;
 pub const BITGET_USDT_SWAP_LIMIT:i64 = 10;
 pub const BITGET_USDT_SPOT_LIMIT:i64 = 100;
 pub const BYBIT_USDT_SWAP_LIMIT:i64 = 10;
+pub const HTX_USDT_SWAP_LIMIT:i64 = 24;
 pub const MEXC_SPOT_LIMIT:i64 = 333;
 pub const RATIO:i64 = 4;

+ 6 - 1
standard/src/exchange.rs

@@ -13,6 +13,7 @@ use crate::bitget_swap::BitgetSwap;
 use crate::coinex_swap::CoinexSwap;
 // use crate::kucoin_spot::KucoinSpot;
 // use crate::okx_swap::OkxSwap;
+use crate::htx_swap::HtxSwap;
 
 /// 交易所交易模式枚举
 /// - `BinanceSwap`: Binance交易所期货;
@@ -32,7 +33,8 @@ pub enum ExchangeEnum {
     // BitgetSpot,
     CoinexSwap,
     BitgetSwap,
-    BybitSwap
+    BybitSwap,
+    HtxSwap
 }
 
 /// Exchange结构体
@@ -107,6 +109,9 @@ impl Exchange {
             ExchangeEnum::CoinexSwap => {
                 Box::new(CoinexSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
+            ExchangeEnum::HtxSwap => {
+                Box::new(HtxSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
         }
     }
 }

+ 23 - 1
standard/src/handle_info.rs

@@ -7,12 +7,13 @@ use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle, htx_swap_handle};
 use crate::{Account, MarketOrder, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
 pub struct HandleSwapInfo;
 
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DepthParam {
     pub depth_asks: Vec<MarketOrder>,
     pub depth_bids: Vec<MarketOrder>,
@@ -53,6 +54,9 @@ impl HandleSwapInfo {
             ExchangeEnum::CoinexSwap => {
                 coinex_swap_handle::handle_account_info(res_data, symbol)
             }
+            ExchangeEnum::HtxSwap => {
+                htx_swap_handle::handle_account_info(res_data, symbol)
+            }
             _ => {
                 error!("未找到该交易所!handle_account_info: {:?}",exchange);
                 panic!("未找到该交易所!handle_account_info: {:?}", exchange);
@@ -97,6 +101,10 @@ impl HandleSwapInfo {
             ExchangeEnum::CoinexSwap => {
                 coinex_swap_handle::handle_ticker(res_data)
             }
+            ExchangeEnum::HtxSwap => {
+                SpecialDepth::new()
+                // htx_swap_handle::handle_ticker(res_data)
+            }
         }
     }
     // 处理position信息
@@ -132,6 +140,9 @@ impl HandleSwapInfo {
             ExchangeEnum::CoinexSwap => {
                 coinex_swap_handle::handle_position(res_data, ct_val)
             }
+            ExchangeEnum::HtxSwap => {
+                htx_swap_handle::handle_position(res_data, ct_val)
+            }
         }
     }
     // 处理订单信息
@@ -165,6 +176,9 @@ impl HandleSwapInfo {
             ExchangeEnum::CoinexSwap => {
                 coinex_swap_handle::handle_order(res_data, ct_val)
             }
+            ExchangeEnum::HtxSwap => {
+                htx_swap_handle::handle_order(res_data, ct_val)
+            }
         }
     }
 
@@ -313,6 +327,14 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
             t = Decimal::from_i64(time).unwrap();
             create_at = time * 1000;
         }
+        ExchangeEnum::HtxSwap => {
+            let depth = &res_data.data;
+            depth_asks = htx_swap_handle::format_depth_items(depth["asks"].clone());
+            depth_bids = htx_swap_handle::format_depth_items(depth["bids"].clone());
+            let time = depth["ts"].to_string().parse::<i64>().unwrap();
+            t = Decimal::from_i64(time).unwrap();
+            create_at = time * 1000;
+        }
     }
 
     DepthParam {

+ 641 - 0
standard/src/htx_swap.rs

@@ -0,0 +1,641 @@
+use std::collections::{BTreeMap};
+use exchanges::htx_swap_rest::HtxSwapRest;
+use std::io::{Error, ErrorKind};
+use tokio::sync::mpsc::Sender;
+use std::str::FromStr;
+use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::TryStreamExt;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::{FromPrimitive};
+use serde_json::json;
+use serde_json::Value::Null;
+use tokio::spawn;
+use tokio::time::Instant;
+use tracing::{error, info};
+use global::trace_stack::TraceStack;
+use crate::exchange::ExchangeEnum;
+use crate::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Ticker, utils};
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct HtxSwap {
+    exchange: ExchangeEnum,
+    symbol: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: HtxSwapRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl HtxSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> HtxSwap {
+        let market = Market::new();
+        let mut htx_swap = HtxSwap {
+            exchange: ExchangeEnum::HtxSwap,
+            symbol: symbol.to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: HtxSwapRest::new(is_colo, params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+        htx_swap.market = HtxSwap::get_market(&mut htx_swap).await.unwrap();
+        // 修改持仓模式
+        let mode_result = htx_swap.set_dual_mode("", true).await;
+        match mode_result {
+            Ok(ok) => {
+                info!("HtxSwap:设置持仓模式成功!{:?}", ok);
+            }
+            Err(error) => {
+                error!("HtxSwap:设置持仓模式失败!{:?}", error)
+            }
+        }
+        // 设置持仓杠杆
+        let lever_rate_result = htx_swap.set_dual_leverage("10").await;
+        match lever_rate_result {
+            Ok(ok) => {
+                info!("HtxSwap:设置持仓杠杆成功!{:?}", ok);
+            }
+            Err(error) => {
+                error!("HtxSwap:设置持仓杠杆失败!{:?}", error)
+            }
+        }
+
+        return htx_swap;
+    }
+}
+
+#[async_trait]
+impl Platform for HtxSwap {
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+
+    fn get_self_exchange(&self) -> ExchangeEnum { ExchangeEnum::HtxSwap }
+
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+
+    fn get_self_is_colo(&self) -> bool { self.is_colo }
+
+    fn get_self_params(&self) -> BTreeMap<String, String> { self.params.clone() }
+
+    fn get_self_market(&self) -> Market { self.market.clone() }
+
+    fn get_request_delays(&self) -> Vec<i64> {
+        // self.request.get_delays()
+        vec![]
+    }
+
+    fn get_request_avg_delay(&self) -> Decimal {
+        // self.request.get_avg_delay()
+        Decimal::ZERO
+    }
+
+    fn get_request_max_delay(&self) -> i64 { 0 }
+
+    async fn get_server_time(&mut self) -> Result<String, Error> {
+        let response = self.request.get_server_time().await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取服务器时间异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let result = res_data_json["ts"].to_string();
+        Ok(result)
+    }
+
+    async fn get_account(&mut self) -> Result<Account, Error> {
+        let params = json!({
+            "margin_account": "USDT"
+        });
+        let response = self.request.get_account(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取账户信息异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let mut account = Account::new();
+        for data in res_data_json.as_array().unwrap() {
+            let margin_position = Decimal::from_f64(data["margin_position"].as_f64().unwrap()).unwrap();
+
+            let balance = Decimal::from_f64(data["margin_static"].as_f64().unwrap()).unwrap();
+            let frozen_balance = Decimal::from_f64(data["margin_frozen"].as_f64().unwrap()).unwrap();
+            let available_balance = balance - margin_position - frozen_balance;
+            // 格式化account信息
+            account = Account {
+                coin: data["margin_asset"].as_str().unwrap().to_string(),
+                balance,
+                available_balance,
+                frozen_balance,
+                stocks: Default::default(),
+                available_stocks: Default::default(),
+                frozen_stocks: Default::default(),
+            };
+        }
+        return Ok(account);
+    }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap get_spot_account:该交易所方法未实现".to_string()))
+    }
+
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
+        let ct_val = self.market.ct_val;
+        let params = json!({
+            "contract_type": "swap",
+            "contract_code": symbol_format
+        });
+        let response = self.request.get_user_position(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取仓位异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let positions_info = res_data_json.as_array().unwrap();
+        let result = positions_info.iter().map(|item| { format_position_item(item, &ct_val) }).collect();
+        Ok(result)
+    }
+
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
+        let params = json!({
+            "contract_type": "swap"
+        });
+        let response = self.request.get_user_position(params).await;
+
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取仓位异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let positions_info = res_data_json.as_array().unwrap();
+        let result = positions_info.iter().map(|item| { format_position_item(item, &Decimal::ONE) }).collect();
+        Ok(result)
+    }
+
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> {
+        return self.get_ticker_symbol(self.symbol.clone()).await;
+    }
+
+    async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
+        let symbol_format = utils::format_symbol(symbol.clone(), "-");
+        let params = json!({
+            "contract_code": symbol_format,
+        });
+        let response = self.request.get_ticker(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取行情信息异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let ticker_info = res_data_json["tick"].clone();
+        let time = ticker_info["ts"].as_i64().unwrap();
+        let result = Ticker {
+            time,
+            high: Decimal::from_str(ticker_info["high"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(ticker_info["low"].as_str().unwrap()).unwrap(),
+            sell: Decimal::from_f64(ticker_info["bid"][0].as_f64().unwrap()).unwrap(),
+            buy: Decimal::from_f64(ticker_info["ask"][0].as_f64().unwrap()).unwrap(),
+            last: Decimal::from_str(ticker_info["close"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(ticker_info["amount"].as_str().unwrap()).unwrap(),
+        };
+        Ok(result)
+    }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        self.get_market_symbol(self.symbol.clone()).await
+    }
+
+    async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
+        let symbol_format = utils::format_symbol(symbol.clone(), "-");
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+
+        let params = json!({
+            "pair": symbol_format,
+            "contract_type":"swap"
+        });
+        let response = self.request.get_market(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取市场信息异常{:?}", response).to_string()));
+        }
+        let res_data_json = response.data.as_array().unwrap();
+        let market_info = res_data_json[0].clone();
+
+        if !market_info["pair"].as_str().unwrap().to_string().eq(&symbol_format) {
+            return Err(Error::new(ErrorKind::NotFound, format!("符号未找到:symbol={}, response={:?}", symbol_format, response))).unwrap();
+        }
+
+        let base_asset = market_info["symbol"].as_str().unwrap().to_string();
+        let quote_asset = symbol_array[1].to_string();
+
+        let tick_size = Decimal::from_f64(market_info["price_tick"].as_f64().unwrap()).unwrap();
+        let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+        let amount_size = Decimal::from_f64(market_info["contract_size"].as_f64().unwrap()).unwrap();
+        let amount_precision = Decimal::ZERO;
+        let min_qty = Decimal::NEGATIVE_ONE;
+        let max_qty = Decimal::NEGATIVE_ONE;
+        let ct_val = Decimal::from_f64(market_info["contract_size"].as_f64().unwrap()).unwrap();
+
+        let result = Market {
+            symbol: format!("{}_{}", base_asset, quote_asset),
+            base_asset,
+            quote_asset,
+            tick_size,
+            amount_size,
+            price_precision,
+            amount_precision,
+            min_qty,
+            max_qty,
+            min_notional: min_qty,
+            max_notional: max_qty,
+            ct_val,
+        };
+        Ok(result)
+    }
+
+    async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
+        let ct_val = self.market.ct_val;
+
+        let mut params = json!({
+            "pair": symbol_format
+        });
+        if order_id.eq("") { params["client_order_id"] = json!(custom_id) } else { params["order_id"] = json!(order_id) };
+        let response = self.request.get_order_details(params).await;
+        if response.code != 200 || response.data == Null {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 获取订单详情异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let orders = res_data_json.as_array().unwrap();
+        let result = format_order_item(orders[0].clone(), ct_val);
+        Ok(result)
+    }
+
+    async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap get_orders_list:该交易所方法未实现".to_string()))
+    }
+
+    async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let ct_val = self.market.ct_val;
+
+        return self.take_order_symbol(self.symbol.clone(), ct_val, custom_id, origin_side, price, amount).await;
+    }
+
+    async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(symbol, "-");
+        let final_size = (amount / ct_val).floor();
+        let mut params = json!({
+            "pair": symbol_format,
+            "client_order_id": custom_id,
+            "contract_type": "swap",
+            "volume": final_size.to_string(),
+            "lever_rate": 10
+        });
+        if price.eq(&Decimal::ZERO) {
+            params["order_price_type"] = json!("market");
+        } else {
+            params["price"] = json!(price.to_string());
+            params["order_price_type"] = json!("limit");
+        };
+        match origin_side {
+            "kd" => {
+                params["direction"] = json!("buy");
+                params["offset"] = json!("open");
+            }
+            "pd" => {
+                params["direction"] = json!("sell");
+                params["offset"] = json!("close");
+            }
+            "kk" => {
+                params["direction"] = json!("sell");
+                params["offset"] = json!("open");
+            }
+            "pk" => {
+                params["direction"] = json!("buy");
+                params["offset"] = json!("close");
+            }
+            _ => { panic!("htx_usdt_swap 下单参数错误"); }
+        };
+        let response = self.request.swap_order(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 下单异常{:?}", response).to_string()));
+        }
+
+        let res_data_json = response.data;
+        let result = Order {
+            id: res_data_json["order_id"].to_string(),
+            custom_id: res_data_json["client_order_id"].to_string(),
+            price: Decimal::ZERO,
+            amount: Decimal::ZERO,
+            deal_amount: Decimal::ZERO,
+            avg_price: Decimal::ZERO,
+            status: "NEW".to_string(),
+            order_type: "".to_string(),
+            trace_stack: TraceStack::new(0, Instant::now()).on_special("339 htx_swap".to_string()),
+        };
+        return Ok(result);
+    }
+
+    async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
+        let mut params = json!({
+            "pair": symbol_format,
+            "contract_type": "swap"
+        });
+        if order_id.eq("") { params["client_order_id"] = json!(custom_id) } else { params["order_id"] = json!(order_id) };
+        let response = self.request.cancel_order(params).await;
+
+        // 取消失败,进行报错
+        if response.code != 200 || response.data["errors"].as_array().unwrap_or(&vec![]).len() > 0 {
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 取消订单异常{:?}", response).to_string()));
+        }
+        let res_data_json = response.data;
+        let mut id = order_id.to_string();
+        let mut custom_id = custom_id.to_string();
+        if order_id.eq("") {
+            custom_id = res_data_json["successes"].as_str().unwrap().to_string()
+        } else {
+            id = res_data_json["successes"].as_str().unwrap().to_string()
+        };
+        let result = Order {
+            id,
+            custom_id,
+            price: Decimal::ZERO,
+            amount: Decimal::ZERO,
+            deal_amount: Decimal::ZERO,
+            avg_price: Decimal::ZERO,
+            status: "REMOVE".to_string(),
+            order_type: "".to_string(),
+            trace_stack: TraceStack::new(0, Instant::now()).on_special("374 htx_swap".to_string()),
+        };
+        Ok(result)
+    }
+
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap cancel_orders:该交易所方法未实现".to_string()))
+    }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
+        let params = json!({
+            "pair": symbol_format,
+            "contract_type": "swap"
+        });
+        let response = self.request.cancel_price_order(params).await;
+        if response.code != 200 {
+            if response.data.to_string().contains("No cancellable orders") { return Ok(vec![]); }
+            return Err(Error::new(ErrorKind::NotFound, format!("htx_swap 撤销所有订单异常{:?}", response).to_string()));
+        }
+
+        Ok(vec![])
+    }
+
+    async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<serde_json::Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap take_stop_loss_order:该交易所方法未实现".to_string()))
+    }
+
+    async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<serde_json::Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap cancel_stop_loss_order:该交易所方法未实现".to_string()))
+    }
+
+    async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
+        let pos_mode = if is_dual_mode { "dual_side" } else { "single_side" };
+        let params = json!({
+            "margin_account": "USDT",
+            "position_mode": pos_mode,
+        });
+        let response = self.request.setting_dual_mode(params).await;
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::Other, format!("设置持仓模式失败:{:?}", response).to_string()));
+        }
+
+        return Ok(response.data.to_string());
+    }
+
+    async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
+        let params = json!({
+            "pair": symbol_format,
+            "contract_type": "swap",
+            "lever_rate": leverage
+        });
+        let response = self.request.setting_dual_leverage(params).await;
+
+        if response.code != 200 {
+            return Err(Error::new(ErrorKind::Other, format!("设置杠杆失败:{:?}", response).to_string()));
+        }
+
+        return Ok(response.data.to_string());
+    }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap set_auto_deposit_status:该交易所方法未实现".to_string()))
+    }
+
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "htx_swap wallet_transfers:该交易所方法未实现".to_string()))
+    }
+
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
+        let mut handles = vec![];
+
+        // 下单指令
+        for item in order_command.limits_open.keys() {
+            let mut ts = trace_stack.clone();
+
+            let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
+            let side = order_command.limits_open[item].get(1).unwrap().clone();
+            let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
+            let cid = order_command.limits_open[item].get(3).unwrap().clone();
+
+            //  order_name: [数量,方向,价格,c_id]
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                // TraceStack::show_delay(&ts.ins);
+                ts.on_before_send();
+                let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
+                ts.on_after_send();
+                // info!("下单 custom_id {}", cid);
+                match result {
+                    Ok(mut result) => {
+                        result.trace_stack = ts;
+                        // info!("下单完成 order_id {}", result.id);
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        info!(?error);
+                        let mut err_order = Order::new();
+                        err_order.custom_id = cid.clone();
+                        err_order.status = "REMOVE".to_string();
+                        ts.source = "htx_swap 474".to_string();
+                        err_order.trace_stack = ts;
+
+                        self_clone.order_sender.send(err_order).await.unwrap();
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(handles);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 撤销订单
+        let mut cancel_handlers = vec![];
+        for item in order_command.cancel.keys() {
+            let order_id = order_command.cancel[item].get(1).unwrap().clone();
+            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
+            // info!("取消订单 order_id {}, custom_id {}", order_id, custom_id);
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 已经取消的订单不去撤单了
+                        if !error.to_string().contains("Your order has been canceled") {
+                            // 取消失败去查订单。
+                            let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                            match query_rst {
+                                Ok(order) => {
+                                    self_clone.order_sender.send(order).await.unwrap();
+                                }
+                                Err(err) => {
+                                    error!("撤单失败,而且查单也失败了,htx_swap,oid={}, cid={}, err={:?}。", order_id.clone(), custom_id.clone(), err);
+                                }
+                            }
+                            error!("撤单失败,异常:{}", error);
+                            self_clone.error_sender.send(error).await.unwrap();
+                        }
+                    }
+                }
+            });
+            cancel_handlers.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(cancel_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 检查订单指令
+        let mut check_handlers = vec![];
+        for item in order_command.check.keys() {
+            let order_id = order_command.check[item].get(1).unwrap().clone();
+            let custom_id = order_command.check[item].get(0).unwrap().clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.get_order_detail(order_id.as_str(), custom_id.as_str()).await;
+                match result {
+                    Ok(result) => {
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            check_handlers.push(handle)
+        }
+
+        let futures = FuturesUnordered::from_iter(check_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+    }
+}
+
+// pub fn format_account_info(balance_data: Value) -> Account {
+//     let balance_coin = balance_data["coin"].as_str().unwrap().to_string().to_uppercase();
+//     let available_balance = Decimal::from_str(balance_data["available"].as_str().unwrap()).unwrap();
+//     let frozen_balance = Decimal::from_str(balance_data["frozen"].as_str().unwrap()).unwrap();
+//     let balance = available_balance + frozen_balance;
+//
+//     Account {
+//         coin: balance_coin,
+//         balance,
+//         available_balance,
+//         frozen_balance,
+//         stocks: Decimal::ZERO,
+//         available_stocks: Decimal::ZERO,
+//         frozen_stocks: Decimal::ZERO,
+//     }
+// }
+
+pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
+    let id = order["order_id"].to_string();
+    let custom_id = order["client_order_id"].to_string();
+    let price = Decimal::from_f64(order["price"].as_f64().unwrap()).unwrap();
+    let amount = Decimal::from_f64(order["volume"].as_f64().unwrap()).unwrap() * ct_val;
+    let deal_amount = Decimal::from_f64(order["trade_volume"].as_f64().unwrap()).unwrap() * ct_val;
+    let avg_price = Decimal::from_f64(order["trade_avg_price"].as_f64().unwrap_or(0.0)).unwrap();
+
+    let status = order["status"].to_string();
+
+    let custom_status = if ["5", "6", "7"].contains(&&**&status) {
+        "REMOVE".to_string()
+    } else if ["1", "2", "3", "4"].contains(&&**&status) {
+        "NEW".to_string()
+    } else {
+        "NULL".to_string()
+    };
+    Order {
+        id,
+        custom_id,
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: order["order_price_type"].as_str().unwrap().to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("630 htx_swap".to_string()),
+    }
+}
+
+pub fn format_position_item(position: &serde_json::Value, ct_val: &Decimal) -> Position {
+    let symbol = position["pair"].as_str().unwrap().to_string();
+    let margin_level = Decimal::from_str(&position["lever_rate"].to_string()).unwrap();
+    let amount = Decimal::from_f64(position["volume"].as_f64().unwrap()).unwrap() * ct_val;
+
+    let frozen_amount = Decimal::from_f64(position["frozen"].as_f64().unwrap()).unwrap();
+    let price = Decimal::from_f64(position["cost_hold"].as_f64().unwrap()).unwrap();
+    let profit = Decimal::from_f64(position["profit_unreal"].as_f64().unwrap()).unwrap();
+    let position_mode = match position["position_mode"].as_str().unwrap() {
+        "dual_side" => {
+            match position["direction"].as_str().unwrap() {
+                "sell" => {
+                    PositionModeEnum::Short
+                }
+                "buy" => {
+                    PositionModeEnum::Long
+                }
+                _ => {
+                    panic!("htx_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
+                           position["direction"].as_str().unwrap(), position["direction"].as_str().unwrap())
+                }
+            }
+        }
+        "single_side" => {
+            PositionModeEnum::Both
+        }
+        _ => {
+            panic!("htx_usdt_swap: 未知的持仓模式: {}", position["position_mode"].as_str().unwrap())
+        }
+    };
+    let margin = Decimal::from_f64(position["position_margin"].as_f64().unwrap()).unwrap();
+
+    Position {
+        symbol,
+        margin_level,
+        amount,
+        frozen_amount,
+        price,
+        profit,
+        position_mode,
+        margin,
+    }
+}

+ 135 - 0
standard/src/htx_swap_handle.rs

@@ -0,0 +1,135 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use tokio::time::Instant;
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialOrder};
+
+// 处理账号信息
+pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
+    let mut result = Account::new();
+    let account_infos = response.data.as_array().unwrap();
+    for data in account_infos {
+        if data["margin_asset"].as_str().unwrap() != "USDT" { continue; }
+        let margin_position = Decimal::from_f64(data["margin_position"].as_f64().unwrap()).unwrap();
+        let balance = Decimal::from_f64(data["margin_static"].as_f64().unwrap()).unwrap();
+        let frozen_balance = Decimal::from_f64(data["margin_frozen"].as_f64().unwrap()).unwrap();
+        let available_balance = balance - margin_position - frozen_balance;
+        // 格式化account信息
+        let account = Account {
+            coin: data["margin_asset"].as_str().unwrap().to_string(),
+            balance,
+            available_balance,
+            frozen_balance,
+            stocks: Default::default(),
+            available_stocks: Default::default(),
+            frozen_stocks: Default::default(),
+        };
+        result = account
+    }
+    return result;
+}
+
+// 处理order信息
+pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+    let res_data_json = res_data.data;
+    let order_info = vec![format_order_item(res_data_json.clone(), ct_val)];
+    SpecialOrder {
+        name: res_data.label,
+        order: order_info,
+    }
+}
+
+// 处理订单信息
+pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
+    let id = order["order_id"].to_string();
+    let custom_id = order["client_order_id"].to_string();
+    let price = Decimal::from_f64(order["price"].as_f64().unwrap()).unwrap();
+    let amount = Decimal::from_f64(order["volume"].as_f64().unwrap()).unwrap() * ct_val;
+    let deal_amount = Decimal::from_f64(order["trade_volume"].as_f64().unwrap()).unwrap() * ct_val;
+    let avg_price = Decimal::from_f64(order["trade_avg_price"].as_f64().unwrap_or(0.0)).unwrap();
+
+    let status = order["status"].to_string();
+
+    let custom_status = if ["5", "6", "7"].contains(&&**&status) {
+        "REMOVE".to_string()
+    } else if ["1", "2", "3", "4"].contains(&&**&status) {
+        "NEW".to_string()
+    } else {
+        "NULL".to_string()
+    };
+    Order {
+        id,
+        custom_id,
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: order["order_price_type"].as_str().unwrap().to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("73 htx_swap_handle".to_string()),
+    }
+}
+
+// 格式化深度信息
+pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+    let mut depth_items: Vec<MarketOrder> = vec![];
+    for value in value.as_array().unwrap() {
+        depth_items.push(MarketOrder {
+            price: Decimal::from_f64(value[0].as_f64().unwrap()).unwrap(),
+            amount: Decimal::from_f64(value[1].as_f64().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}
+
+// 处理position信息
+pub fn handle_position(res_data: &ResponseData, ct_val: &Decimal) -> Vec<Position> {
+    let res_data_json = res_data.data.as_array().unwrap();
+    res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect()
+}
+
+pub fn format_position_item(position: &serde_json::Value, ct_val: &Decimal) -> Position {
+    let symbol = position["pair"].as_str().unwrap().to_string();
+    let margin_level = Decimal::from_str(&position["lever_rate"].to_string()).unwrap();
+    let amount = Decimal::from_f64(position["volume"].as_f64().unwrap()).unwrap() * ct_val;
+
+    let frozen_amount = Decimal::from_f64(position["frozen"].as_f64().unwrap()).unwrap();
+    let price = Decimal::from_f64(position["cost_hold"].as_f64().unwrap()).unwrap();
+    let profit = Decimal::from_f64(position["profit_unreal"].as_f64().unwrap()).unwrap();
+    let position_mode = match position["position_mode"].as_str().unwrap() {
+        "dual_side" => {
+            match position["direction"].as_str().unwrap() {
+                "sell" => {
+                    PositionModeEnum::Short
+                }
+                "buy" => {
+                    PositionModeEnum::Long
+                }
+                _ => {
+                    panic!("htx_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
+                           position["direction"].as_str().unwrap(), position["direction"].as_str().unwrap())
+                }
+            }
+        }
+        "single_side" => {
+            PositionModeEnum::Both
+        }
+        _ => {
+            panic!("htx_usdt_swap: 未知的持仓模式: {}", position["position_mode"].as_str().unwrap())
+        }
+    };
+    let margin = Decimal::from_f64(position["position_margin"].as_f64().unwrap()).unwrap();
+
+    Position {
+        symbol,
+        margin_level,
+        amount,
+        frozen_amount,
+        price,
+        profit,
+        position_mode,
+        margin,
+    }
+}

+ 2 - 0
standard/src/lib.rs

@@ -38,6 +38,8 @@ mod bitget_swap;
 mod bitget_swap_handle;
 mod coinex_swap;
 mod coinex_swap_handle;
+mod htx_swap;
+pub mod htx_swap_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向

+ 81 - 0
standard/tests/exchange_test.rs

@@ -106,6 +106,19 @@ pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn
             params.insert("pass_key".to_string(), pass_key);
             Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
         }
+        ExchangeEnum::HtxSwap => {
+            let mut params: BTreeMap<String, String> = BTreeMap::new();
+            let access_key = account_info.htx_access_key;
+            let secret_key = account_info.htx_secret_key;
+            let pass_key = account_info.htx_pass;
+            params.insert("access_key".to_string(), access_key);
+            params.insert("secret_key".to_string(), secret_key);
+            params.insert("pass_key".to_string(), pass_key);
+            Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
+        }
+        _ => {
+            panic!("该交易所未实现!")
+        }
     }
 }
 
@@ -573,6 +586,74 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subs
             });
             try_join!(t1).unwrap();
         }
+        ExchangeEnum::HtxSwap => {
+            let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+            trace!(symbol_format);
+            let name = format!("htx_swap@{}", symbol.to_string().to_lowercase());
+            let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            let write_tx_am = Arc::new(Mutex::new(write_tx));
+            let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+            let params = HtxSwapLogin {
+                api_key: account_info.htx_access_key,
+                secret: account_info.htx_secret_key,
+            };
+
+            let htx_wss_type = match mold.to_string().clone().as_str() {
+                "depth" => HtxSwapWsType::Public,
+                _ => HtxSwapWsType::Private
+            };
+
+            let mut exchange_wss = HtxSwapWs::new_label(name, Option::from(params), htx_wss_type);
+            exchange_wss.set_symbols(vec![symbol_format.clone()]);
+            exchange_wss.set_subscribe(subscriber_type.into());
+            let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+            let mold_clone = mold.to_string().clone();
+            let fun = move |data: ResponseData| {
+                let symbol_format_c = symbol_format.clone();
+                let mold_cc = mold_clone.clone();
+
+                async move {
+                    trace!("原始数据 data:{:?}",data);
+                    match mold_cc.as_str() {
+                        "depth" => {
+                            if data.data != "" {
+                                let result = handle_info::format_depth(ExchangeEnum::HtxSwap, &data);
+                                trace!("-------------------------------");
+                                trace!(?result)
+                            }
+                        }
+                        "position" => {
+                            if data.data != "" {
+                                let result = htx_swap_handle::handle_position(&data, &dec!(10));
+                                trace!("-------------------------------");
+                                trace!(?result)
+                            }
+                        }
+                        "account" => {
+                            if data.data != "" {
+                                let result = htx_swap_handle::handle_account_info(&data, &symbol_format_c);
+                                trace!("-------------------------------");
+                                trace!(?result)
+                            }
+                        }
+                        "orders" => {
+                            println!("{:?}", data);
+                            if data.data != "" {
+                                let result = htx_swap_handle::handle_order(data, dec!(10));
+                                trace!("-------------------------------");
+                                trace!(?result)
+                            }
+                        }
+                        _ => {
+                            error!("没有该命令!mode={}", mold_cc);
+                            panic!("没有该命令!mode={}", mold_cc)
+                        }
+                    };
+                }
+            };
+            exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        }
         _ => {
             error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
             panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)

+ 56 - 0
standard/tests/htx_handle_test.rs

@@ -0,0 +1,56 @@
+mod exchange_test;
+
+use tracing::{instrument};
+use exchanges::htx_swap_ws::HtxSwapSubscribeType;
+use standard::exchange::ExchangeEnum;
+use crate::exchange_test::{test_new_exchange_wss};
+
+const SYMBOL: &str = "USTC_USDT";
+
+// 测试订阅深度订阅
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_depth() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_subscribe_type = vec![
+        HtxSwapSubscribeType::PuFuturesDepth
+    ];
+    test_new_exchange_wss(ExchangeEnum::HtxSwap, SYMBOL, htx_subscribe_type, "depth").await;
+}
+
+// 测试订阅Account信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_account() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_subscribe_type = vec![
+        HtxSwapSubscribeType::PrFuturesBalances
+    ];
+    test_new_exchange_wss(ExchangeEnum::HtxSwap, SYMBOL, htx_subscribe_type, "account").await;
+}
+
+// 测试订阅Position信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_position() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_subscribe_type = vec![
+        HtxSwapSubscribeType::PrFuturesPositions
+    ];
+    test_new_exchange_wss(ExchangeEnum::HtxSwap, SYMBOL, htx_subscribe_type, "position").await;
+}
+
+// 测试订阅Orders信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_orders() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_subscribe_type = vec![
+        HtxSwapSubscribeType::PrFuturesOrders
+    ];
+    test_new_exchange_wss(ExchangeEnum::HtxSwap, SYMBOL, htx_subscribe_type, "orders").await;
+}

+ 282 - 0
standard/tests/htx_swap_test.rs

@@ -0,0 +1,282 @@
+mod exchange_test;
+
+use std::collections::BTreeMap;
+use std::env;
+use std::io::Error;
+use chrono::Utc;
+use rust_decimal_macros::dec;
+use tokio::sync::mpsc;
+use tokio::time::Instant;
+use tracing::{instrument, trace};
+use global::trace_stack::TraceStack;
+use standard::exchange::{Exchange, ExchangeEnum};
+use standard::{Order, OrderCommand, Platform, utils};
+use crate::exchange_test::{test_new_exchange};
+
+const SYMBOL: &str = "USTC_USDT";
+
+// 测试获取Exchange实体
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_self_exchange() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_self_exchange = htx_swap_exchange.get_self_exchange();
+    trace!(?htx_get_self_exchange);
+}
+
+// 测试获取交易对信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_self_symbol() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_self_symbol = htx_swap_exchange.get_self_symbol();
+    trace!(?htx_get_self_symbol);
+}
+
+// 测试获取是否使用高速通道
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_self_is_colo() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_self_is_colo = htx_swap_exchange.get_self_is_colo();
+    trace!(?htx_get_self_is_colo);
+}
+
+// 测试获取登录params信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_self_params() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_self_params = htx_swap_exchange.get_self_params();
+    trace!("htx_get_self_params={:?}",htx_get_self_params);
+}
+
+// 测试获取Market信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_self_market() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_self_market = htx_swap_exchange.get_self_market();
+    trace!(?htx_get_self_market);
+}
+
+// 测试获取请求时间信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_request_delays() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_request_delays = htx_swap_exchange.get_request_delays();
+    trace!(?htx_get_request_delays);
+}
+
+// 测试获取请求平均时间信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_request_avg_delay() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_request_avg_delay = htx_swap_exchange.get_request_avg_delay();
+    trace!(?htx_get_request_avg_delay);
+}
+
+// 测试获取最大请求时间信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_request_max_delay() {
+    global::log_utils::init_log_with_trace();
+
+    let htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_request_max_delay = htx_swap_exchange.get_request_max_delay();
+    trace!(?htx_get_request_max_delay);
+}
+
+// 测试获取服务器时间
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_server_time() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_server_time = htx_swap_exchange.get_server_time().await;
+    trace!(?htx_get_server_time);
+}
+
+// 测试获取账号信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_account() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_account = htx_swap_exchange.get_account().await;
+    trace!(?htx_get_account);
+}
+
+// 测试获取持仓信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_position() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_position = htx_swap_exchange.get_position().await;
+    trace!(?htx_get_position);
+}
+
+// 测试获取所有持仓信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_positions() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_positions = htx_swap_exchange.get_positions().await;
+    trace!(?htx_get_positions);
+}
+
+// 测试获取Ticker信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_ticker() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_ticker = htx_swap_exchange.get_ticker().await;
+    trace!(?htx_get_ticker);
+}
+
+// 测试获取Market信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_market() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_market = htx_swap_exchange.get_market().await;
+    trace!(?htx_get_market);
+}
+
+// 测试获取Order详情信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_order_detail() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_order_detail = htx_swap_exchange.get_order_detail("1234925996822429696", "9999992").await;
+    trace!(?htx_get_order_detail);
+}
+
+// 测试获取Order列表信息
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_get_orders_list() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_get_orders_list = htx_swap_exchange.get_orders_list("finished").await;
+    trace!(?htx_get_orders_list);
+}
+
+// 测试下单
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_take_order() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_take_order = htx_swap_exchange.take_order("999999914", "kd", dec!(0.02320), dec!(20)).await;
+    trace!(?htx_take_order);
+}
+
+// 测试撤销订单
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_cancel_order() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_cancel_order = htx_swap_exchange.cancel_order("", "999999900").await;
+    trace!(?htx_cancel_order);
+}
+
+// 测试批量撤销订单
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_cancel_orders() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_cancel_orders = htx_swap_exchange.cancel_orders_all().await;
+    trace!(?htx_cancel_orders);
+}
+
+// 测试设置持仓模式
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_set_dual_mode() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_set_dual_mode = htx_swap_exchange.set_dual_mode("usdt", true).await;
+    trace!(?htx_set_dual_mode);
+}
+
+// 测试设置杠杆
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_set_dual_leverage() {
+    global::log_utils::init_log_with_trace();
+
+    let mut htx_swap_exchange: Box<dyn Platform> = test_new_exchange(ExchangeEnum::HtxSwap, SYMBOL).await;
+    let htx_set_dual_leverage = htx_swap_exchange.set_dual_leverage("10").await;
+    trace!(?htx_set_dual_leverage);
+}
+
+// 测试指令下单
+#[tokio::test]
+#[instrument(level = "TRACE")]
+async fn test_command_order() {
+    global::log_utils::init_log_with_trace();
+    utils::proxy_handle();
+
+    let (order_sender, mut order_receiver): (mpsc::Sender<Order>, mpsc::Receiver<Order>) = mpsc::channel(1024);
+    let (error_sender, mut error_receiver): (mpsc::Sender<Error>, mpsc::Receiver<Error>) = mpsc::channel(1024);
+
+    let mut params: BTreeMap<String, String> = BTreeMap::new();
+    let access_key = env::var("htx_access_key").unwrap_or("".to_string());
+    let secret_key = env::var("htx_secret_key").unwrap_or("".to_string());
+    params.insert("access_key".to_string(), access_key);
+    params.insert("secret_key".to_string(), secret_key);
+
+    let mut htx_swap_exchange: Box<dyn Platform> = Exchange::new(ExchangeEnum::HtxSwap, SYMBOL.to_string(), false, params, order_sender, error_sender).await;
+
+    let mut command = OrderCommand::new();
+    command.cancel.insert("888888".to_string(), vec!["888888".to_string(), "".to_string()]);
+    command.limits_open.insert("888888".to_string(), vec!["100".to_string(), "kd".to_string(), "0.18".to_string(), "888888".to_string()]);
+    command.limits_close.insert("999999".to_string(), vec!["100".to_string(), "kk".to_string(), "0.25".to_string(), "999999".to_string()]);
+    command.check.insert("888888".to_string(), vec!["999999".to_string(), "".to_string()]);
+    htx_swap_exchange.command_order(&mut command, &TraceStack::new(Utc::now().timestamp_micros(), Instant::now())).await;
+
+    loop {
+        if let Ok(order) = order_receiver.try_recv() {
+            trace!(?order);
+        }
+        if let Ok(error) = error_receiver.try_recv() {
+            trace!(?error);
+        }
+    }
+}

+ 4 - 1
strategy/src/core.rs

@@ -22,7 +22,7 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, KucoinSwap};
+use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
 
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
@@ -233,6 +233,9 @@ impl Core {
                 "coinex_usdt_swap" => {
                     Exchange::new(CoinexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 }
+                "htx_usdt_swap" => {
+                    Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 _ => {
                     error!("203未找到对应的交易所rest枚举!");
                     panic!("203未找到对应的交易所rest枚举!");

+ 8 - 0
strategy/src/exchange_disguise.rs

@@ -18,6 +18,7 @@ use crate::coinex_usdt_swap::coinex_swap_run;
 use crate::kucoin_swap::kucoin_swap_run;
 // use crate::okx_usdt_swap::okex_swap_run;
 use crate::core::Core;
+use crate::htx_usdt_swap::htx_swap_run;
 
 // 交易交易所启动
 pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
@@ -50,6 +51,9 @@ pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
             tokio::time::sleep(Duration::from_secs(1)).await;
             coinex_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
         }
+        "htx_usdt_swap" =>{
+            htx_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        }
         _ => {
             let msg = format!("不支持的交易交易所:{}", exchange_name);
             panic!("{}", msg);
@@ -95,6 +99,10 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
         },
         "coinex_usdt_swap" => {
             coinex_swap_run(is_shutdown_arc,false, core_arc, name, symbols, is_colo, exchange_params).await;
+
+        }
+        "htx_usdt_swap" =>{
+            htx_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
         }
         _ => {
             let msg = format!("不支持的参考交易所:{}", exchange_name);

+ 168 - 0
strategy/src/htx_usdt_swap.rs

@@ -0,0 +1,168 @@
+use tracing::{error};
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use rust_decimal::Decimal;
+use tokio::spawn;
+use tokio::sync::Mutex;
+use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::{TraceStack};
+use standard::exchange::ExchangeEnum::{HtxSwap};
+use crate::model::{OrderInfo};
+use crate::core::Core;
+use crate::exchange_disguise::on_special_depth;
+
+// 1交易、0参考 htx 合约 启动
+pub async fn htx_swap_run(is_shutdown_arc: Arc<AtomicBool>,
+                           is_trade: bool,
+                           core_arc: Arc<Mutex<Core>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           _is_colo: bool,
+                           exchange_params: BTreeMap<String, String>) {
+    // 开启公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+
+    // 开启公共连接
+    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let name_clone = name.clone();
+    let core_arc_clone = core_arc.clone();
+    let symbols_clone = symbols.clone();
+    let symbol = symbols.clone()[0].clone();
+    spawn(async move {
+        // 构建链接ws
+        let mut bg_public = HtxSwapWs::new_label(name_clone.clone(),
+                                                    None,
+                                                 HtxSwapWsType::Public);
+
+        // 消费数据的函数
+        let mut update_flag_u = Decimal::ZERO;
+        let fun = move |data: ResponseData| {
+            let core_arc_cc = core_arc_clone.clone();
+            let rs = symbol.clone();
+            async move {
+                on_public_data(core_arc_cc, &mut update_flag_u, rs, data).await
+            }
+        };
+
+        // 准备链接
+        bg_public.set_subscribe(vec![HtxSwapSubscribeType::PuFuturesDepth]); // 只用订阅深度数据
+        bg_public.set_symbols(symbols_clone);
+        bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("htx_usdt_swap 链接有异常")
+    });
+
+    // 不需要交易就不用开启私有频道了
+    if !is_trade {
+        return;
+    }
+
+    // 开启私有频道
+    let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+
+    // 开启公共连接
+    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+    spawn(async move {
+        let login_param = parse_btree_map_to_htx_swap_login(exchange_params);
+        // 构建链接ws
+        let mut bg_private = HtxSwapWs::new_label(name.clone(),
+                                                     Some(login_param),
+                                                  HtxSwapWsType::Private);
+
+        // 消费数据的函数
+        let core_arc_clone = core_arc.clone();
+        let run_symbol = symbols[0].clone();
+        let ct_val = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        let fun = move |data: ResponseData| {
+            let core_arc_cc = core_arc_clone.clone();
+            let run_symbol_c = run_symbol.clone();
+
+            async move {
+                on_private_data(core_arc_cc, ct_val, data, &run_symbol_c).await
+            }
+        };
+
+        // 准备链接
+        bg_private.set_subscribe(vec![
+            HtxSwapSubscribeType::PrFuturesOrders,
+            HtxSwapSubscribeType::PrFuturesBalances,
+            HtxSwapSubscribeType::PrFuturesPositions
+        ]);
+        bg_private.set_symbols(symbols.clone());
+        bg_private.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_private, write_rx_private).await.expect("htx_usdt_swap 链接有异常")
+    });
+}
+
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
+                         ct_val: Decimal,
+                         response: ResponseData,
+                         run_symbol: &String) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+
+    let order_channel = "orders_cross";
+    let position_channel = "positions_cross";
+    let balance_channel = "accounts_cross";
+    if response.channel.contains(order_channel) { // 订单频道
+        trace_stack.set_source("htx_swap.orders".to_string());
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(HtxSwap, response.clone(), ct_val.clone());
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for mut order in orders.order {
+            if order.status == "NULL" {
+                error!("htx_usdt_swap 未识别的订单状态:{:?}", response);
+                continue;
+            }
+            let mut order_info = OrderInfo::parse_order_to_order_info(&mut order);
+            order_info.trace_stack.source = "htx_usdt_swap 118".to_string();
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut core = core_arc_clone.lock().await;
+            core.update_order(order_infos, trace_stack).await;
+        }
+    } else if response.channel.contains(position_channel) { // 仓位频道
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(HtxSwap, &response, &ct_val);
+        let mut core = core_arc_clone.lock().await;
+
+        core.update_position(positions).await;
+    } else if response.channel.contains(balance_channel)  { // 余额频道
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(HtxSwap, &response, run_symbol);
+        let mut core = core_arc_clone.lock().await;
+
+        core.update_equity(account).await;
+    } else {
+        error!("未知推送类型");
+        error!(?response);
+    }
+}
+
+async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
+                        update_flag_u: &mut Decimal,
+                        run_symbol: String,
+                        response: ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+    let channel_symbol = run_symbol.replace("_", "-");
+    let depth_channel = format!("market.{}.depth.step0", channel_symbol.to_uppercase());
+    // public类型,目前只考虑订单流数据
+    if response.channel == depth_channel { // 深度频道
+        trace_stack.set_source("htx_usdt_swap.depth".to_string());
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(HtxSwap, &response);
+        trace_stack.on_after_format();
+
+        on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+    } else {
+        error!("未知推送类型");
+        error!(?response);
+    }
+}
+
+fn parse_btree_map_to_htx_swap_login(exchange_params: BTreeMap<String, String>) -> HtxSwapLogin {
+    HtxSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret: exchange_params.get("secret_key").unwrap().clone()
+    }
+}

+ 2 - 1
strategy/src/lib.rs

@@ -13,4 +13,5 @@ mod bitget_spot;
 mod okx_usdt_swap;
 mod bybit_usdt_swap;
 mod bitget_usdt_swap;
-mod coinex_usdt_swap;
+mod coinex_usdt_swap;
+mod htx_usdt_swap;

+ 13 - 3
strategy/src/utils.rs

@@ -8,14 +8,20 @@ use global::public_params;
 // 生成订单的id,可以根据交易所名字来
 pub fn generate_client_id(exchange_name_some: Option<String>) -> String {
     // 交易所名字获取
-    let exchange_name = exchange_name_some.unwrap_or("".to_string());
+    let mut exchange_name = exchange_name_some.unwrap_or("".to_string());
     // 0-1000的随机数
     let rand_num = rand::thread_rng().gen_range(1..1000);
     // 随机时间戳
     let in_ms = Utc::now().timestamp_micros();
     let time_str = in_ms.to_string();
     let time_slice = &time_str[10..];
-    let result = format!("{}{}{}", time_slice, rand_num, exchange_name);
+    // 火币只能纯数字
+    if exchange_name == "htx" {
+        let mut rng = rand::thread_rng(); // 获取一个随机数生成器
+        let random_number = rng.gen_range(1000..10000); // 生成一个四位随机数
+        exchange_name = random_number.to_string();
+    }
+    let result = format!("{}{}{}{}", "1", time_slice, rand_num, exchange_name);
 
     return result;
 }
@@ -69,7 +75,9 @@ pub fn get_limit_requests_num_per_second(exchange: String) -> i64 {
         return public_params::BITGET_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bybit_usdt_swap"){
         return public_params::BYBIT_USDT_SWAP_LIMIT * public_params::RATIO;
-    } else {
+    } else if exchange.eq("htx_usdt_swap") {
+        return public_params::HTX_USDT_SWAP_LIMIT * public_params::RATIO;
+    }else {
         error!("限频规则(ratio)未找到,请检查配置!");
         panic!("限频规则(ratio)未找到,请检查配置!");
     }
@@ -101,6 +109,8 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
         return public_params::BITGET_USDT_SWAP_LIMIT
     } else if exchange.eq("bybit_usdt_swap") {
         return public_params::BYBIT_USDT_SWAP_LIMIT
+    } else if exchange.eq("htx_usdt_swap") {
+        return public_params::HTX_USDT_SWAP_LIMIT
     } else {
         error!("限频规则(limit)未找到,请检查配置!");
         panic!("限频规则(limit)未找到,请检查配置!");

+ 6 - 1
test_account.toml.sample

@@ -19,4 +19,9 @@ okx_pass = ""
 # bitget账号密码
 bitget_access_key = ""
 bitget_secret_key = ""
-bitget_pass = ""
+bitget_pass = ""
+
+# htx账号密码
+htx_access_key = ""
+htx_secret_key = ""
+htx_pass = ""