Kaynağa Gözat

新增交易所

hl 1 yıl önce
ebeveyn
işleme
7c19e2e966

+ 2 - 1
derive/tests/export_excel_test.rs

@@ -3,13 +3,14 @@ use tracing::trace;
 use derive::export_excel::{ExportEnum, ExportExcel};
 use derive::ExportConnector;
 use exchanges::proxy;
+use exchanges::proxy::ProxyResponseEnum::NO;
 
 // 创建实体
 #[allow(dead_code)]
 pub async fn test_new_export(export_enum: ExportEnum) -> Box<dyn ExportConnector> {
     // 检测是否走代理
     pub fn proxy_handle() {
-        if proxy::ParsingDetail::http_enable_proxy() {
+        if proxy::ParsingDetail::http_enable_proxy(None) {
             trace!("检测有代理配置,配置走代理");
         }
     }

+ 2 - 0
exchanges/src/lib.rs

@@ -29,4 +29,6 @@ pub mod coinex_swap_rest;
 pub mod coinex_swap_ws;
 pub mod htx_swap_ws;
 pub mod htx_swap_rest;
+pub mod phemex_swap_ws;
+pub mod phemex_swap_rest;
 

+ 387 - 0
exchanges/src/phemex_swap_rest.rs

@@ -0,0 +1,387 @@
+use std::collections::BTreeMap;
+use chrono::Utc;
+use reqwest::header::HeaderMap;
+use reqwest::{Client};
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use tracing::{error, info, trace};
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+use ring::hmac;
+use serde_json::{json, Value};
+
+#[derive(Clone, Debug)]
+pub struct PhemexSwapRest {
+    pub tag: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //是否需要登录
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+
+}
+
+impl PhemexSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> PhemexSwapRest
+    {
+        return PhemexSwapRest::new_with_tag("default-PhemexSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> PhemexSwapRest {
+        let base_url = if is_colo {
+            "https://api.phemex.com".to_string()
+        } else {
+            "https://api.phemex.com".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",base_url);
+        } else {
+            info!("走普通通道:{}",base_url);
+        }
+        /*****返回结构体*******/
+        PhemexSwapRest {
+            tag,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //服务器时间
+    pub async fn get_server(&mut self) -> ResponseData {
+        let params = json!({});
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/public/time".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //查詢合約基礎信息
+    pub async fn get_market(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/public/products".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    //持仓(查询交易账户和仓位)
+    pub async fn get_account_and_positions(&mut self) -> ResponseData {
+        let params = json!({
+            "currency":"USDT"
+        });
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/g-accounts/accountPositions".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //仓位设置(Switch Position Mode Synchronously)
+    pub async fn set_target_pos_mode(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-positions/switch-pos-mode-sync".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //设置杠杆(Set Leverage 设置杠杆)
+    pub async fn set_leverage(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-positions/leverage".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //下单
+    pub async fn orders(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-orders/create".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //撤单
+    pub async fn cancel_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("DELETE".to_string(),
+                                "".to_string(),
+                                "/g-orders/cancel".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //撤销所有
+    pub async fn cancel_order_all(&mut self, params: Value) -> ResponseData {
+        let data = self.request("DELETE".to_string(),
+                                "".to_string(),
+                                "/g-orders/all".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         mut params_json: Value) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+        //每个接口都有的参数
+        let timestamp = (Utc::now().timestamp_millis() + (60 * 1000)) / 1000;
+
+        //请求类型不同,可能请求头body 不同
+        let mut body = "".to_string();
+        let mut params = "".to_string();
+        let mut headers = HeaderMap::new();
+        if method == "POST" {
+            body = params_json.to_string();
+        }
+        if method == "GET" || method == "PUT" ||method ==  "DELETE" {
+            let z = params_json.to_string();
+            params = RestTool::parse_params_to_str(z);
+        }
+
+        //是否需要登录-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {
+                //需要登录-且登录参数齐全
+                trace!("Path:{}{}", prefix_url.clone(),request_url.clone());
+                trace!("Query:{}", params);
+                trace!("Body:{}", body);
+                trace!("expire:{}", timestamp.to_string());
+                //组装sing
+                let sing = Self::sign(secret_key.clone(),
+                                      prefix_url.clone(),
+                                      request_url.clone(),
+                                      params.clone(),
+                                      body.clone(),
+                                      timestamp.to_string(),
+                );
+                trace!("Signature:{}", sing);
+                //组装header
+                headers.extend(Self::headers(sing, timestamp.to_string(), access_key));
+            }
+        }
+
+
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.clone(),
+            body.clone(),
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+    }
+
+    pub fn headers(sign: String, timestamp: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("x-phemex-access-token", access_key.parse().unwrap());// 这是 Phemex 网站的 API-KEY(id 字段)
+        headers.insert("x-phemex-request-expiry", timestamp.parse().unwrap());// 描述请求过期的 Unix EPoch 秒数,通常应为 (Now() + 1 分钟)
+        headers.insert("x-phemex-request-signature", sign.parse().unwrap());// 这是 http 请求的 HMAC SHA256 签名。Secret 是 API Secret,其公式为:HMacSha256(URL Path + QueryString + Expiry + body)
+        let tracing = format!("4l_{:?}", Utc::now().timestamp_millis().to_string());
+        // headers.insert("x-phemex-request-tracing", tracing.parse().unwrap());
+        headers
+    }
+    pub fn sign(secret_key: String,
+                prefix_url: String, request_url: String,
+                url_param_str: String, body: String, timestamp: String) -> String
+    {
+        /*签名生成*/
+        let base_url = format!("{}{}", prefix_url, request_url);
+        // HMacSha256(URL Path + QueryString + Expiry + body)
+
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}{}{}{}", base_url, url_param_str, timestamp, body);
+        trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let sign = hex::encode(hmac::sign(&hmac_key, message.as_bytes()).as_ref());
+        sign
+    }
+
+    // async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String,
+                       headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if params == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), params)
+        };
+
+        trace!("url-----:{}",url.clone());
+        trace!("addrs_url-----:{}",addrs_url.clone());
+        trace!("params-----:{}",params.clone());
+        trace!("body-----:{}",body.clone());
+        trace!("headers-----:{:?}",headers.clone());
+        trace!("request_type-----:{:?}",request_type.clone());
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(body).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            "PUT" => self.client.put(addrs_url.clone()).headers(headers),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        trace!("text:{:?}",text);
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text).unwrap();
+        // return  ResponseData::new(self.tag.clone(), 200, "success".to_string(), json_value);
+
+        let code = json_value["code"].as_i64().unwrap();
+        match code {
+            0 => {
+                //判断是否有code ,没有表示特殊接口,直接返回
+                if json_value.get("data").is_some() {
+                    let data = json_value.get("data").unwrap();
+                    ResponseData::new(self.tag.clone(), 200, "success".to_string(), data.clone())
+                } else {
+                    ResponseData::new(self.tag.clone(), 200, "success".to_string(), json_value)
+                }
+            }
+            _ => {
+                ResponseData::new(self.tag.clone(), 400, "error".to_string(), json_value)
+            }
+        }
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+        match json_value {
+            Ok(data) => {
+                let message;
+
+                if !data["message"].is_null() {
+                    message = format!("{}:{}", data["tag"].as_str().unwrap(), data["message"].as_str().unwrap());
+                } else {
+                    message = data["tag"].to_string();
+                }
+
+                let mut error = ResponseData::error(self.tag.clone(), message);
+                error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
+                error
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                let error = ResponseData::error("".to_string(),
+                                                format!("json 解析失败:{},相关参数:{}", e, text));
+                error
+            }
+        }
+    }
+}

+ 412 - 0
exchanges/src/phemex_swap_ws.rs

@@ -0,0 +1,412 @@
+use std::io::Read;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use flate2::read::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use serde_json::Value;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum PhemexSwapWsType {
+    PublicAndPrivate,
+}
+
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct PhemexSwapWsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+    pub is_ok_subscribe: bool,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum PhemexSwapSubscribeType {
+    // 订单
+    PuFuturesOrderBook,
+    // 公开成交
+    PuFuturesTrades,
+    // K线数据
+    PuFuturesRecords,
+}
+
+//账号信息
+#[derive(Clone, Debug)]
+pub struct PhemexSwapLogin {
+    pub access_key: String,
+    pub secret_key: String,
+    pub pass_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct PhemexSwapWs {
+    //类型
+    tag: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<PhemexSwapLogin>,
+    //登录数据
+    ws_param: PhemexSwapWsParam,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<PhemexSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl PhemexSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<PhemexSwapLogin>, ws_type: PhemexSwapWsType) -> PhemexSwapWs {
+        return Self::new_with_tag("default-PhemexSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<PhemexSwapLogin>, ws_type: PhemexSwapWsType) -> PhemexSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            PhemexSwapWsType::PublicAndPrivate => {
+                let url = "wss://ws.phemex.com".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
+
+        /*******公共频道-私有频道数据组装*/
+        let ws_param = PhemexSwapWsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 15 * 1000,
+            ws_ping_timeout: 0,
+            is_ok_subscribe: false,
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        PhemexSwapWs {
+            tag,
+            address_url,
+            login_param,
+            ws_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 18,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<PhemexSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_string();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+        }
+        self.symbol_s = b_array;
+    }
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                PhemexSwapSubscribeType::PuFuturesTrades => false,
+                PhemexSwapSubscribeType::PuFuturesRecords => false,
+                PhemexSwapSubscribeType::PuFuturesOrderBook => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: PhemexSwapSubscribeType) -> Value {
+        match subscribe_type {
+            PhemexSwapSubscribeType::PuFuturesOrderBook => {
+                // format!("{}@depth5@100ms", symbol)
+                json!({
+                    "id": 999,
+                    "method": "orderbook_p.subscribe",
+                    "params": [
+                        symbol,
+                    ]
+                })
+            }
+            PhemexSwapSubscribeType::PuFuturesRecords => {
+                // format!("{}@kline_1m", symbol)
+                json!({
+                    "id": 999,
+                    "method": "kline_p.subscribe",
+                    "params": [
+                        symbol,
+                        60
+                    ]
+                })
+            }
+            PhemexSwapSubscribeType::PuFuturesTrades => {
+                json!({
+                    "id": 999,
+                    "method": "trade_p.subscribe",
+                    "params": [
+                        symbol
+                    ]
+                })
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                array.push(ty_str.to_string());
+            }
+        }
+        array
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let tag = self.tag.clone();
+        let heartbeat_time = self.ws_param.ws_ping_interval.clone();
+
+        // 心跳-- 方法内部线程启动
+        let write_tx_clone1 = write_tx_am.clone();
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            let info = json!({
+                "id": 0,
+                "method": "server.ping",
+                "params": []
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(info.to_string()), heartbeat_time as u64).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+
+        //设置订阅
+        let subscribe_array = subscription.clone();
+        if login_is {
+            //登录相关
+        }
+
+        //1 链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("Phemex_usdt_swap socket 连接中……");
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("Phemex_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        // let message_str = format!("Binary:{:?}", _po);
+        // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+        // let result = String::from_utf8(bytes);
+        // let result = String::from_utf8(po);
+
+        let mut gz_decoder = GzDecoder::new(&po[..]);
+        let mut decompressed_data = Vec::new();
+
+        // 尝试解压数据
+        if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
+            // 将解压后的字节向量转换为 UTF-8 字符串
+            match String::from_utf8(decompressed_data) {
+                Ok(text) => {
+                    let response_data = Self::ok_text(text);
+                    return Option::from(response_data);
+                }
+                Err(_) => {
+                    return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+                }
+            }
+        } else {
+            return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+        }
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        // trace!("原始数据:{:?}",text);
+
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        // { "id": "id1", "code": 0, "msg": "" }
+
+        let id = json_value["id"].as_i64();
+        match id {
+            None => {}
+            Some(v) => {
+                match v {
+                    999 => {
+                        let status = json_value["result"]["status"].as_str();
+                        //订阅
+                        if status == Option::from("success") {
+                            res_data.code = -201;
+                            res_data.message = "订阅成功".to_string();
+                        } else {
+                            res_data.code = 400;
+                            res_data.message = "订阅失败".to_string();
+                        }
+                        return res_data;
+                    }
+                    0 => {
+                        //心跳
+                        if json_value["result"].as_str() == Option::from("pong") {
+                            return ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null);
+                        }
+                    }
+                    _ => {}
+                }
+            }
+        }
+
+        //推送
+        let type_v = json_value["type"].as_str();
+        match type_v {
+            None => {}
+            Some(v) => {
+                match v {
+                    "incremental" => {
+                        if !json_value["kline_p"].is_null() {
+                            res_data.code = 200;
+                            res_data.data = json_value.clone();
+                            res_data.channel = "futures.candlesticks".to_string();
+                        } else if !json_value["trades_p"].is_null() {
+                            res_data.code = 200;
+                            res_data.data = json_value.clone();
+                            res_data.channel = "futures.trades".to_string();
+                        } else if !json_value["orderbook_p"].is_null() {
+                            res_data.code = 200;
+                            res_data.data = json_value.clone();
+                            res_data.channel = "futures.order_book".to_string();
+                        } else {
+                            res_data.code = 400;
+                            res_data.message = "未知解析".to_string();
+                        }
+                        return res_data;
+                    }
+                    "snapshot" => {
+                        res_data.code = 400;
+                        res_data.message = "(快照数据)未知解析".to_string();
+                        return res_data;
+                    }
+                    _ => {
+                    }
+                }
+            }
+        }
+
+        res_data.code = 400;
+        res_data.message = "未知解析".to_string();
+        //
+        // if json_value["id"].as_i64() == Option::from(0) {
+        //     //订阅
+        //     if json_value["result"].as_str() == Option::from("pong") {
+        //         res_data.code = -301;
+        //         res_data.message = "Pong".to_string();
+        //         return res_data;
+        //     }
+        // } else if json_value["id"].as_i64() == Option::from(999) {
+        //     //订阅
+        //     if json_value["result"]["status"].as_str() == Option::from("success") {
+        //         res_data.code = -201;
+        //         res_data.message = "订阅成功".to_string();
+        //     } else {
+        //         res_data.code = 400;
+        //         res_data.message = "订阅失败".to_string();
+        //     }
+        //     return res_data;
+        // } else if json_value["type"].as_str() == Option::from("incremental") {
+        //     if !json_value["kline_p"].is_null() {
+        //         res_data.code = 200;
+        //         res_data.data = json_value.clone();
+        //         res_data.channel = "futures.candlesticks".to_string();
+        //     } else if !json_value["trades_p"].is_null() {
+        //         res_data.code = 200;
+        //         res_data.data = json_value.clone();
+        //         res_data.channel = "futures.trades".to_string();
+        //     } else if !json_value["orderbook_p"].is_null() {
+        //         res_data.code = 200;
+        //         res_data.data = json_value.clone();
+        //         res_data.channel = "futures.order_book".to_string();
+        //     } else {
+        //         res_data.code = -1;
+        //         res_data.message = "未知解析".to_string();
+        //     }
+        //     return res_data;
+        // } else {
+        //     res_data.code = -1;
+        //     res_data.message = "未知解析".to_string();
+        // }
+        res_data
+    }
+}

+ 23 - 8
exchanges/src/proxy.rs

@@ -43,7 +43,7 @@ impl ParsingDetail {
                 ProxyEnum::REST => {
                     env::set_var("http_proxy", ip_port.to_string());
                     env::set_var("https_proxy", ip_port.to_string());
-                return     ProxyResponseEnum::NO;
+                    return ProxyResponseEnum::NO;
                 }
                 ProxyEnum::WS => {
                     let ip_port: Vec<&str> = ip_port.split(":").collect();
@@ -54,11 +54,11 @@ impl ParsingDetail {
                         ip_array[2].parse().unwrap(),
                         ip_array[3].parse().unwrap())
                     ), ip_port[1].parse().unwrap());
-              return       ProxyResponseEnum::YES(proxy);
+                    return ProxyResponseEnum::YES(proxy);
                 }
             }
         }
