875428575@qq.com 2 éve
szülő
commit
5a2213fbf0

+ 5 - 0
exchanges/Cargo.toml

@@ -23,3 +23,8 @@ data-encoding = "2.4.0"
 
 hmac = "0.8.1"
 sha2 = "0.9.8"
+
+tokio-tungstenite = "0.14"
+#tokio-tungstenite = "0.14"
+#websockets= "0.3.0"
+#futures = "0.3.21"

+ 5 - 1
exchanges/src/binance_spot_rest.rs

@@ -1,10 +1,14 @@
 use std::collections::BTreeMap;
+
 #[derive(Clone)]
 pub struct BinanceSpotRest {}
 
 impl BinanceSpotRest {
-    pub fn new(_is_colo: bool,  _login_param: BTreeMap<String, String>) -> BinanceSpotRest
+    pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotRest
     {
+        return BinanceSpotRest::new_label("default-BinanceSpotRest".to_string(), _is_colo, _login_param);
+    }
+    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotRest {
         BinanceSpotRest {}
     }
 }

+ 7 - 1
exchanges/src/binance_spot_ws.rs

@@ -1,10 +1,16 @@
 use std::collections::BTreeMap;
+
 #[derive(Clone)]
 pub struct BinanceSpotWs {}
 
 impl BinanceSpotWs {
-    pub fn new(_is_colo: bool,  _login_param: BTreeMap<String, String>) -> BinanceSpotWs
+    pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotWs
     {
+        return BinanceSpotWs::new_label("default-BinanceSpotWs".to_string(), _is_colo, _login_param);
+    }
+
+
+    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotWs {
         BinanceSpotWs {}
     }
 }

+ 6 - 3
exchanges/src/binance_usdt_swap_rest.rs

@@ -7,8 +7,7 @@ use crate::response_base::ResponseData;
 
 #[derive(Clone)]
 pub struct BinanceUsdtSwapRest {
-    /*******参数*/
-    //登陆所需参数
+    label: String,
     login_param: BTreeMap<String, String>,
     rest: RestTool,
 }
