瀏覽代碼

简易版 币安异步交易库对接,仅供效率与老版本对比

hl 2 年之前
父節點
當前提交
38d045ed6d

+ 252 - 0
exchanges/src/binance_swap_ws_async.rs

@@ -0,0 +1,252 @@
+use std::collections::{BTreeMap};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use tokio::sync::mpsc::Sender;
+use tracing::{error, info, trace};
+use crate::{proxy};
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::protocol::WebSocketConfig;
+use url::Url;
+use crate::proxy::ParsingDetail;
+use crate::response_base::ResponseData;
+use crate::utils::get_time_microsecond;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use crate::socket_tool::AbstractWsMode;
+
+pub enum BinanceSwapWsType {
+    //订阅频道类型
+    PublicAndPrivate,
+}
+
+
+#[derive(Clone)]                        //订阅枚举
+pub enum BinanceSwapSubscribeType {
+    PuBookTicker,
+    PuAggTrade,
+    PuDepth20levels100ms,
+}
+
+#[derive(Clone)]
+pub struct BinanceSwapLogin {
+    api_key: String,
+    api_secret: String,
+}
+
+#[derive(Clone)]
+pub struct BinanceSwapWs {
+    label: String,
+    address_url: String,
+    //账号信息
+    login_param: Option<BinanceSwapLogin>,
+    //kuconis特殊参数
+    symbol_s: Vec<String>,
+    //订阅币对
+    subscribe_types: Vec<BinanceSwapSubscribeType>,
+}
+
+impl BinanceSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
+        return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BinanceSwapWsType::PublicAndPrivate => {
+                "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+        BinanceSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BinanceSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_lowercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = b_array;
+    }
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if let t_f = match t {
+                BinanceSwapSubscribeType::PuBookTicker => false,
+                BinanceSwapSubscribeType::PuAggTrade => false,
+                BinanceSwapSubscribeType::PuDepth20levels100ms => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BinanceSwapSubscribeType) -> String {
+        match subscribe_type {
+            BinanceSwapSubscribeType::PuAggTrade => {
+                format!("{}@aggTrade", symbol)
+            }
+            BinanceSwapSubscribeType::PuDepth20levels100ms => {
+                format!("{}@depth20@100ms", symbol)
+            }
+            BinanceSwapSubscribeType::PuBookTicker => {
+                format!("{}@bookTicker", symbol)
+            }
+        }
+    }
+    //组装订阅数据-实时订阅
+    fn get_subscription(&self) -> String {
+        // let mut str = "".to_string();
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+
+        // trace!("订阅信息:{}", str.to_string());
+        // str.to_string()
+
+        let str = json!({
+            "method": "SUBSCRIBE",
+            "params":  params,
+            "id": 1
+            });
+        str.to_string()
+    }
+    //数据解析
+    pub fn analysis_message(message: Result<Message, Error>) -> ResponseData {
+        let mut code = "00".to_string();
+        let mut message_str = "".to_string();
+        let mut data = "".to_string();
+        match message {
+            Ok(Message::Text(text)) => {
+                // trace!("Text:{}",text);
+                let response_data = Self::ok_text(text);
+                // trace!("response_data:{:?}",response_data);
+                if response_data.code == "200" {
+                    return response_data;
+                }
+            }
+            Ok(Message::Binary(s)) => {
+                trace!("Binary:{:?}",s);
+            }
+            Ok(Message::Ping(pi)) => {
+                trace!("Ping:{:?}",pi);
+                code = "200".to_string();
+                message_str = format!("服务器响应ping:{:?}", String::from_utf8(pi));
+            }
+            Ok(Message::Pong(po)) => {
+                trace!("Pong:{:?}",po);
+                //原始帧 正常读取数据不会读取到该 信息类型
+                code = "200".to_string();
+                message_str = format!("服务器响应pong:{:?}", String::from_utf8(po));
+            }
+            Ok(Message::Close(c)) => {
+                trace!("Close:{:?}",c);
+                //原始帧 正常读取数据不会读取到该 信息类型
+                code = "0".to_string();
+                message_str = format!("关闭指令:{:?}", c);
+            }
+            Ok(Message::Frame(f)) => {
+                trace!("Frame:{:?}",f);
+                //原始帧 正常读取数据不会读取到该 信息类型
+                code = "-2".to_string();
+                message_str = format!("意外读取到原始帧:{:?}", f);
+            }
+            Err(e) => {
+                trace!("Err:{:?}",e);
+                code = "-1".to_string();
+                message_str = format!("币安-数据解析 未知错误:{:?}", e);
+            }
+        }
+        return ResponseData::new("".to_string(), code, message_str, data);
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>,
+                                  read_tx: UnboundedSender<ResponseData>) -> Result<(), Error> {
+
+        AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx, read_tx, self.label.clone(), BinanceSwapWs::analysis_message).await
+    }
+    //发出指令
+    pub fn write_message(&self, write_tx: UnboundedSender<Message>, str: String) {
+        trace!("发送指令:{}",str);
+        write_tx.unbounded_send(Message::Text(str.to_string())).expect(format!("发送失败:{}", str).as_str());
+    }
+    //发出指令-订阅频道
+    pub fn send_subscribe(&self, write_tx: UnboundedSender<Message>) {
+        if self.contains_pr() {
+            //有订阅需要登录
+        }
+        //发送订阅
+        let subscribe = self.get_subscription();
+        self.write_message(write_tx, subscribe);
+    }
+
+
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        // trace!("原始数据");
+        // trace!(?text);
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        if json_value.get("error").is_some() {//订阅返回
+            res_data.code = json_value["error"]["code"].to_string();
+            res_data.message = json_value["error"]["msg"].to_string();
+        } else if json_value.get("stream").is_some() {//订阅返回
+            res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+            res_data.code = "200".to_string();
+
+            let channel = format!("{}", json_value.get("stream").as_ref().unwrap());
+            if channel.contains("@aggTrade") {
+                res_data.channel = "aggTrade".to_string();
+            } else if channel.contains("@depth20@100ms") {
+                res_data.channel = "depth".to_string();
+            } else if channel.contains("@bookTicker") {
+                res_data.channel = "bookTicker".to_string();
+            } else {
+                res_data.channel = "未知的频道".to_string();
+            }
+        } else {
+            res_data.data = text
+        }
+        res_data
+    }
+}