-        return  ProxyResponseEnum::NO;
+        return ProxyResponseEnum::NO;
     }
 
     fn new(ip_address: String,
@@ -66,8 +66,23 @@ impl ParsingDetail {
         ParsingDetail { ip_address, port }
     }
     //获取环境变量配置'proxy_address'拿到代理地址
-    pub fn parsing_environment_variables() -> ParsingDetail {
-        let proxy_address = env::var("proxy_address");
+    pub fn parsing_environment_variables(is_unusual: Option<&str>) -> ParsingDetail {
+        let proxy_address_name = match is_unusual {
+            None => {
+                "proxy_address"
+            }
+            Some(v) => {
+                match v {
+                    "phemex" => {
+                        "phemex_proxy_address"
+                    }
+                    _ => {
+                        "proxy_address"
+                    }
+                }
+            }
+        };
+        let proxy_address =    env::var(proxy_address_name);
         // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
         match proxy_address {
             Ok(value) => {
@@ -80,7 +95,7 @@ impl ParsingDetail {
                 parsing_detail
             }
             Err(_) => {
-                trace!("环境变量读取失败:'proxy_address'");
+                trace!("环境变量读取失败:'{:?}'",proxy_address_name);
                 let parsing_detail = ParsingDetail::new("".to_string(), "".to_string());
                 parsing_detail
             }
@@ -89,9 +104,9 @@ impl ParsingDetail {
 
     //http请求是否开启代理:HTTP 只需要调用该方法即可
     //原理是 设置全局代理,所以程序如果要走代理只需要执行一次,后续的get,post..都会走代理
-    pub fn http_enable_proxy() -> bool {
+    pub fn http_enable_proxy(is_unusual: Option<&str>) -> bool {
         //拿到环境变量解析的数据
-        let parsing_detail = Self::parsing_environment_variables();
+        let parsing_detail = Self::parsing_environment_variables(is_unusual);
         if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
             let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
             env::set_var("http_proxy", http_proxy.clone());

+ 140 - 69
exchanges/tests/gate_swap_test.rs

@@ -1,13 +1,22 @@
+use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use chrono::Utc;
 
 use futures_util::StreamExt;
+use serde_json::json;
 use tokio::sync::Mutex;
-use tracing::trace;
+use tokio_tungstenite::tungstenite::Message;
+use tokio_tungstenite::tungstenite::protocol::CloseFrame;
+use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
+use tracing::{error, info, trace};
 
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
+use exchanges::response_base::ResponseData;
+use exchanges::socket_tool::{AbstractWsMode, HeartbeatType};
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
@@ -18,25 +27,7 @@ async fn ws_custom_subscribe() {
     global::log_utils::init_log_with_trace();
 
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-
-    let param = GateSwapLogin {
-        api_key: ACCESS_KEY.to_string(),
-        secret: SECRET_KEY.to_string(),
-    };
-
-    let mut ws = get_ws(Option::from(param));
-    ws.set_symbols(vec!["BTC_USDT".to_string()]);
-    ws.set_subscribe(vec![
-        // GateSwapSubscribeType::PuFuturesOrderBook,
-        GateSwapSubscribeType::PuFuturesCandlesticks,
-        GateSwapSubscribeType::PuFuturesTrades,
-
-        // GateSwapSubscribeType::PrFuturesBalances("".to_string()),
-        // GateSwapSubscribeType::PrFuturesOrders("".to_string()),
-        // GateSwapSubscribeType::PrFuturesPositions("".to_string()),
-    ]);
-
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
     let write_tx_am = Arc::new(Mutex::new(write_tx));
     let is_shutdown_arc = Arc::new(AtomicBool::new(true));
@@ -46,39 +37,83 @@ async fn ws_custom_subscribe() {
     let _tr = tokio::spawn(async move {
         trace!("线程-数据读取-开启");
         loop {
-            if let Some(data) = read_rx.next().await {
-                trace!("读取数据data:{:?}",data)
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
             }
         }
         // trace!("线程-数据读取-结束");
     });
 
+    let param = GateSwapLogin {
+        api_key: "".to_string(),
+        secret: "".to_string(),
+    };
+    let mut ws = get_ws(Option::from(param));
+    ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        // GateSwapSubscribeType::PuFuturesTrades,
+    ]);
+
     //写数据
     // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
     // let write_tx_clone = Arc::clone(&write_tx_am);
     // let su = ws.get_subscription();
-    // let tw = tokio::spawn(async move {
-    //     trace!("线程-数据写入-开始");
+    // let tw_restart = tokio::spawn(async move {
     //     loop {
     //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-    //         // let close_frame = CloseFrame {
-    //         //     code: CloseCode::Normal,
-    //         //     reason: Cow::Borrowed("Bye bye"),
-    //         // };
-    //         // let message = Message::Close(Some(close_frame));
-    //
+    //         let close_frame = CloseFrame {
+    //             code: CloseCode::Normal,
+    //             reason: Cow::Borrowed("Bye bye"),
+    //         };
+    //         let message = Message::Close(Some(close_frame));
     //
-    //         let message = Message::Text(su.clone());
     //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-    //         trace!("发送指令成功");
+    //     }
+    // });
+
+
+    // let write_tx_clone2222 = Arc::clone(&write_tx_am);
+    // let tw_ping = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(1 * 1000)).await;
+    //         let timestamp = Utc::now().timestamp();
+    //         let ping_str = json!({
+    //             "time" : timestamp,
+    //             "channel" : "futures.ping",
+    //         });
+    //         write_tx_clone2222.lock().await.unbounded_send(Message::Text(ping_str.to_string())).expect("发送失败1");
+    //         // AbstractWsMode::ping_or_pong(write_tx_clone.clone(), HeartbeatType::Custom(ping_str.to_string()), 1000).await;
+    //         trace!("---------tw_ping");
     //     }
     //     trace!("线程-数据写入-结束");
     // });
 
+
+    // let write_tx_clone3333333 = Arc::clone(&write_tx_am);
+    // let tw_close = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(3 * 1000)).await;
+    //         write_tx_clone3333333.lock().await.close_channel();
+    //         trace!("---------tw_close");
+    //     }
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+
     let t1 = tokio::spawn(async move {
         //链接
         let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         trace!("test 唯一线程结束--");
     });
     tokio::try_join!(t1).unwrap();
@@ -88,6 +123,42 @@ async fn ws_custom_subscribe() {
     return;
 }
 
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
+async fn ws_custom_subscribe2() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, mut write_rx) = futures_channel::mpsc::unbounded();
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let write_tx_clone = Arc::clone(&write_tx_am);
+    let write_tx_clone2 = Arc::clone(&write_tx_am);
+    let tw = tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(2 * 1000)).await;
+            let _ = write_tx_clone.lock().await.unbounded_send(Message::Ping(Vec::from("Ping"))).expect("发送失败!!!");
+        }
+    });
+
+    let tw2 = tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(10 * 1000)).await;
+            let _ = write_tx_clone2.lock().await.close_channel();
+        }
+    });
+
+    let t2 = tokio::spawn(async move {
+        let mut z = 0;
+        while let Some(message) = write_rx.next().await {
+            trace!("输出:{:?}",message);
+            z = z + 1;
+            if z > 3 {
+                write_rx.close();
+            }
+        }
+    });
+    tokio::try_join!(t2,tw).unwrap();
+}
 
 //rest-设置持仓模式
 #[tokio::test]