@@ -19,7 +18,10 @@ impl BinanceUsdtSwapRest {
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BinanceUsdtSwapRest
     {
-        let  base_url = if is_colo {
+        return BinanceUsdtSwapRest::new_label("default-BinanceSpotWs".to_string(), is_colo, login_param);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BinanceUsdtSwapRest {
+        let base_url = if is_colo {
             println!("不支持colo高速线路");
             "https://fapi.binance.com".to_string()
         } else {
@@ -28,6 +30,7 @@ impl BinanceUsdtSwapRest {
 
         /*****返回结构体*******/
         BinanceUsdtSwapRest {
+            label,
             login_param,
             rest: RestTool::new(base_url.to_string()),
         }

+ 9 - 2
exchanges/src/binance_usdt_swap_ws.rs

@@ -13,6 +13,7 @@ use url::Url;
 
 #[derive(Clone)]
 pub struct BinanceUsdtSwapWs {
+    label: String,
     //连接地址
     request_url: String,
     //ip
@@ -31,8 +32,13 @@ impl BinanceUsdtSwapWs {
     /*******************************************************************************************************/
     pub fn new(is_colo: bool,
                _login_param: BTreeMap<String, String>,
-               sender: mpsc::Sender<ResponseData>,
-    ) -> BinanceUsdtSwapWs
+               sender: mpsc::Sender<ResponseData>) -> BinanceUsdtSwapWs
+    {
+        return BinanceUsdtSwapWs::new_label("default-BinanceUsdtSwapWs".to_string(), is_colo, _login_param, sender);
+    }
+    pub fn new_label(label: String, is_colo: bool,
+                     _login_param: BTreeMap<String, String>,
+                     sender: mpsc::Sender<ResponseData>) -> BinanceUsdtSwapWs
     {
         let base_url = if is_colo {
             println!("不支持colo高速线路");
@@ -52,6 +58,7 @@ impl BinanceUsdtSwapWs {
 
         /*****返回结构体*******/
         BinanceUsdtSwapWs {
+            label,
             request_url: base_url,
             ip: ip_v.clone().to_string(),
             port: port_v,

+ 6 - 1
exchanges/src/gate_spot_rest.rs

@@ -1,9 +1,14 @@
 use std::collections::BTreeMap;
+
 #[derive(Clone)]
 pub struct GateSpotRest {}
 
 impl GateSpotRest {
-    pub fn new(_is_colo: bool,  _login_param: BTreeMap<String, String>) -> GateSpotRest
+    pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotRest
+    {
+        return GateSpotRest::new_label("default-GateSpotRest".to_string(), _is_colo, _login_param);
+    }
+    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotRest
     {
         GateSpotRest {}
     }

+ 5 - 0
exchanges/src/gate_spot_ws.rs

@@ -1,9 +1,14 @@
 use std::collections::BTreeMap;
+
 #[derive(Clone)]
 pub struct GateSpotWs {}
 
 impl GateSpotWs {
     pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotWs
+    {
+        return GateSpotWs::new_label("default-GateSpotWs".to_string(), _is_colo, _login_param);
+    }
+    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSpotWs
     {
         GateSpotWs {}
     }

+ 6 - 0
exchanges/src/gate_swap_rest.rs

@@ -10,6 +10,7 @@ use sha2::Sha512;
 
 #[derive(Clone)]
 pub struct GateSwapRest {
+    label: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -22,6 +23,10 @@ impl GateSwapRest {
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> GateSwapRest
+    {
+        return GateSwapRest::new_label("default-GateSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> GateSwapRest
     {
         let base_url = if is_colo {
             println!("使用colo高速线路");
@@ -32,6 +37,7 @@ impl GateSwapRest {
 
         /*****返回结构体*******/
         GateSwapRest {
+            label,
             base_url: base_url.to_string(),
             client: Client::new(),
             login_param,

+ 5 - 0
exchanges/src/gate_swap_ws.rs

@@ -1,9 +1,14 @@
 use std::collections::BTreeMap;
+
 #[derive(Clone)]
 pub struct GateSwapWs {}
 
 impl GateSwapWs {
     pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSwapWs
+    {
+        return GateSwapWs::new_label("default-GateSwapWs".to_string(), _is_colo, _login_param);
+    }
+    pub fn new_label(label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> GateSwapWs
     {
         GateSwapWs {}
     }

+ 6 - 1
exchanges/src/kucoin_swap_rest.rs

@@ -8,6 +8,7 @@ use crate::response_base::ResponseData;
 
 #[derive(Clone, Debug)]
 pub struct KucoinSwapRest {
+    label: String,
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
@@ -24,6 +25,9 @@ impl KucoinSwapRest {
 
     pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSwapRest
     {
+        return KucoinSwapRest::new_lable("default-KucoinSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_lable(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSwapRest {
         let base_url = if is_colo {
             println!("不支持colo高速线路");
             "https://api-futures.kucoin.com".to_string()
@@ -33,6 +37,7 @@ impl KucoinSwapRest {
 
         /*****返回结构体*******/
         KucoinSwapRest {
+            label,
             base_url,
             client: Client::new(),
             login_param,
@@ -239,7 +244,7 @@ impl KucoinSwapRest {
     }
     //获取合约令牌-私有
     pub async fn get_private_token(&self) -> ResponseData {
-        let  params = serde_json::json!({});
+        let params = serde_json::json!({});
         let data = self.request("POST".to_string(),
                                 "/api/v1".to_string(),
                                 "/bullet-private".to_string(),

+ 10 - 3
exchanges/src/kuconin_swap_ws.rs

@@ -1,5 +1,5 @@
 use std::collections::{BTreeMap, HashSet};
-use std::{io};
+use std::{io, thread};
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::{mpsc};
@@ -40,6 +40,7 @@ pub enum KuconinSubscribeType {
 
 #[derive(Clone)]
 pub struct KuconinSwapWs {
+    label: String,
     request_url: String,
     //实际ws 链接地址
     proxy: ParsingDetail,
@@ -63,6 +64,13 @@ impl KuconinSwapWs {
                      login_param: BTreeMap<String, String>,
                      ws_type: KuconinWsType,
                      sender: mpsc::Sender<ResponseData>,
+    ) -> KuconinSwapWs {
+        return KuconinSwapWs::new_label("default-KuconinSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
+    }
+    pub async fn new_label(label: String, is_colo: bool,
+                           login_param: BTreeMap<String, String>,
+                           ws_type: KuconinWsType,
+                           sender: mpsc::Sender<ResponseData>,
     ) -> KuconinSwapWs
     {
         if is_colo {
@@ -90,6 +98,7 @@ impl KuconinSwapWs {
         }
         /*****返回结构体*******/
         KuconinSwapWs {
+            label,
             request_url: "".to_string(),
             proxy: parsing_detail,
             // login_param,
@@ -230,8 +239,6 @@ impl KuconinSwapWs {
                     accept_unmasked_frames: false,
                 });
                 let max_redirects = 5;
-
-
                 let _ = match connect_with_proxy(request_url.clone(),
                                                  proxy_address, websocket_config, max_redirects) {
                     Ok(ws) => {

+ 12 - 2
exchanges/src/okx_swap_ws.rs

@@ -34,6 +34,7 @@ pub enum OkxSubscribeType {
 
 #[derive(Clone)]
 pub struct OkxSwapWs {
+    label: String,
     request_url: String,
     //实际ws 链接地址
     proxy: ParsingDetail,
@@ -56,6 +57,14 @@ impl OkxSwapWs {
                      ws_type: OkxWsType,
                      sender: mpsc::Sender<ResponseData>,
     ) -> OkxSwapWs
+    {
+        return OkxSwapWs::new_label("default-OkxSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
+    }
+    pub async fn new_label(label: String, is_colo: bool,
+                           login_param: BTreeMap<String, String>,
+                           ws_type: OkxWsType,
+                           sender: mpsc::Sender<ResponseData>,
+    ) -> OkxSwapWs
     {
         if is_colo {
             println!("不支持高速通道")
@@ -65,7 +74,7 @@ impl OkxSwapWs {
         let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
 
         /*******公共频道-私有频道数据组装*/
-        let  request_url = match ws_type {
+        let request_url = match ws_type {
             OkxWsType::Public => {
                 "wss://ws.okx.com:8443/ws/v5/public".to_string()
             }
@@ -79,6 +88,7 @@ impl OkxSwapWs {
 
         /*****返回结构体*******/
         OkxSwapWs {
+            label,
             request_url,
             proxy: parsing_detail,
             login_param,
@@ -176,7 +186,7 @@ impl OkxSwapWs {
             let message = format!("{}GET{}", timestamp, "/users/self/verify");
             let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
             let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
-            let  sign = base64::encode(result);
+            let sign = base64::encode(result);
 
             let login_json = json!({
                               "op": "login",

+ 65 - 6
exchanges/tests/test.rs

@@ -1,9 +1,10 @@
 use std::{fs, thread};
 use exchanges::gate_swap_rest::GateSwapRest;
 use std::collections::BTreeMap;
-use std::io::{BufRead, BufReader,  Write};
-use std::net::{TcpListener, TcpStream};
+use std::io::{BufRead, BufReader};
+use std::net::{TcpListener};
 use std::sync::{mpsc};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use exchanges::binance_usdt_swap_rest::BinanceUsdtSwapRest;
 use exchanges::kucoin_swap_rest::KucoinSwapRest;
 use exchanges::kuconin_swap_ws::{KuconinSubscribeType, KuconinSwapWs, KuconinWsType};
@@ -11,6 +12,9 @@ use exchanges::{proxy};
 use exchanges::binance_usdt_swap_ws::BinanceUsdtSwapWs;
 use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
 
+use std::io::{Read, Write};
+use std::net::{TcpStream, ToSocketAddrs};
+
 
 #[tokio::test]
 async fn test_import() {
@@ -38,16 +42,71 @@ async fn test_import() {
     //Kuconin-ws--公共频道
     // demo_ws_kucoin_pu().await;
     //Kuconin-ws--私有频道
-    demo_ws_kucoin_pr().await;
+    // demo_ws_kucoin_pr().await;
 
     //okx - Business 频道
     // demo_ws_okx_bu().await;
     //okx - public 频道
     // demo_ws_okx_pu().await;
+
+    demo_so();
+}
+
+
+  fn demo_so() {
+    // 代理服务器地址和端口
+    // let proxy_address = "127.0.0.1:7890";
+    // // 目标服务器地址和端口
+    // let target_address = "wss://ws.okx.com:8443/ws/v5/public";
+    //
+    //   // 建立与代理服务器的连接
+    //   let mut proxy_stream = TcpStream::connect(proxy_address).expect("Failed to connect to proxy");
+    //
+    //   // 发送代理请求
+    //   let request = format!("CONNECT {}\r\n\r\n", target_address);
+    //   proxy_stream.write_all(request.as_bytes()).expect("Failed to send proxy request");
+    //
+    //   // 读取代理响应
+    //   let mut response = String::new();
+    //   proxy_stream.read_to_string(&mut response).expect("Failed to read proxy response");
+    //
+    //   // 检查代理响应是否成功
+    //   if !response.contains("200 Connection established") {
+    //       println!("Proxy connection failed: {}", response);
+    //   }
+    //
+    //   // 从代理连接中获取原始 TCP 流
+    //   let mut tcp_stream = std::mem::ManuallyDrop::new(proxy_stream);
+    //
+    //   // 现在你可以使用 `tcp_stream` 来进行与目标服务器的通信
+    //   // 例如,发送和接收数据
+    //   // tcp_stream.write_all(b"Hello, server!").expect("Failed to send data");
+    //
+    //   thread::spawn(move || {
+    //       loop {
+    //           let mut buffer = [0u8; 1024]; // 用于存储读取的数据的缓冲区
+    //           let bytes_read = tcp_stream.read(&mut buffer).expect("Failed to read data");
+    //
+    //           // 将读取的数据转换为字符串并打印
+    //           if let Ok(data) = std::str::from_utf8(&buffer[..bytes_read]) {
+    //               println!("Received data: {}", data);
+    //           } else {
+    //               println!("Received data contains non-UTF8 characters");
+    //           }
+    //       }
+    //
+    //   });
+    //
+    //   // 最后记得手动释放套接字
+    //   // 注意:不要调用 `drop(tcp_stream)`,因为我们使用了 `ManuallyDrop` 来避免自动释放
+    //   unsafe {
+    //       std::mem::ManuallyDrop::drop(&mut tcp_stream);
+    //   }
 }
 
+
 async fn demo_ws_okx_pu() {
-    let  btree_map: BTreeMap<String, String> = BTreeMap::new();
+    let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let (tx, rx) = mpsc::channel();
     let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx).await;
     ku_ws.set_subscribe(vec![OkxSubscribeType::PuIndexTickers]);
@@ -100,7 +159,7 @@ async fn demo_ws_kucoin_pr() {
 }
 
 async fn demo_ws_kucoin_pu() {
-    let  btree_map: BTreeMap<String, String> = BTreeMap::new();
+    let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let (tx, rx) = mpsc::channel();
     let mut ku_ws = KuconinSwapWs::new(false, btree_map, KuconinWsType::Public, tx).await;
     ku_ws.set_subscribe(vec![KuconinSubscribeType::PuContractMarketLevel2Depth50]);
@@ -255,7 +314,7 @@ fn demo_http() {
     for stream in listener.incoming() {
         let stream = stream.unwrap();
 
-        handle_connection(stream);
+        handle_connection(TcpStream::try_from(stream).unwrap());
     }
 }