+ 1 - 0
exchanges/src/lib.rs

@@ -19,4 +19,5 @@ pub mod bitget_spot_ws;
 pub mod bitget_spot_rest;
 pub mod kucoin_spot_ws;
 pub mod kucoin_spot_rest;
+pub mod binance_swap_ws_async;//临时文件
 

+ 2 - 1
exchanges/src/okx_swap_ws.rs

@@ -462,12 +462,13 @@ impl OkxSwapWs {
                 res_data.code = "-200".to_string();
                 res_data.data = text;
             } else {
-                res_data.data = text
+                res_data.data = text;
             }
         } else {
             if json_value.get("arg").is_some() && json_value.get("data").is_some() {
                 res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
                 res_data.data = json_value["data"].to_string();
+                res_data.reach_time = json_value["data"][0]["ts"].as_str().unwrap().parse().unwrap()
             } else {
                 res_data.data = text;
                 res_data.channel = "未知频道".to_string();

+ 54 - 60
exchanges/src/socket_tool.rs

@@ -18,23 +18,7 @@ use tracing::trace;
 use tungstenite::client::IntoClientRequest;
 use crate::proxy;
 use crate::proxy::{ProxyEnum, ProxyResponseEnum};
-
-pub struct BinanceSwapModel {
-    address_url: String,
-}
-
-impl BinanceSwapModel {
-    pub fn new() -> OkxSwapModel {
-        OkxSwapModel {
-            // ws: AbstractWsMode::new(),
-            address_url: "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string(),
-        }
-    }
-    //链接
-    pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) ->  Result<(), Error> {
-        AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx, read_tx).await
-    }
-}
+use crate::response_base::ResponseData;
 
 
 pub struct OkxSwapModel {
@@ -42,22 +26,23 @@ pub struct OkxSwapModel {
     address_url: String,
 }
 
-impl OkxSwapModel {
-    pub fn new() -> OkxSwapModel {
-        OkxSwapModel {
-            // ws: AbstractWsMode::new(),
-            address_url: "wss://ws.okx.com:8443/ws/v5/public".to_string(),
-        }
-    }
-    //链接
-    pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) ->  Result<(), Error> {
-        AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx, read_tx).await
-    }
-
-    // //心跳包发送
-    // pub fn send_ping_or_pong(){
-    // }
-}
+// impl OkxSwapModel {
+//     pub fn new() -> OkxSwapModel {
+//         OkxSwapModel {
+//             // ws: AbstractWsMode::new(),
+//             address_url: "wss://ws.okx.com:8443/ws/v5/public".to_string(),
+//         }
+//     }
+//     //链接
+//     pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<ResponseData>) -> Result<(), Error> {
+//         AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx,read_tx,"".to_string(), read_tx).await
+//     }
+//
+//
+//     // //心跳包发送
+//     // pub fn send_ping_or_pong(){
+//     // }
+// }
 
 // // 抽象ws 接口
 // pub trait AbstractWs {