@@ -104,42 +175,42 @@ async fn rest_cancel_order_all_test() {
 async fn price_order_test() {
     global::log_utils::init_log_with_info();
 
-    let mut rest = get_rest();
-    let mut params = json!({});
-
-    params["initial"] = json!({
-        "contract": "XRP_USDT",
-        "price": "0",
-        "tif": "ioc",
-        "reduce_only": true,
-        // [平多:close_long, 平空:close_short]
-        "auto_size": "close_long"
-    });
-
-    params["trigger"] = json!({
-        // [平多:close-long-position, 平空:close-short-position]
-        "order_type": "close-long-position",
-        // 一般都默认用0
-        "strategy_type": 0,
-        // [0 - 最新成交价,1 - 标记价格,2 - 指数价格]
-        "price_type": 0,
-        // [1: 引用价格大于等于我们传的价格,2:引用价格小于等于我们传的价格]
-        // 在止损的情况下:
-        //     1 可以理解为向上突破触发价(一般是给空单用)
-        //     2 可以理解为向下突破触发价(一般是给多单用)
-        "rule": 2,
-        // 订单触发价格
-        "price": "0.5600",
-    });
-
-    let response_data = rest.place_price_order("usdt".to_string(), params).await;
-    if response_data.code == "200" {
-        let response_obj: serde_json::Value = serde_json::from_str(response_data.data.as_str()).unwrap();
-
-        info!("resp={:?}", response_obj.as_object().unwrap());
-    } else {
-        error!(?response_data);
-    }
+    // let mut rest = get_rest();
+    // let mut params = json!({});
+    //
+    // params["initial"] = json!({
+    //     "contract": "XRP_USDT",
+    //     "price": "0",
+    //     "tif": "ioc",
+    //     "reduce_only": true,
+    //     // [平多:close_long, 平空:close_short]
+    //     "auto_size": "close_long"
+    // });
+    //
+    // params["trigger"] = json!({
+    //     // [平多:close-long-position, 平空:close-short-position]
+    //     "order_type": "close-long-position",
+    //     // 一般都默认用0
+    //     "strategy_type": 0,
+    //     // [0 - 最新成交价,1 - 标记价格,2 - 指数价格]
+    //     "price_type": 0,
+    //     // [1: 引用价格大于等于我们传的价格,2:引用价格小于等于我们传的价格]
+    //     // 在止损的情况下:
+    //     //     1 可以理解为向上突破触发价(一般是给空单用)
+    //     //     2 可以理解为向下突破触发价(一般是给多单用)
+    //     "rule": 2,
+    //     // 订单触发价格
+    //     "price": "0.5600",
+    // });
+    //
+    // let response_data = rest.place_price_order("usdt".to_string(), params).await;
+    // if response_data.code == "200" {
+    //     let response_obj: serde_json::Value = serde_json::from_str(response_data.data.as_str()).unwrap();
+    //
+    //     info!("resp={:?}", response_obj.as_object().unwrap());
+    // } else {
+    //     error!(?response_data);
+    // }
 }
 
 #[tokio::test]

+ 1 - 1
exchanges/tests/htx_swap_test.rs

@@ -308,7 +308,7 @@ fn get_rest() -> HtxSwapRest {
 
 // 检测是否走代理
 pub fn proxy_handle() {
-    if proxy::ParsingDetail::http_enable_proxy() {
+    if proxy::ParsingDetail::http_enable_proxy(None) {
         trace!("检测有代理配置,配置走代理");
     }
 }

+ 207 - 0
exchanges/tests/phemex_swap_test.rs

@@ -0,0 +1,207 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use serde_json::json;
+use tokio::sync::Mutex;
+use tracing::trace;
+use exchanges::phemex_swap_rest::PhemexSwapRest;
+use exchanges::phemex_swap_ws::{PhemexSwapLogin, PhemexSwapSubscribeType, PhemexSwapWs, PhemexSwapWsType};
+use exchanges::proxy;
+use exchanges::response_base::ResponseData;
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+
+    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+    let param = PhemexSwapLogin {
+        access_key: ACCESS_KEY.to_string(),
+        secret_key: SECRET_KEY.to_string(),
+        pass_key: "".to_string(),
+    };
+    let t1 = tokio::spawn(async move {
+        let mut ws = get_ws(Option::from(param), PhemexSwapWsType::PublicAndPrivate);
+        ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
+        ws.set_subscribe(vec![
+            PhemexSwapSubscribeType::PuFuturesTrades,
+            // PhemexSwapSubscribeType::PuFuturesOrderBook,
+            // PhemexSwapSubscribeType::PuFuturesRecords,
+        ]);
+        //链接
+        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+fn get_ws(btree_map: Option<PhemexSwapLogin>, ws_type: PhemexSwapWsType) -> PhemexSwapWs {
+    let phemex_ws = PhemexSwapWs::new(false, btree_map, ws_type);
+    phemex_ws
+}
+
+
+//服务器时间
+#[tokio::test]
+async fn rest_get_server_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_server().await;
+    println!("Phemex--服务器时间--{:?}", req_data);
+}
+
+//查詢合約基礎信息
+#[tokio::test]
+async fn rest_get_market_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_market(json!({
+
+    })).await;
+    println!("Phemex--查詢合約基礎信息--{:?}", req_data);
+}
+
+
+//持仓(查询交易账户和仓位)
+#[tokio::test]
+async fn rest_get_account_and_positions_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_account_and_positions().await;
+    println!("Phemex--持仓(查询交易账户和仓位)--{:?}", req_data);
+}
+
+//设置杠杆
+#[tokio::test]
+async fn rest_set_leverage_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.set_leverage(json!({
+        "symbol":"ZKUSDT",
+        "leverageRr":"1"
+    })).await;
+    println!("Phemex--设置杠杆)--{:?}", req_data);
+}
+
+
+//下单
+#[tokio::test]
+async fn rest_orders_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.orders(json!({
+          "symbol": "BTCUSDT",
+           "side": "Buy",
+          "posSide": "Long",
+
+    })).await;
+    println!("Phemex--下单)--{:?}", req_data);
+}
+
+//撤单
+#[tokio::test]
+async fn rest_cancel_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.cancel_order(json!({
+          "symbol ": "BTCUSDT",
+          "posSide": "Long",
+          "clOrdID": "1231312"
+    })).await;
+    println!("Phemex--撤单)--{:?}", req_data);
+}
+
+//撤销所有
+#[tokio::test]
+async fn rest_cancel_order_all_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.cancel_order_all(json!({
+    })).await;
+    println!("Phemex--撤销所有)--{:?}", req_data);
+}
+
+
+
+
+fn get_rest() -> PhemexSwapRest {
+    if proxy::ParsingDetail::http_enable_proxy(Some("phemex")) {
+        trace!("检测有代理配置,配置走代理");
+    }
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+
+    let phemex_exc = PhemexSwapRest::new(false, btree_map.clone());
+    phemex_exc
+}
+

