Browse Source

粗版本

hl 1 year ago
parent
commit
58202c4b15

+ 1 - 1
exchanges/Cargo.toml

@@ -20,7 +20,7 @@ base64 = "0.13"
 tokio = { version = "1.31.0", features = ["full"] }
 chrono = "0.4.26"
 hex = "0.4"
-reqwest = { version = "0.11.14", features = ["json"] }
+reqwest = { version = "0.11.22", features = ["json","socks"] }
 # 解壓縮數據
 flate2 = "1.0"
 

+ 101 - 49
exchanges/src/binance_swap_rest.rs

@@ -1,16 +1,19 @@
 use std::collections::BTreeMap;
 use std::str::FromStr;
-use reqwest::header::HeaderMap;
+
 use hex;
-use reqwest::Client;
+use reqwest::{Client, Proxy};
+use reqwest::header::HeaderMap;
+use ring::hmac;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
+use serde_json::{json, Value};
+use tracing::{error, info, trace};
+
 use crate::http_tool::RestTool;
+use crate::proxy;
 use crate::response_base::ResponseData;
-use tracing::{error, info, trace};
-use ring::hmac;
-use serde_json::{json, Value};
 
 #[derive(Clone)]
 pub struct BinanceSwapRest {
@@ -69,8 +72,8 @@ impl BinanceSwapRest {
         let data = self.request("GET".to_string(),
                                 "".to_string(),
                                 format!("/fapi/v1/time"),
-                                true,
-                                params.to_string(),
+                                false,
+                                params,
         ).await;
         data
     }
@@ -82,8 +85,8 @@ impl BinanceSwapRest {
         let data = self.request("GET".to_string(),
                                 "".to_string(),
                                 format!("/fapi/v1/exchangeInfo"),
-                                true,
-                                params.to_string(),
+                                false,
+                                params,
         ).await;
         data
     }
@@ -99,7 +102,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v2/balance"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -121,7 +124,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/order"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -136,7 +139,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/openOrders"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -154,7 +157,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/allOrders"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -168,7 +171,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/ticker/bookTicker"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -183,7 +186,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v2/positionRisk"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -194,7 +197,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/order"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -209,7 +212,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/positionSide/dual"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -229,7 +232,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/order"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -243,7 +246,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/allOpenOrders"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -267,7 +270,7 @@ impl BinanceSwapRest {
                                 "".to_string(),
                                 format!("/fapi/v1/userTrades"),
                                 true,
-                                params.to_string(),
+                                params,
         ).await;
         data
     }
@@ -305,12 +308,12 @@ impl BinanceSwapRest {
     }
 
     //调用请求