@@ -72,46 +57,56 @@ impl OkxSwapModel {
 
 pub struct AbstractWsMode {}
 
-impl AbstractWsMode
-    where SplitSink<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::Message>: futures_util::Sink<Message>,
-{
+impl AbstractWsMode {
     //创建链接
-    pub async fn ws_connect_async(address_url: String, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) -> Result<(), Error> {
+    pub async fn ws_connect_async<F>(address_url: String,
+                                     write_rx: UnboundedReceiver<Message>,
+                                     read_tx: UnboundedSender<ResponseData>
+                                     , lable: String, func: F) -> Result<(), Error>
+        where F: Fn(Result<Message, Error>) -> ResponseData
+    {
         //1.是否走代理
         /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
         let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
             ProxyResponseEnum::NO => {
-                trace!("非 代理");
+                // trace!("非 代理");
                 None
             }
             ProxyResponseEnum::YES(proxy) => {
-                trace!("代理");
+                // trace!("代理");
                 Option::from(proxy)
             }
         };
         match connect_async(address_url, proxy).await {
             Ok((ws_stream, _)) => {
                 trace!("WebSocket 握手完成。");
-                tokio::spawn(async move {
-                    let (write, read) = ws_stream.split();
-
-                    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
-                    let stdin_to_ws = write_rx.map(Ok).forward(write);
-                    let ws_to_stdout = {
-                        trace!("---1");
-                        //读,循环读取,然后拿到 message,,然后开启异步处理 message,
-                        let result = read.for_each(|message| async {
-                            read_tx.unbounded_send(message.unwrap()).unwrap();
-                        });
-                        trace!("---3");
-                        result
-                    };
-
-                    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
-                    pin_mut!(stdin_to_ws, ws_to_stdout,);
-                    future::select(stdin_to_ws, ws_to_stdout).await;
-                    trace!("---5");
-                });
+                // tokio::task::spawn_local(async move {
+                let (write, read) = ws_stream.split();
+
+                //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+                let stdin_to_ws = write_rx.map(Ok).forward(write);
+                let ws_to_stdout = {
+                    trace!("---1");
+                    //读,循环读取,然后拿到 message,,然后开启异步处理 message,
+                    let result = read.for_each(|message| async {
+                        // trace!("message:{:?}",message.unwrap())
+                        let mut response_data = func(message);
+                        //code 说明
+                        //200 推送数据
+                        // if response_data.code == "200" {
+                        // trace!("response_data:{:?}",response_data);
+                        response_data.label = lable.clone();
+                        read_tx.unbounded_send(response_data).unwrap();
+                        // }
+                    });
+                    trace!("---3");
+                    result
+                };
+
+                //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+                pin_mut!(stdin_to_ws, ws_to_stdout,);
+                future::select(stdin_to_ws, ws_to_stdout).await;
+                trace!("---5");
                 trace!("---4");
                 Ok(())
             }
@@ -121,7 +116,6 @@ impl AbstractWsMode
             }
         }
     }
-
 }
 
 

+ 31 - 21
exchanges/tests/binance_swap_test.rs

@@ -1,11 +1,13 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 use tokio::sync::mpsc::{channel, Sender};
 use tokio::try_join;
+use tokio_tungstenite::tungstenite::Error;
 use tracing::trace;
 use exchanges::binance_swap_rest::BinanceSwapRest;
-use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::binance_swap_ws_async::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use exchanges::response_base::ResponseData;
 
 const ACCESS_KEY: &str = "";
@@ -13,33 +15,42 @@ const SECRET_KEY: &str = "";
 
 
 //ws-订阅公共频道信息
-#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
 async fn ws_custom_subscribe() {
     global::log_utils::init_log_with_trace();
+    //创建读写通道
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    let bool_v1 = Arc::new(AtomicBool::new(true));
+    //对象
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, mut rx) = channel(1024);
-    let mut ws = get_ws(btree_map, tx);
-    ws.set_subscribe(vec![
-        // BinanceSubscribeType::PuBookTicker,
-        // BinanceSubscribeType::PuAggTrade,
-        BinanceSubscribeType::PuDepth20levels100ms,
-    ]);
-    let t1 = tokio::spawn(async move {
-        ws.custom_subscribe(bool_v1, vec!["BTCUSDT".to_string()]).await;
-    });
-    let t2 = tokio::spawn(async move {
+    let mut ws = get_ws(None);
+    // 币对
+    // ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    // //订阅
+    // ws.set_subscribe(vec![
+    //     BinanceSwapSubscribeType::PuBookTicker,
+    //     BinanceSwapSubscribeType::PuAggTrade,
+    //     BinanceSwapSubscribeType::PuDepth20levels100ms,
+    // ]);
+    //主动发起订阅
+    // tokio::spawn(async move {
+    //     ws.send_subscribe(write_tx);
+    // });
+
+
+    //读取数据
+    tokio::spawn(async move {
         loop {
-            if let Ok(received) = rx.try_recv() {
-                trace!( "age: {:?}", received);
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
             }
         }
     });
-    try_join!(t1,t2).unwrap();
+    //链接
+    ws.ws_connect_async(write_rx, read_tx).await.expect("链接失败");
 }
 
-
 //rest-获取服务器时间
 #[tokio::test]
 async fn rest_get_server_time_test() {
@@ -98,11 +109,10 @@ async fn rest_get_user_trades_test() {
 }
 
 
-fn get_ws(btree_map: BTreeMap<String, String>, tx: Sender<ResponseData>) -> BinanceSwapWs {
+fn get_ws(btree_map: Option<BinanceSwapLogin>) -> BinanceSwapWs {
     let binance_ws = BinanceSwapWs::new(false,
                                         btree_map,
-                                        BinanceWsType::PublicAndPrivate,
-                                        tx);
+                                        BinanceSwapWsType::PublicAndPrivate);
     binance_ws
 }
 

+ 37 - 71
exchanges/tests/socket_tool_test.rs

@@ -10,18 +10,18 @@ use tokio_tungstenite::client_async;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::trace;
 use exchanges::binance_swap_rest::BinanceSwapRest;
-use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use exchanges::response_base::ResponseData;
-use exchanges::socket_tool::{BinanceSwapModel, OkxSwapModel};
+use exchanges::socket_tool::{client, ws_connect_async};
 
 //ws-订阅公共频道信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
 async fn ws_custom_subscribe() {
     global::log_utils::init_log_with_trace();
 
-    // 币安订阅
-    // let base_url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string();
-    // okx 公共订阅
+    // // 币安订阅
+    let base_url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string();
+    // // okx 公共订阅
     // let str = serde_json::json!({
     //             "op": "subscribe",
     //             "args": [
@@ -31,71 +31,37 @@ async fn ws_custom_subscribe() {
     //                     }
     //                 ]
     //         });
-
-    //创建通道,发送指令通道, 与读取推送数据通道
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-
-
-    //对象
-    let model = BinanceSwapModel::new();
-    let is_ok = model.ws_connect_async(write_rx, read_tx).await;
-    //写数据,订阅频道,有些是Url跟参数,有些是单独发送订阅字符串
-    // write_tx.unbounded_send(Message::Text(str.to_string())).expect("发送失败");
-    match is_ok {
-        Ok(_) => {
-            let t1 = tokio::spawn(async move {
-                loop {
-                    if let Some(message) = read_rx.next().await {
-                        match message {
-                            Message::Text(s) => {
-                                trace!("Text: {}",s);
-                                //okx 数据解析
-                                // let json_value: serde_json::Value = serde_json::from_str(&s).unwrap();
-                                // if json_value.get("event").is_some() {//订阅返回
-                                //     if json_value["event"].as_str() == Option::from("login") &&
-                                //         json_value["code"].as_str() == Option::from("0") {
-                                //     } else if json_value["event"].as_str() == Option::from("error") {
-                                //     } else if json_value["event"].as_str() == Option::from("subscribe") {
-                                //     } else {
-                                //     }
-                                // } else {
-                                //     if json_value.get("arg").is_some() && json_value.get("data").is_some() {
-                                //         trace!("Text: {}",json_value["data"][0]["ts"].as_str().unwrap());
-                                //     } else {
-                                //     }
-                                // }
-                            }
-                            Message::Binary(s) => {
-                                trace!("Binary: {:?}", s);
-                            }
-                            Message::Ping(s) => {
-                                trace!("Ping: {:?}", s);
-                            }
-                            Message::Pong(s) => {
-                                trace!("Pong: {:?}", s);
-                            }
-                            Message::Close(s) => {
-                                trace!("Close: {:?}", s);
-                            }
-                            Message::Frame(s) => {
-                                trace!("Frame: {:?}", s);
-                            }
-                        }
-                    }
-                }
-            });
-            try_join!(t1).unwrap();
-        }
-        _ => {}
-    }
+    //
+    // //创建通道,发送指令通道, 与读取推送数据通道
+    // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    //
+    //
+    // //对象
+    // // let model = OkxSwapModel::new();
+    // let is_ok =  ws_connect_async(base_url).await;
+    // //写数据,订阅频道,有些是Url跟参数,有些是单独发送订阅字符串
+    // // write_tx.unbounded_send(Message::Text(str.to_string())).expect("发送失败");
+    // match is_ok {
+    //     Ok(_) => {
+    //         let t1 = tokio::spawn(async move {
+    //             loop {
+    //                 if let Some(message) = read_rx.next().await {
+    //
+    //                 }
+    //             }
+    //         });
+    //         try_join!(t1).unwrap();
+    //     }
+    //     _ => {}
+    // }
+    client(base_url).await;
 }
 
-
-fn get_ws(btree_map: BTreeMap<String, String>, tx: Sender<ResponseData>) -> BinanceSwapWs {
-    let binance_ws = BinanceSwapWs::new(false,
-                                        btree_map,
-                                        BinanceWsType::PublicAndPrivate,
-                                        tx);
-    binance_ws
-}
+//
+// fn get_ws(btree_map: BTreeMap<String, String>, tx: Sender<ResponseData>) -> BinanceSwapWs {
+//     let binance_ws = BinanceSwapWs::new(false,
+//                                         btree_map,
+//                                         BinanceSwapWsType::PublicAndPrivate);
+//     binance_ws
+// }

+ 20 - 20
exchanges/tests/test.rs

@@ -482,26 +482,26 @@ async fn demo_rest_ba() {
 }
 
 async fn demo_pub_ws_ba() {
-    let mut bool_v1 = Arc::new(AtomicBool::new(true));
-    let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, mut rx) = channel(1024);
-    let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx);
-    binance_ws.set_subscribe(vec![
-        BinanceSubscribeType::PuAggTrade,
-        BinanceSubscribeType::PuDepth20levels100ms,
-        BinanceSubscribeType::PuBookTicker,
-    ]);
-    let t1 = tokio::spawn(async move {
-        binance_ws.custom_subscribe(bool_v1,vec!["BTCUSDT".to_string()]).await;
-    });
-    let t2 = tokio::spawn(async move {
-        loop {
-            if let Ok(received) = rx.try_recv() {
-                trace!( "age: {:?}", received);
-            }
-        }
-    });
-    try_join!(t1,t2).unwrap();
+    // let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    // let btree_map: BTreeMap<String, String> = BTreeMap::new();
+    // let (tx, mut rx) = channel(1024);
+    // let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx);
+    // binance_ws.set_subscribe(vec![
+    //     BinanceSubscribeType::PuAggTrade,
+    //     BinanceSubscribeType::PuDepth20levels100ms,
+    //     BinanceSubscribeType::PuBookTicker,
+    // ]);
+    // let t1 = tokio::spawn(async move {
+    //     binance_ws.custom_subscribe(bool_v1,vec!["BTCUSDT".to_string()]).await;
+    // });
+    // let t2 = tokio::spawn(async move {
+    //     loop {
+    //         if let Ok(received) = rx.try_recv() {
+    //             trace!( "age: {:?}", received);
+    //         }
+    //     }
+    // });
+    // try_join!(t1,t2).unwrap();
 }
 
 fn demo_get_http_proxy() {