+ 2 - 2
exchanges/tests/test.rs

@@ -23,7 +23,7 @@ async fn test_import() {
 
 
     /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
-    if proxy::ParsingDetail::http_enable_proxy() {
+    if proxy::ParsingDetail::http_enable_proxy(None) {
         trace!("检测有代理配置,配置走代理");
     }
 
@@ -502,7 +502,7 @@ async fn demo_pub_ws_ba() {
 
 fn demo_get_http_proxy() {
     //代理地址
-    let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+    let parsing_detail = proxy::ParsingDetail::parsing_environment_variables(None);
     trace!("----代理信息:{:?}", parsing_detail);
 }
 

+ 1 - 1
standard/src/utils.rs

@@ -12,7 +12,7 @@ pub fn format_symbol(symbol: String, pat: &str) -> String {
 
 // 检测是否走代理
 pub fn proxy_handle() {
-    if proxy::ParsingDetail::http_enable_proxy() {
+    if proxy::ParsingDetail::http_enable_proxy(None) {
         trace!("检测有代理配置,配置走代理");
     }
 }

+ 1 - 1
tests/order_command_test.rs

@@ -11,7 +11,7 @@ use standard::{Order, OrderCommand};
 
 #[tokio::test]
 async fn main() {
-    if proxy::ParsingDetail::http_enable_proxy() {
+    if proxy::ParsingDetail::http_enable_proxy(None) {
         println!("检测有代理配置,配置走代理")
     }
     let mut params: BTreeMap<String, String> = BTreeMap::new();