-    async fn request(&mut self,
-                     request_type: String,
-                     prefix_url: String,
-                     request_url: String,
-                     is_login: bool,
-                     params: String) -> ResponseData
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         mut params_json: Value) -> ResponseData
     {
         // trace!("login_param:{:?}", self.login_param);
         //解析账号信息
@@ -328,16 +331,15 @@ impl BinanceSwapRest {
         }
 
         //请求头配置-如果需要登录则存在额外配置
-        let mut params_json: serde_json::Value = serde_json::from_str(params.clone().as_str()).unwrap();
-        // let mut body = "".to_string();
         let timestamp = chrono::Utc::now().timestamp_millis().to_string();
-        params_json.as_object_mut().unwrap().insert("timestamp".to_string(), serde_json::Value::from(timestamp));
-
+        params_json["timestamp"] = Value::from(timestamp);
 
+        let mut body = "".to_string();
+        let mut params = "".to_string();
         let mut headers = HeaderMap::new();
-        if request_type == "GET" {
+        if method == "GET" {
             headers.insert("Content-Type", "application/json; charset=UTF-8".parse().unwrap());
-        } else if request_type == "POST" || request_type == "PUT" || request_type == "DELETE" {
+        } else if method == "POST" || method == "PUT" || method == "DELETE" {
             headers.insert("Content-Type", "application/x-www-form-urlencoded".parse().unwrap());
         }
 
@@ -352,20 +354,29 @@ impl BinanceSwapRest {
                 let sing = Self::sign(secret_key.clone(),
                                       params_json.to_string(),
                 );
-                params_json.as_object_mut().unwrap().insert("signature".to_string(), serde_json::Value::from(sing.clone()));
+                params_json["signature"] = serde_json::Value::from(sing.clone());
                 // trace!("sing:{}", sing);
                 //组装header
                 headers.extend(Self::headers(access_key));
             }
         }
+        if method == "POST" {
+            body = params_json.to_string();
+        }
+        if method == "GET" || method == "PUT" || method == "DELETE" {
+            let z = params_json.to_string();
+            params = RestTool::parse_params_to_str(z);
+        }
 
         trace!("headers:{:?}", headers);
         let start_time = chrono::Utc::now().timestamp_millis();
         let response = self.http_tool(
             format!("{}{}", prefix_url.clone(), request_url.clone()),
-            request_type.to_string(),
-            params_json.to_string(),
+            method.to_string(),
+            params.clone(),
+            body.clone(),
             headers,
+            is_login,
         ).await;
 
         let time_array = chrono::Utc::now().timestamp_millis() - start_time;
@@ -395,41 +406,82 @@ impl BinanceSwapRest {
     }
 
 
-    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String,
+                       headers: HeaderMap,
+                       is_login: bool) -> ResponseData {
         /****请求接口与 地址*/
         let url = format!("{}{}", self.base_url.to_string(), request_path);
         let request_type = request_type.clone().to_uppercase();
-        let url_param = RestTool::parse_params_to_str(params.clone());
-        let addrs_url = format!("{}?{}", url.clone(), url_param);
+        let addrs_url: String = if params == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), params)
+        };
+
 
-        let params_json: serde_json::Value = serde_json::from_str(&params).unwrap();
-        trace!("url:{}",url);
-        trace!("addrs_url:{}",addrs_url);
-        trace!("params_json:{}",params_json);
-        trace!("headers:{:?}",headers);
-        trace!("body:{:?}",params);
+        trace!("url-----:{}",url.clone());
+        trace!("addrs_url-----:{}",addrs_url.clone());
+        trace!("params-----:{}",params.clone());
+        trace!("body-----:{}",body.clone());
+        trace!("headers-----:{:?}",headers.clone());
+        trace!("request_type-----:{:?}",request_type.clone());
+
+        // let parsing_detail = proxy::ParsingDetail::parsing_environment_variables(Some("binance"));
+        // // let parsing_detail = proxy::ParsingDetail::parsing_environment_variables(None);
+        // let client = if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+        //     let proxy_address = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
+        //     // let proxy_address = "socks5://127.0.0.1:17890"; // 替换为你的 SOCKS5 代理地址
+        //     trace!("http_proxy-----:{:?}",proxy_address.clone());
+        //     let proxy = Proxy::all(proxy_address).unwrap();
+        //     Client::builder().proxy(proxy).build().unwrap()
+        // } else {
+        //     Client::new()
+        // };
+
+        let client = if is_login {
+            let params = proxy::ParsingDetail::http_enable_proxy(Some("binance"));
+            // let params = proxy::ParsingDetail::http_enable_proxy(None);
+            let client_re;
+            if params {
+                let proxy_address = "socks5://127.0.0.1:17890"; // 替换为你的 SOCKS5 代理地址
+                let proxy = Proxy::all(proxy_address).unwrap();
+                client_re = Client::builder().proxy(proxy).build().unwrap();
+            } else {
+                client_re =  Client::new()
+            }
+            client_re
+        } else {
+            proxy::ParsingDetail::http_enable_proxy(None);
+            Client::new()
+        };
 
 
         let request_builder = match request_type.as_str() {
-            "GET" => self.client.get(url.clone()).query(&params_json).headers(headers),
-            "POST" => self.client.post(url.clone()).query(&params_json).headers(headers),
-            "DELETE" => self.client.delete(url.clone()).query(&params_json).headers(headers),
-            // "PUT" => self.client.put(url.clone()).json(&params),
+            "GET" => client.get(addrs_url.clone()).headers(headers),
+            "POST" => client.post(url.clone()).body(body).headers(headers),
+            "DELETE" => client.delete(addrs_url.clone()).headers(headers),
+            "PUT" => client.put(addrs_url.clone()).headers(headers),
             _ => {
                 panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
             }
         };
 
+
         let response = request_builder.send().await.unwrap();
         let is_success = response.status().is_success(); // 先检查状态码
         let text = response.text().await.unwrap();
+        trace!("text:???{:?}",text);
         return if is_success {
             self.on_success_data(text)
         } else {
             self.on_error_data(text, addrs_url, params)
-        }
+        };
     }
 
+
     pub fn on_success_data(&mut self, text: String) -> ResponseData {
         let data = serde_json::from_str(text.as_str()).unwrap();
 

+ 37 - 13
exchanges/src/proxy.rs

@@ -43,7 +43,7 @@ impl ParsingDetail {
                 ProxyEnum::REST => {
                     env::set_var("http_proxy", ip_port.to_string());
                     env::set_var("https_proxy", ip_port.to_string());
-                return     ProxyResponseEnum::NO;
+                    return ProxyResponseEnum::NO;
                 }
                 ProxyEnum::WS => {
                     let ip_port: Vec<&str> = ip_port.split(":").collect();
@@ -54,11 +54,11 @@ impl ParsingDetail {
                         ip_array[2].parse().unwrap(),
                         ip_array[3].parse().unwrap())
                     ), ip_port[1].parse().unwrap());
-              return       ProxyResponseEnum::YES(proxy);
+                    return ProxyResponseEnum::YES(proxy);
                 }
             }
         }
-        return  ProxyResponseEnum::NO;
+        return ProxyResponseEnum::NO;
     }
 
     fn new(ip_address: String,
@@ -66,15 +66,27 @@ impl ParsingDetail {
         ParsingDetail { ip_address, port }
     }
     //获取环境变量配置'proxy_address'拿到代理地址
-    pub fn parsing_environment_variables() -> ParsingDetail {
-        let proxy_address = env::var("proxy_address");
+    pub fn parsing_environment_variables(is_unusual: Option<&str>) -> ParsingDetail {
+        let proxy_address_name = match is_unusual {
+            None => {
+                "proxy_address"
+            }
+            Some(v) => {
+                match v {
+                    "binance" => {
+                        "binance_proxy_address"
+                    }
+                    _ => {
+                        "proxy_address"
+                    }
+                }
+            }
+        };
+        let proxy_address = env::var(proxy_address_name);
         // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
         match proxy_address {
             Ok(value) => {
-                //trace!("环境变量读取成功:key:proxy_address , val:{}", value);
-                env::set_var("http_proxy", value.to_string());
-                env::set_var("https_proxy", value.to_string());
-
+                trace!("环境变量读取成功:key:proxy_address , val:{}", value);
                 let ip_port: Vec<&str> = value.split(":").collect();
                 let parsing_detail = ParsingDetail::new(ip_port[0].to_string(), ip_port[1].to_string());
                 parsing_detail
@@ -89,17 +101,29 @@ impl ParsingDetail {
 
     //http请求是否开启代理:HTTP 只需要调用该方法即可
     //原理是 设置全局代理,所以程序如果要走代理只需要执行一次,后续的get,post..都会走代理
-    pub fn http_enable_proxy() -> bool {
+    pub fn http_enable_proxy(is_unusual: Option<&str>) -> bool {
         //拿到环境变量解析的数据
-        let parsing_detail = Self::parsing_environment_variables();
+        let parsing_detail = Self::parsing_environment_variables(is_unusual);
         if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
             let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
             env::set_var("http_proxy", http_proxy.clone());
             env::set_var("https_proxy", http_proxy.clone());
-            //trace!("代理设置成功{0}", http_proxy.to_string());
+            trace!("代理设置成功{0}", http_proxy.to_string());
+            true
+        } else {
+            trace!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
+            false
+        }
+    }
+
+    pub fn removes_proxy(is_unusual: Option<&str>) -> bool {
+        //拿到环境变量解析的数据
+        let parsing_detail = Self::parsing_environment_variables(is_unusual);
+        if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+            env::remove_var("http_proxy");
+            env::remove_var("https_proxy");
             true
         } else {
-            //trace!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
             false
         }
     }

+ 63 - 132
exchanges/tests/binance_swap_test.rs

@@ -11,164 +11,95 @@ use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, Bin
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 
-const ACCESS_KEY: &str = "";
-const SECRET_KEY: &str = "";
+const ACCESS_KEY: &str = "rgKH05A1QOpXj4NDKuIb8edOLPPpF0gYu8WBpiAIqG4aCwHsytjFWiOtoWYUPB7R";
+const SECRET_KEY: &str = "14573tSKmHLyV5WO7L31hNK0HZ21yiyMqWg85POvrKDyOn2i9bZKNTlPBUqR3fpm";
 
 
 //ws-订阅公共频道信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn ws_custom_subscribe() {
     global::log_utils::init_log_with_trace();
-
-
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-
-    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
-    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
-
-
-    let mut ws = get_ws(None);
-    ws.set_symbols(vec!["BTC_USDT".to_string()]);
-    ws.set_subscribe(vec![
-        BinanceSwapSubscribeType::PuBookTicker,
-        // BinanceSwapSubscribeType::PuAggTrade,
-        // BinanceSwapSubscribeType::PuDepth20levels100ms,
-    ]);
-
-
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
-    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
-
-    //读取
-    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
-    let _tr = tokio::spawn(async move {
-        trace!("线程-数据读取-开启");
-        let mut max_delay = 0i64;
-        loop {
-            // 从通道中接收并丢弃所有的消息,直到通道为空
-            while let Ok(Some(data)) = read_rx.try_next() {
-                // 消息被忽略
-                let mut trace_stack = TraceStack::new(0, Instant::now());
-                trace_stack.on_before_unlock_core();
-                trace_stack.on_after_network(data.time);
-
-                let delay = trace_stack.before_unlock_core - trace_stack.after_network;
-                max_delay = max(max_delay, delay);
-                info!("{}us, max={}us", delay, max_delay);
-
-                // 从通道中接收并丢弃所有的消息,直到通道为空
-                while let Ok(Some(_)) = read_rx.try_next() {
-                    // 消息被忽略
-                }
-            }
-        }
-        // trace!("线程-数据读取-结束");
-    });
-
-    //写数据
-    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
-    // let write_tx_clone = Arc::clone(&write_tx_am);
-    // let su = ws.get_subscription();
-    // let tw = tokio::spawn(async move {
-    //     trace!("线程-数据写入-开始");
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-    //         // let close_frame = CloseFrame {
-    //         //     code: CloseCode::Normal,
-    //         //     reason: Cow::Borrowed("Bye bye"),
-    //         // };
-    //         // let message = Message::Close(Some(close_frame));
     //
     //
-    //         let message = Message::Text(su.clone());
-    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-    //         trace!("发送指令成功");
-    //     }
-    //     trace!("线程-数据写入-结束");
-    // });
-
-    let t1 = tokio::spawn(async move {
-        //链接
-        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        trace!("test 唯一线程结束--");
-    });
-    tokio::try_join!(t1).unwrap();
-    trace!("当此结束");
-    trace!("重启!");
-    trace!("参考交易所关闭");
-    return;
-
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //11 点31 分
-
-    // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true));
-    // //创建读写通道
     // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-    // // 封装 write_tx 到 Arc 和 Mutex
-    // let write_tx_am = Arc::new(Mutex::new(write_tx));
+    // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+    //
+    // // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+    //
     //
-    // //对象
     // let mut ws = get_ws(None);
-    // // 币对
     // ws.set_symbols(vec!["BTC_USDT".to_string()]);
-    // //订阅
     // ws.set_subscribe(vec![
     //     BinanceSwapSubscribeType::PuBookTicker,
-    //     BinanceSwapSubscribeType::PuAggTrade,
-    //     BinanceSwapSubscribeType::PuDepth20levels100ms,
+    //     // BinanceSwapSubscribeType::PuAggTrade,
+    //     // BinanceSwapSubscribeType::PuDepth20levels100ms,
     // ]);
     //
     //
-    // //模拟业务场景 开启链接
-    // let is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
-    // let write_tx_clone1 = Arc::clone(&write_tx_am);
-    // let t1 = tokio::spawn(async move {
-    //     ws.ws_connect_async(is_shutdown_arc_clone, write_tx_clone1, write_rx, &read_tx).await.unwrap();
-    //     trace!("ws_connect_async 完成");
-    // });
+    // let write_tx_am = Arc::new(Mutex::new(write_tx));
+    // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
     //
-    // //模拟业务场景 一直监听数据
-    // let t2 = tokio::spawn(async move {
+    // //读取
+    // let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    // let _tr = tokio::spawn(async move {
+    //     trace!("线程-数据读取-开启");
+    //     let mut max_delay = 0i64;
     //     loop {
-    //         if let Some(data) = read_rx.next().await {
-    //             trace!("读取数据data:{:?}",data)
+    //         // 从通道中接收并丢弃所有的消息,直到通道为空
+    //         while let Ok(Some(data)) = read_rx.try_next() {
+    //             // 消息被忽略
+    //             let mut trace_stack = TraceStack::new(0, Instant::now());
+    //             trace_stack.on_before_unlock_core();
+    //             trace_stack.on_after_network(data.time);
+    //
+    //             let delay = trace_stack.before_unlock_core - trace_stack.after_network;
+    //             max_delay = max(max_delay, delay);
+    //             info!("{}us, max={}us", delay, max_delay);
+    //
+    //             // 从通道中接收并丢弃所有的消息,直到通道为空
+    //             while let Ok(Some(_)) = read_rx.try_next() {
+    //                 // 消息被忽略
+    //             }
     //         }
     //     }
-    //     trace!("数据读取退出 完成");
+    //     // trace!("线程-数据读取-结束");
     // });
     //
-    //
-    // //模拟用户主动写入数据
-    // // let write_tx_clone2 = Arc::clone(&write_tx_am);
-    // // let t3 = tokio::spawn(async move {
-    // //     //模拟心跳
+    // //写数据
+    // // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // // let write_tx_clone = Arc::clone(&write_tx_am);
+    // // let su = ws.get_subscription();
+    // // let tw = tokio::spawn(async move {
+    // //     trace!("线程-数据写入-开始");
     // //     loop {
-    // //         tokio::time::sleep(Duration::from_millis(5000)).await;
-    // //         let mut write_tx_clone = write_tx_clone2.lock().unwrap();
-    // //         match write_tx_clone.unbounded_send(Message::Pong(Vec::from("pong"))) {
-    // //             Ok(_) => {
-    // //                 trace!("发送心跳");
-    // //                 continue;
-    // //             }
-    // //             Err(_) => {
-    // //                 break;
-    // //             }
-    // //         }
+    // //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    // //         // let close_frame = CloseFrame {
+    // //         //     code: CloseCode::Normal,
+    // //         //     reason: Cow::Borrowed("Bye bye"),
+    // //         // };
+    // //         // let message = Message::Close(Some(close_frame));
+    // //
+    // //
+    // //         let message = Message::Text(su.clone());
+    // //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    // //         trace!("发送指令成功");
     // //     }
-    // //     trace!("主动推出 完成");
+    // //     trace!("线程-数据写入-结束");
     // // });
-    // // tokio::try_join!(y,y1,y2).unwrap();
-    // tokio::try_join!(t1,t2).unwrap();
-    // trace!("323123213");
+    //
+    // let t1 = tokio::spawn(async move {
+    //     //链接
+    //     let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+    //     ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    //     trace!("test 唯一线程结束--");
+    // });
+    // tokio::try_join!(t1).unwrap();
+    // trace!("当此结束");
+    // trace!("重启!");
+    // trace!("参考交易所关闭");
+    // return;
+    //
 }
 
 //rest-获取服务器时间

+ 467 - 0
phemex_swap_rest.rs

@@ -0,0 +1,467 @@
+use std::collections::BTreeMap;
+
+use chrono::Utc;
+use reqwest::{Client, Proxy};
+use reqwest::header::HeaderMap;
+use ring::hmac;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::{json, Value};
+use tracing::{error, info, trace};
+
+use crate::http_tool::RestTool;
+use crate::proxy;
+use crate::response_base::ResponseData;
+
+#[derive(Clone, Debug)]
+pub struct PhemexSwapRest {
+    pub tag: String,
+    base_url: String,
+    /*******参数*/
+    //是否需要登录
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+
+}
+
+impl PhemexSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> PhemexSwapRest
+    {
+        return PhemexSwapRest::new_with_tag("default-PhemexSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> PhemexSwapRest {
+        let base_url = if is_colo {
+            let z = "https://api.phemex.com".to_string();
+            info!("开启高速(未配置,走普通:{})通道",z);
+            z
+        } else {
+            let z = "https://api.phemex.com".to_string();
+            info!("走普通通道:{}",z);
+            z
+        };
+
+
+        /*****返回结构体*******/
+        PhemexSwapRest {
+            tag,
+            base_url,
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //服务器时间
+    pub async fn get_server(&mut self) -> ResponseData {
+        let params = json!({});
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/public/time".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //查詢合約基礎信息
+    pub async fn get_market(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/public/products".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+    //查詢ticker(Query 24 ticker)
+    pub async fn get_ticker(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/md/v3/ticker/24hr".to_string(),
+                                false,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //持仓(查询交易账户和仓位)
+    pub async fn get_account_and_positions(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/g-accounts/accountPositions".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //仓位设置(Switch Position Mode Synchronously)
+    pub async fn set_target_pos_mode(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-positions/switch-pos-mode-sync".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //设置杠杆(Set Leverage 设置杠杆)
+    pub async fn set_leverage(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-positions/leverage".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //下单
+    pub async fn orders(&mut self, params: Value) -> ResponseData {
+        let data = self.request("PUT".to_string(),
+                                "".to_string(),
+                                "/g-orders/create".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+
+    //撤单
+    pub async fn cancel_order(&mut self, params: Value) -> ResponseData {
+        let data = self.request("DELETE".to_string(),
+                                "".to_string(),
+                                "/g-orders/cancel".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //撤销所有
+    pub async fn cancel_order_all(&mut self, params: Value) -> ResponseData {
+        let data = self.request("DELETE".to_string(),
+                                "".to_string(),
+                                "/g-orders/all".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //订单列表
+    pub async fn get_orders(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/api-data/g-futures/orders".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    //根据id查询订单
+    pub async fn get_orders_by_id(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "".to_string(),
+                                "/api-data/g-futures/orders/by-order-id".to_string(),
+                                true,
+                                params,
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params_json: Value) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+        //每个接口都有的参数
+        let timestamp = (Utc::now().timestamp_millis() + (60 * 1000)) / 1000;
+
+        //请求类型不同,可能请求头body 不同
+        let mut body = "".to_string();
+        let mut params = "".to_string();
+        let mut headers = HeaderMap::new();
+        if method == "POST" {
+            body = params_json.to_string();
+        }
+        if method == "GET" || method == "PUT" || method == "DELETE" {
+            let z = params_json.to_string();
+            params = RestTool::parse_params_to_str(z);
+        }
+
+        //是否需要登录-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {
+                //需要登录-且登录参数齐全
+                trace!("Path:{}{}", prefix_url.clone(),request_url.clone());
+                trace!("Query:{}", params);
+                trace!("Body:{}", body);
+                trace!("expire:{}", timestamp.to_string());
+                //组装sing
+                let sing = Self::sign(secret_key.clone(),
+                                      prefix_url.clone(),
+                                      request_url.clone(),
+                                      params.clone(),
+                                      body.clone(),
+                                      timestamp.to_string(),
+                );
+                trace!("Signature:{}", sing);
+                //组装header
+                headers.extend(Self::headers(sing, timestamp.to_string(), access_key));
+            }
+        }
+
+
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.clone(),
+            body.clone(),
+            headers,
+            is_login,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+    }
+
+    pub fn headers(sign: String, timestamp: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("x-phemex-access-token", access_key.parse().unwrap());// 这是 Phemex 网站的 API-KEY(id 字段)
+        headers.insert("x-phemex-request-expiry", timestamp.parse().unwrap());// 描述请求过期的 Unix EPoch 秒数,通常应为 (Now() + 1 分钟)
+        headers.insert("x-phemex-request-signature", sign.parse().unwrap());// 这是 http 请求的 HMAC SHA256 签名。Secret 是 API Secret,其公式为:HMacSha256(URL Path + QueryString + Expiry + body)
+        // let tracing = format!("4l_{:?}", Utc::now().timestamp_millis().to_string());
+        // headers.insert("x-phemex-request-tracing", tracing.parse().unwrap());
+        headers
+    }
+    pub fn sign(secret_key: String,
+                prefix_url: String, request_url: String,
+                url_param_str: String, body: String, timestamp: String) -> String
+    {
+        /*签名生成*/
+        let base_url = format!("{}{}", prefix_url, request_url);
+        // HMacSha256(URL Path + QueryString + Expiry + body)
+
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}{}{}{}", base_url, url_param_str, timestamp, body);
+        trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let sign = hex::encode(hmac::sign(&hmac_key, message.as_bytes()).as_ref());
+        sign
+    }
+
+    // async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    async fn http_tool(&mut self, request_path: String,
+                       request_type: String,
+                       params: String,
+                       body: String,
+                       headers: HeaderMap,
+                       is_login: bool) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if params == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), params)
+        };
+
+        trace!("url-----:{}",url.clone());
+        trace!("addrs_url-----:{}",addrs_url.clone());
+        trace!("params-----:{}",params.clone());
+        trace!("body-----:{}",body.clone());
+        trace!("headers-----:{:?}",headers.clone());
+        trace!("request_type-----:{:?}",request_type.clone());
+
+
+        let client = if is_login {
+            let params = proxy::ParsingDetail::http_enable_proxy(Some("phemex"));
+            let client_re;
+            if params {
+                let proxy_address = "socks5://127.0.0.1:17890"; // 替换为你的 SOCKS5 代理地址
+                let proxy = Proxy::all(proxy_address).unwrap();
+                client_re = Client::builder().proxy(proxy).build().unwrap();
+            } else {
+                client_re =  Client::new()
+            }
+            client_re
+        } else {
+            proxy::ParsingDetail::http_enable_proxy(None);
+            Client::new()
+        };
+        let request_builder = match request_type.as_str() {
+            "GET" => client.get(addrs_url.clone()).headers(headers),
+            "POST" => client.post(url.clone()).body(body).headers(headers),
+            "DELETE" => client.delete(addrs_url.clone()).headers(headers),
+            "PUT" => client.put(addrs_url.clone()).headers(headers),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        // trace!("text:{:?}",text);
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text).unwrap();
+        // return  ResponseData::new(self.tag.clone(), 200, "success".to_string(), json_value);
+
+        let id = json_value["id"].as_i64();
+        match id {
+            None => {}
+            Some(v) => {
+                match v {
+                    0 => {
+                        let result = json_value.get("result").unwrap();
+                        return ResponseData::new(self.tag.clone(), 200, "success".to_string(), result.clone());
+                    }
+                    _ => {}
+                }
+            }
+        }
+
+        let code = json_value["code"].as_i64();
+        match code {
+            None => {}
+            Some(v) => {
+                match v {
+                    0 => {
+                        //判断是否有code ,没有表示特殊接口,直接返回
+                        if json_value.get("data").is_some() {
+                            let data = json_value.get("data").unwrap();
+                            return ResponseData::new(self.tag.clone(), 200, "success".to_string(), data.clone());
+                        } else {
+                            return ResponseData::new(self.tag.clone(), 200, "success".to_string(), json_value);
+                        }
+                    }
+                    _ => {
+                        if json_value.get("msg").is_some() {
+                            return ResponseData::new(self.tag.clone(), 400, format!("{:?}", json_value["msg"].as_str()), json_value);
+                        } else {
+                            return ResponseData::new(self.tag.clone(), 400, "error".to_string(), json_value);
+                        }
+                    }
+                }
+            }
+        }
+
+
+        return ResponseData::new(self.tag.clone(), 400, "error".to_string(), json_value);
+    }
+
+    pub fn on_error_data(&mut self, text: &String, _base_url: &String, _params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+
+        match json_value {
+            Ok(data) => {
+                let code = data["code"].as_i64();
+                match code {
+                    None => {}
+                    Some(v) => {
+                        match v {
+                            _ => {
+                                if data.get("msg").is_some() {
+                                    return ResponseData::new(self.tag.clone(), 400, format!("{:?}", data["msg"].as_str().unwrap()), data);
+                                } else {
+                                    return ResponseData::new(self.tag.clone(), 400, "error".to_string(), data);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                return ResponseData::error("".to_string(),
+                                           format!("json 解析失败:{},相关参数:{}", e, text));
+            }
+        }
+        return ResponseData::error("".to_string(), format!("请求失败:{:?}", text));
+    }
+}

+ 2 - 2
standard/src/utils.rs

@@ -11,8 +11,8 @@ pub fn format_symbol(symbol: String, pat: &str) -> String {
 }
 
 // 检测是否走代理
-pub fn proxy_handle() {
-    if proxy::ParsingDetail::http_enable_proxy() {
+pub fn proxy_handle(is_unusual: Option<&str>) {
+    if proxy::ParsingDetail::http_enable_proxy(is_unusual) {
         trace!("检测有代理配置,配置走代理");
     }
 }

+ 2 - 2
standard/tests/exchange_test.rs

@@ -28,7 +28,7 @@ use standard::{okx_handle, Order, Platform, utils};
 // 创建实体
 #[allow(dead_code)]
 pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
-    utils::proxy_handle();
+    utils::proxy_handle(None);
     let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
     let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
 
@@ -124,7 +124,7 @@ pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn
 
 #[allow(dead_code)]
 pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
-    utils::proxy_handle();
+    utils::proxy_handle(None);
     let account_info = global::account_info::get_account_info("../test_account.toml");
     match exchange {
         ExchangeEnum::BinanceSpot => {

+ 1 - 1
standard/tests/gate_swap_test.rs

@@ -248,7 +248,7 @@ async fn test_set_dual_leverage() {
 #[instrument(level = "TRACE")]
 async fn test_command_order() {
     global::log_utils::init_log_with_trace();
-    utils::proxy_handle();
+    utils::proxy_handle(None);
 
     let (order_sender, mut order_receiver): (mpsc::Sender<Order>, mpsc::Receiver<Order>) = mpsc::channel(1024);
     let (error_sender, mut error_receiver): (mpsc::Sender<Error>, mpsc::Receiver<Error>) = mpsc::channel(1024);

+ 1 - 1
standard/tests/htx_swap_test.rs

@@ -251,7 +251,7 @@ async fn test_set_dual_leverage() {
 #[instrument(level = "TRACE")]
 async fn test_command_order() {
     global::log_utils::init_log_with_trace();
-    utils::proxy_handle();
+    utils::proxy_handle(None);
 
     let (order_sender, mut order_receiver): (mpsc::Sender<Order>, mpsc::Receiver<Order>) = mpsc::channel(1024);
     let (error_sender, mut error_receiver): (mpsc::Sender<Error>, mpsc::Receiver<Error>) = mpsc::channel(1024);

+ 1 - 1
standard/tests/kucoin_swap_test.rs

@@ -238,7 +238,7 @@ async fn test_set_auto_deposit_status() {
 #[instrument(level = "TRACE")]
 async fn test_command_order() {
     global::log_utils::init_log_with_trace();
-    utils::proxy_handle();
+    utils::proxy_handle(None);
 
     let (order_sender, mut order_receiver): (mpsc::Sender<Order>, mpsc::Receiver<Order>) = mpsc::channel(1024);
     let (error_sender, mut error_receiver): (mpsc::Sender<Error>, mpsc::Receiver<Error>) = mpsc::channel(1024);