Kaynağa Gözat

优化代码

875428575@qq.com 2 yıl önce
ebeveyn
işleme
22eaeebf22

+ 7 - 7
exchanges/src/binance_usdt_swap_ws.rs

@@ -72,7 +72,7 @@ impl BinanceUsdtSwapWs {
     /*****************************************订阅函数********************************************************/
     /*******************************************************************************************************/
     //K线-不需要认证
-    pub fn kline(&self, b_array: Vec<&str>)
+    pub async fn kline(&self, b_array: Vec<&str>)
     {
         //订阅信息拼接
         let mut params = vec![];
@@ -81,11 +81,11 @@ impl BinanceUsdtSwapWs {
             b_name = format!("{}@{}", b_name, "kline_1s");
             params.push(b_name);
         }
-        self.run(params);
+        self.run(params).await;
     }
 
     //自定义-不需要认证
-    pub fn custom_subscribe(&self, b_array: Vec<String>, sub_trade: u8, sub_fast: u8)
+    pub async fn custom_subscribe(&self, b_array: Vec<String>, sub_trade: u8, sub_fast: u8)
     {
         let mut params = vec![];
 
@@ -100,7 +100,7 @@ impl BinanceUsdtSwapWs {
                 params.push(format!("{}@depth20@100ms", b_name));
             }
         }
-        self.run(params);
+        self.run(params).await;
     }
 
     /*******************************************************************************************************/
@@ -110,7 +110,7 @@ impl BinanceUsdtSwapWs {
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
     /*******************************************************************************************************/
-    fn run(&self, params: Vec<String>)
+    async fn run(&self, params: Vec<String>)
     {
         //订阅信息组装
         let subscription = json!({
@@ -144,7 +144,7 @@ impl BinanceUsdtSwapWs {
                 let max_redirects = 5;
                 match connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects) {
                     Ok(ws) => {
-                        self.proxy_subscription(ws.0, &subscription);
+                        self.proxy_subscription(ws.0, &subscription).await;
                     }
                     Err(err) => {
                         println!("Can't connect(无法连接): {}", err);
@@ -153,7 +153,7 @@ impl BinanceUsdtSwapWs {
             } else {
                 match connect(request_url.clone()) {
                     Ok(ws) => {
-                        self.subscription(ws.0, &subscription);
+                        self.subscription(ws.0, &subscription).await;
                     }
                     Err(err) => {
                         // 连接失败时执行的操作

+ 398 - 356
exchanges/src/gate_swap_ws.rs

@@ -1,356 +1,398 @@
-// use std::collections::{BTreeMap};
-// use std::{io, thread};
-// use std::io::{Write};
-// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-// use std::sync::{mpsc};
-// use std::time::Duration;
-// use chrono::Utc;
-// use serde_json::{json, Value};
-// use ring::hmac;
-// use crate::{proxy};
-// use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
-// use tungstenite::{connect, Message, WebSocket};
-// use tungstenite::protocol::WebSocketConfig;
-// use url::Url;
-// use crate::proxy::ParsingDetail;
-// use crate::response_base::ResponseData;
-//
-// pub enum GateWsType {
-//     //频道类型
-//     PublicAndPrivate
-// }
-//
-//
-// #[derive(Clone)]                        //订阅枚举
-// pub enum GateSubscribeType {
-//     PuFuturesOrderBook,
-//     PuFuturesCandlesticks,
-// }
-//
-// #[derive(Clone)]
-// pub struct GateSwapWs {
-//     pub label: String,
-//     request_url: String,
-//     //实际ws 链接地址
-//     proxy: ParsingDetail,
-//     //账号信息
-//     login_param: BTreeMap<String, String>,
-//     //kuconis特殊参数
-//     symbol_s: Vec<String>,
-//     //订阅币对
-//     subscribe_types: Vec<GateSubscribeType>,
-//     //订阅信息
-//     sender: mpsc::Sender<ResponseData>,     //数据通道
-// }
-//
-// impl GateSwapWs {
-//     /*******************************************************************************************************/
-//     /*****************************************获取一个对象****************************************************/
-//     /*******************************************************************************************************/
-//     pub async fn new(is_colo: bool,
-//                      login_param: BTreeMap<String, String>,
-//                      ws_type: GateWsType,
-//                      sender: mpsc::Sender<ResponseData>,
-//     ) -> GateSwapWs
-//     {
-//         return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
-//     }
-//     pub async fn new_label(label: String, is_colo: bool,
-//                            login_param: BTreeMap<String, String>,
-//                            ws_type: GateWsType,
-//                            sender: mpsc::Sender<ResponseData>,
-//     ) -> GateSwapWs
-//     {
-//         if is_colo {
-//             println!("支持高速通道-未配置")
-//         } else {}
-//
-//         /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
-//         let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
-//
-//         let request_url = "wss://fx-ws.gateio.ws/v4/ws/".to_string();
-//
-//         /*****返回结构体*******/
-//         GateSwapWs {
-//             label,
-//             request_url,
-//             proxy: parsing_detail,
-//             login_param,
-//             symbol_s: vec![],
-//             subscribe_types: vec![],
-//             sender,
-//         }
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************订阅函数********************************************************/
-//     /*******************************************************************************************************/
-//     //手动添加订阅信息
-//     pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSubscribeType>) {
-//         self.subscribe_types.extend(subscribe_types);
-//     }
-//     //自定义
-//     pub fn custom_subscribe(&mut self, b_array: Vec<String>)
-//     {
-//         let mut symbol_s = b_array.clone();
-//         for symbol in symbol_s.iter_mut() {
-//             // 大写
-//             *symbol = symbol.to_uppercase();
-//             // 字符串替换
-//             *symbol = symbol.replace("-", "_");
-//         }
-//         self.symbol_s = symbol_s;
-//         self.run();
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************工具函数********************************************************/
-//     /*******************************************************************************************************/
-//     //订阅枚举解析
-//     pub fn enum_to_string(symbol: String, subscribe_type: GateSubscribeType) -> Value {
-//         match subscribe_type {
-//             GateSubscribeType::BuIndexCandle30m => {
-//                 json!({
-//                     "channel":"index-candle30m",
-//                     "instId":symbol
-//                 })
-//             }
-//             GateSubscribeType::PuIndexTickers => {
-//                 json!({
-//                     "channel":"index-tickers",
-//                     "instId":symbol
-//                 })
-//             }
-//             GateSubscribeType::PrAccount(ccy) => {
-//                 json!({
-//                     "channel":"account",
-//                     "ccy":ccy
-//                 })
-//             }
-//         }
-//     }
-//     //组装订阅数据
-//     pub fn get_subscription(&self) -> String {
-//         let mut args = vec![];
-//         for symbol in &self.symbol_s {
-//             for subscribe_type in &self.subscribe_types {
-//                 let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
-//                 args.push(ty_str);
-//             }
-//         }
-//         let str = json!({
-//               "time": 1545445847,
-//               "time_ms": 1545445847123,
-//               "channel": "futures.candlesticks",
-//               "event": "subscribe",
-//               "result": {
-//                 "status": "success"
-//               }
-//         });
-//
-//         println!("订阅信息:{}", str.to_string());
-//
-//         str.to_string()
-//     }
-//
-//     //组装登陆数据
-//     fn log_in_to_str(&self) -> String {
-//         let mut login_json_str = "".to_string();
-//
-//         let mut access_key: String = "".to_string();
-//         let mut secret_key: String = "".to_string();
-//         let mut passphrase: String = "".to_string();
-//         for (key, value) in &self.login_param {
-//             if key == "access_key" {
-//                 access_key = value.parse().unwrap();
-//             } else if key == "secret_key" {
-//                 secret_key = value.parse().unwrap();
-//             } else if key == "passphrase" {
-//                 passphrase = value.parse().unwrap();
-//             }
-//         }
-//         if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
-//             let timestamp = Utc::now().timestamp().to_string();
-//             // 时间戳 + 请求类型+ 请求参数字符串
-//             let message = format!("{}GET{}", timestamp, "/users/self/verify");
-//             let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
-//             let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
-//             let sign = base64::encode(result);
-//
-//             let login_json = json!({
-//                               "op": "login",
-//                                 "args": [{
-//                                 "apiKey": access_key,
-//                                 "passphrase": passphrase,
-//                                 "timestamp": timestamp,
-//                                 "sign": sign  }]
-//                         });
-//
-//             println!("---login_json:{0}", login_json.to_string());
-//             println!("--登陆:{:?}", login_json);
-//             login_json_str = login_json.to_string();
-//         }
-//         login_json_str
-//     }
-//     /*******************************************************************************************************/
-//     /*****************************************socket基本*****************************************************/
-//     /*******************************************************************************************************/
-//     fn run(&mut self)
-//     {
-//         //订阅信息组装
-//         let subscription = self.get_subscription();
-//         loop {
-//             println!("要连接咯~~!!{}", self.request_url);
-//
-//             let request_url = Url::parse(self.request_url.as_str()).unwrap();
-//             //1. 判断是否需要代理,根据代理地址是否存来选择
-//             if self.proxy.ip_address.len() > 0 {
-//                 let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
-//                 let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
-//                     ip_array[0].parse().unwrap(),
-//                     ip_array[1].parse().unwrap(),
-//                     ip_array[2].parse().unwrap(),
-//                     ip_array[3].parse().unwrap())
-//                 ), self.proxy.port.parse().unwrap());
-//                 let websocket_config = Some(WebSocketConfig {
-//                     max_send_queue: Some(16),
-//                     max_message_size: Some(16 * 1024 * 1024),
-//                     max_frame_size: Some(16 * 1024 * 1024),
-//                     accept_unmasked_frames: false,
-//                 });
-//                 let max_redirects = 5;
-//                 match connect_with_proxy(request_url.clone(),
-//                                          proxy_address, websocket_config, max_redirects) {
-//                     Ok(ws) => {
-//                         self.proxy_subscription(ws.0, subscription.clone());
-//                     }
-//                     Err(err) => {
-//                         println!("Can't connect(无法连接): {}", err);
-//                     }
-//                 };
-//             } else {
-//                 match connect(request_url.clone()) {
-//                     Ok(ws) => {
-//                         self.subscription(ws.0, subscription.clone());
-//                     }
-//                     Err(err) => {
-//                         // 连接失败时执行的操作
-//                         println!("Can't connect(无法连接): {}", err);
-//                         // 返回一个默认的 WebSocket 对象或其他适当的值
-//                         // 或者根据需要触发 panic 或返回错误信息
-//                     }
-//                 };
-//             }
-//             println!("退出来咯")
-//         }
-//     }
-//
-//     //代理
-//     fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
-//     {
-//         /*****登陆***/
-//         let login_str = self.log_in_to_str();
-//         if login_str != "" {
-//             let _ = web_socket.write_message(Message::Text(login_str));
-//             thread::sleep(Duration::from_secs(3));
-//         }
-//         /*****订阅***/
-//         web_socket.write_message(Message::Text(subscription))
-//             .unwrap();
-//         /*****消息溜***/
-//         let mut stdout = io::stdout();
-//         loop {
-//             let msg = web_socket.read_message();
-//             match msg {
-//                 Ok(Message::Text(text)) => {
-//                     let res_data = Self::ok_text(text);
-//                     if res_data.code == "-200" {
-//                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
-//                     } else {
-//                         self.sender.send(res_data).unwrap();
-//                     }
-//                 }
-//                 Ok(Message::Ping(s)) => {
-//                     writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
-//                     let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
-//                     writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
-//                 }
-//                 Ok(Message::Pong(s)) => {
-//                     // println!("Pong-响应--{:?}", String::from_utf8(s));
-//                     writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
-//                 }
-//                 Ok(Message::Close(_)) => {
-//                     // println!("socket 关闭: ");
-//                     writeln!(stdout, "Close-响应").unwrap();
-//                 }
-//                 Err(error) => {
-//                     // println!("Error receiving message: {}", error);
-//                     writeln!(stdout, "Err-响应{}", error).unwrap();
-//                     break;
-//                 }
-//                 _ => {}
-//             }
-//         }
-//         web_socket.close(None).unwrap();
-//     }
-//
-//     //非代理
-//     fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
-//                     subscription: String)
-//     {
-//         /*****订阅***/
-//         web_socket.write_message(Message::Text(subscription))
-//             .unwrap();
-//         /*****消息溜***/
-//         let mut stdout = io::stdout();
-//         loop {
-//             let msg = web_socket.read_message();
-//             match msg {
-//                 Ok(Message::Text(text)) => {
-//                     let res_data = Self::ok_text(text);
-//                     if res_data.code == "-200" {
-//                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
-//                     } else {
-//                         self.sender.send(res_data).unwrap();
-//                     }
-//                 }
-//                 Ok(Message::Ping(s)) => {
-//                     println!("Ping-响应--{:?}", String::from_utf8(s));
-//                 }
-//                 Ok(Message::Pong(s)) => {
-//                     println!("Pong-响应--{:?}", String::from_utf8(s));
-//                 }
-//                 Ok(Message::Close(_)) => {
-//                     println!("socket 关闭: ");
-//                 }
-//                 Err(error) => {
-//                     println!("Error receiving message: {}", error);
-//                     break;
-//                 }
-//                 _ => {}
-//             }
-//         }
-//         web_socket.close(None).unwrap();
-//     }
-//
-//     //数据解析
-//     pub fn ok_text(text: String) -> ResponseData
-//     {
-//         let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
-//         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
-//         if json_value.get("event").is_some() {//订阅返回
-//             if json_value["event"].as_str() == Option::from("error") {
-//                 res_data.code = json_value["code"].to_string();
-//                 res_data.message = format!("错误:{}", json_value["msg"].to_string());
-//             } else if json_value["event"].as_str() == Option::from("subscribe") {
-//                 res_data.code = "-200".to_string();
-//                 res_data.data = text;
-//             } else {
-//                 res_data.data = text
-//             }
-//         } else {
-//             res_data.data = text
-//         }
-//         res_data
-//     }
-// }
+use std::collections::{BTreeMap};
+use std::{io};
+use std::io::{Write};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use tokio::sync::mpsc::Sender;
+use serde_json::{json, Value};
+use hex;
+use hmac::{Hmac, Mac, NewMac};
+use sha2::Sha512;
+use crate::{proxy};
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::{connect, Message, WebSocket};
+use tungstenite::protocol::WebSocketConfig;
+use url::Url;
+use crate::proxy::ParsingDetail;
+use crate::response_base::ResponseData;
+
+pub enum GateWsType {
+    //频道类型
+    PublicAndPrivate(String),
+}
+
+
+#[derive(Clone)]                        //订阅枚举
+pub enum GateSubscribeType {
+    PuFuturesOrderBook,
+    PuFuturesCandlesticks,
+    PuFuturesTrades,
+    //
+    PrFuturesOrders(String),
+    PrFuturesPositions(String),
+    PrFuturesBalances(String),
+
+}
+
+#[derive(Clone)]
+pub struct GateSwapWs {
+    pub label: String,
+    request_url: String,
+    //实际ws 链接地址
+    proxy: ParsingDetail,
+    //账号信息
+    login_param: BTreeMap<String, String>,
+    //kuconis特殊参数
+    symbol_s: Vec<String>,
+    //订阅币对
+    subscribe_types: Vec<GateSubscribeType>,
+    //订阅信息
+    sender: Sender<ResponseData>,     //数据通道
+}
+
+impl GateSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool,
+               login_param: BTreeMap<String, String>,
+               ws_type: GateWsType,
+               sender: Sender<ResponseData>,
+    ) -> GateSwapWs
+    {
+        return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type, sender);
+    }
+    pub fn new_label(label: String, is_colo: bool,
+                     login_param: BTreeMap<String, String>,
+                     ws_type: GateWsType,
+                     sender: Sender<ResponseData>,
+    ) -> GateSwapWs
+    {
+        if is_colo {
+            println!("支持高速通道-未配置")
+        } else {}
+
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+
+
+        let request_url = match ws_type {
+            GateWsType::PublicAndPrivate(name) => {
+                format!("wss://fx-ws.gateio.ws/v4/ws/{}", name.to_string())
+            }
+        };
+
+
+        /*****返回结构体*******/
+        GateSwapWs {
+            label,
+            request_url,
+            proxy: parsing_detail,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            sender,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //自定义
+    pub async fn custom_subscribe(&mut self, b_array: Vec<String>)
+    {
+        let mut symbol_s = b_array.clone();
+        for symbol in symbol_s.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "_");
+        }
+        self.symbol_s = symbol_s;
+        self.run().await;
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: GateSubscribeType, login_param: BTreeMap<String, String>) -> Value {
+        let time = chrono::Utc::now().timestamp();
+        let mut access_key = "".to_string();
+        if login_param.contains_key("access_key") {
+            access_key = login_param.get("access_key").unwrap().to_string();
+        }
+        let mut secret_key = "".to_string();
+        if login_param.contains_key("secret_key") {
+            secret_key = login_param.get("secret_key").unwrap().to_string();
+        }
+
+        match subscribe_type {
+            GateSubscribeType::PuFuturesOrderBook => {
+                json!({
+                    "time": time,
+                    "channel": "futures.order_book",
+                    "event": "subscribe",
+                    "payload":  [symbol, "20", "0"]
+                })
+            }
+            GateSubscribeType::PuFuturesCandlesticks => {
+                json!({
+                    "time": time,
+                    "channel": "futures.candlesticks",
+                    "event": "subscribe",
+                    "payload":  ["1m", symbol]
+                })
+            }
+            GateSubscribeType::PrFuturesOrders(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.orders",
+                    "event": "subscribe",
+                    "payload":  [user_id, symbol],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.orders".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSubscribeType::PrFuturesPositions(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.positions",
+                    "event": "subscribe",
+                    "payload":  [user_id, symbol],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.positions".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSubscribeType::PrFuturesBalances(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.balances",
+                    "event": "subscribe",
+                    "payload":  [user_id],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.balances".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSubscribeType::PuFuturesTrades => {
+                json!({
+                    "time": time,
+                    "channel": "futures.trades",
+                    "event": "subscribe",
+                    "payload":  [symbol]
+                })
+            }
+        }
+    }
+    //组装订阅数据
+    pub fn get_subscription(&self) -> Vec<Value> {
+        let mut args = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+        args
+    }
+
+    fn sign(secret_key: String, channel: String, event: String, time: String) -> String {
+        let message = format!("channel={}&event={}&time={}", channel, event, time);
+        let mut mac = Hmac::<Sha512>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(message.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let sign = hex::encode(result);
+        sign
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    async fn run(&mut self)
+    {
+        //订阅信息组装
+        let subscription = self.get_subscription();
+        loop {
+            println!("要连接咯~~!!{}", self.request_url);
+
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.proxy.ip_address.len() > 0 {
+                let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.proxy.port.parse().unwrap());
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                match connect_with_proxy(request_url.clone(),
+                                         proxy_address, websocket_config, max_redirects) {
+                    Ok(ws) => {
+                        self.proxy_subscription(ws.0, subscription.clone()).await;
+                    }
+                    Err(err) => {
+                        println!("Can't connect(无法连接): {}", err);
+                    }
+                };
+            } else {
+                match connect(request_url.clone()) {
+                    Ok(ws) => {
+                        self.subscription(ws.0, subscription.clone()).await;
+                    }
+                    Err(err) => {
+                        // 连接失败时执行的操作
+                        println!("Can't connect(无法连接): {}", err);
+                        // 返回一个默认的 WebSocket 对象或其他适当的值
+                        // 或者根据需要触发 panic 或返回错误信息
+                    }
+                };
+            }
+            println!("退出来咯")
+        }
+    }
+
+    //代理
+    async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: Vec<Value>)
+    {
+        let label = self.label.clone();
+        /*****订阅***/
+        for sub in &subscription {
+            println!("订阅信息:{}", sub.to_string());
+            web_socket.write_message(Message::Text(sub.to_string()))
+                .unwrap();
+        }
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    let res_data = Self::ok_text(label.to_string(), text);
+                    if res_data.code == "-200" {
+                        writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+                    } else {
+                        self.sender.send(res_data).await.unwrap();
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                    let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
+                }
+                Ok(Message::Pong(s)) => {
+                    // println!("Pong-响应--{:?}", String::from_utf8(s));
+                    writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                }
+                Ok(Message::Close(_)) => {
+                    // println!("socket 关闭: ");
+                    writeln!(stdout, "Close-响应").unwrap();
+                }
+                Err(error) => {
+                    // println!("Error receiving message: {}", error);
+                    writeln!(stdout, "Err-响应{}", error).unwrap();
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //非代理
+    async fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
+                    subscription: Vec<Value>)
+    {
+        let label = self.label.clone();
+        /*****订阅***/
+        for sub in &subscription {
+            web_socket.write_message(Message::Text(sub.to_string()))
+                .unwrap();
+        }
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    let res_data = Self::ok_text(label.to_string(), text);
+                    if res_data.code == "-200" {
+                        writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
+                    } else {
+                        self.sender.send(res_data).await.unwrap();
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    println!("Ping-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Pong(s)) => {
+                    println!("Pong-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Close(_)) => {
+                    println!("socket 关闭: ");
+                }
+                Err(error) => {
+                    println!("Error receiving message: {}", error);
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //数据解析
+    pub fn ok_text(label: String, text: String) -> ResponseData
+    {
+        // println!("原始数据:{}", text);
+        let mut res_data = ResponseData::new(label.to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        if json_value.get("error").is_some() {
+            let message = json_value["error"]["message"].as_str().unwrap().to_string();
+            let mes = message.trim_end_matches('\n');
+
+            // let mes_json_value: serde_json::Value = serde_json::from_str(mes).unwrap();
+            // // println!("错误信息:{}", mes_json_value.to_string());
+            res_data.code = json_value["error"]["code"].to_string();
+            res_data.message = mes.clone().to_string();
+        } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
+            res_data.code = "-200".to_string();
+            res_data.data = text;
+        } else {
+            res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
+            res_data.code = "200".to_string();
+            res_data.data = json_value["result"].to_string();
+        }
+        res_data
+    }
+}

+ 16 - 15
exchanges/src/kuconin_swap_ws.rs

@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashSet};
 use std::{io};
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::sync::{mpsc};
+use tokio::sync::mpsc::Sender;
 use serde_json::{json};
 use crate::{proxy};
 use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
@@ -13,6 +13,7 @@ use crate::kucoin_swap_rest::KucoinSwapRest;
 use crate::proxy::ParsingDetail;
 use crate::response_base::ResponseData;
 
+
 pub enum KuconinWsType {
     Public,
     Private,
@@ -53,7 +54,7 @@ pub struct KuconinSwapWs {
     //订阅币对
     subscribe_types: Vec<KuconinSubscribeType>,
     //订阅信息
-    sender: mpsc::Sender<ResponseData>,     //数据通道
+    sender: Sender<ResponseData>,     //数据通道
 }
 
 impl KuconinSwapWs {
@@ -63,14 +64,14 @@ impl KuconinSwapWs {
     pub async fn new(is_colo: bool,
                      login_param: BTreeMap<String, String>,
                      ws_type: KuconinWsType,
-                     sender: mpsc::Sender<ResponseData>,
+                     sender: Sender<ResponseData>,
     ) -> KuconinSwapWs {
         return KuconinSwapWs::new_label("default-KuconinSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
     }
     pub async fn new_label(label: String, is_colo: bool,
                            login_param: BTreeMap<String, String>,
                            ws_type: KuconinWsType,
-                           sender: mpsc::Sender<ResponseData>,
+                           sender: Sender<ResponseData>,
     ) -> KuconinSwapWs
     {
         if is_colo {
@@ -161,11 +162,11 @@ impl KuconinSwapWs {
         }
     }
     //自定义
-    pub fn custom_subscribe(&mut self, b_array: Vec<String>)
+    pub async fn custom_subscribe(&mut self, b_array: Vec<String>)
     {
         self.symbol_s = b_array.clone();
         self.request_url = format!("{}?token={}", self.ws_param.ws_url, self.ws_param.token);
-        self.run();
+        self.run().await;
     }
 
     /*******************************************************************************************************/
@@ -211,7 +212,7 @@ impl KuconinSwapWs {
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
     /*******************************************************************************************************/
-    fn run(&self)
+    async fn run(&self)
     {
         //订阅信息组装
         let subscription = self.get_subscription();
@@ -242,7 +243,7 @@ impl KuconinSwapWs {
                 let _ = match connect_with_proxy(request_url.clone(),
                                                  proxy_address, websocket_config, max_redirects) {
                     Ok(ws) => {
-                        self.proxy_subscription(ws.0, subscription.clone());
+                        self.proxy_subscription(ws.0, subscription.clone()).await;
                     }
                     Err(err) => {
                         println!("Can't connect(无法连接): {}", err);
@@ -251,7 +252,7 @@ impl KuconinSwapWs {
             } else {
                 let _ = match connect(request_url.clone()) {
                     Ok(ws) => {
-                        self.subscription(ws.0, subscription.clone());
+                        self.subscription(ws.0, subscription.clone()).await;
                     }
                     Err(err) => {
                         // 连接失败时执行的操作
@@ -266,8 +267,8 @@ impl KuconinSwapWs {
     }
 
     //代理
-    fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>,
-                          subscription: Vec<String>)
+    async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>,
+                                subscription: Vec<String>)
     {
         let label = self.label.clone();
         /*****消息溜***/
@@ -304,7 +305,7 @@ impl KuconinSwapWs {
                             break;
                         }
 
-                        self.sender.send(res_data).unwrap();
+                        self.sender.send(res_data).await.unwrap();
                     }
                 }
                 Ok(Message::Ping(s)) => {
@@ -333,8 +334,8 @@ impl KuconinSwapWs {
     }
 
     //非代理
-    fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
-                    subscription: Vec<String>)
+    async fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
+                          subscription: Vec<String>)
     {
         let label = self.label.clone();
         /*****消息溜***/
@@ -371,7 +372,7 @@ impl KuconinSwapWs {
                             break;
                         }
 
-                        self.sender.send(res_data).unwrap();
+                        self.sender.send(res_data).await.unwrap();
                     }
                 }
                 Ok(Message::Ping(s)) => {

+ 17 - 17
exchanges/src/okx_swap_ws.rs

@@ -2,11 +2,11 @@ use std::collections::{BTreeMap};
 use std::{io, thread};
 use std::io::{Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::sync::{mpsc};
 use std::time::Duration;
 use chrono::Utc;
 use serde_json::{json, Value};
 use ring::hmac;
+use tokio::sync::mpsc::Sender;
 use crate::{proxy};
 use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
 use tungstenite::{connect, Message, WebSocket};
@@ -45,25 +45,25 @@ pub struct OkxSwapWs {
     //订阅币对
     subscribe_types: Vec<OkxSubscribeType>,
     //订阅信息
-    sender: mpsc::Sender<ResponseData>,     //数据通道
+    sender: Sender<ResponseData>,     //数据通道
 }
 
 impl OkxSwapWs {
     /*******************************************************************************************************/
     /*****************************************获取一个对象****************************************************/
     /*******************************************************************************************************/
-    pub async fn new(is_colo: bool,
+    pub  fn new(is_colo: bool,
                      login_param: BTreeMap<String, String>,
                      ws_type: OkxWsType,
-                     sender: mpsc::Sender<ResponseData>,
+                     sender: Sender<ResponseData>,
     ) -> OkxSwapWs
     {
-        return OkxSwapWs::new_label("default-OkxSwapWs".to_string(), is_colo, login_param, ws_type, sender).await;
+        return OkxSwapWs::new_label("default-OkxSwapWs".to_string(), is_colo, login_param, ws_type, sender);
     }
-    pub async fn new_label(label: String, is_colo: bool,
+    pub  fn new_label(label: String, is_colo: bool,
                            login_param: BTreeMap<String, String>,
                            ws_type: OkxWsType,
-                           sender: mpsc::Sender<ResponseData>,
+                           sender: Sender<ResponseData>,
     ) -> OkxSwapWs
     {
         if is_colo {
@@ -106,7 +106,7 @@ impl OkxSwapWs {
         self.subscribe_types.extend(subscribe_types);
     }
     //自定义
-    pub fn custom_subscribe(&mut self, b_array: Vec<String>)
+    pub async fn custom_subscribe(&mut self, b_array: Vec<String>)
     {
         let mut symbol_s = b_array.clone();
         for symbol in symbol_s.iter_mut() {
@@ -116,7 +116,7 @@ impl OkxSwapWs {
             *symbol = symbol.replace("_", "-");
         }
         self.symbol_s = symbol_s;
-        self.run();
+        self.run().await;
     }
 
     /*******************************************************************************************************/
@@ -206,7 +206,7 @@ impl OkxSwapWs {
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
     /*******************************************************************************************************/
-    fn run(&mut self)
+    async fn run(&mut self)
     {
         //订阅信息组装
         let subscription = self.get_subscription();
@@ -233,7 +233,7 @@ impl OkxSwapWs {
                 match connect_with_proxy(request_url.clone(),
                                          proxy_address, websocket_config, max_redirects) {
                     Ok(ws) => {
-                        self.proxy_subscription(ws.0, subscription.clone());
+                        self.proxy_subscription(ws.0, subscription.clone()).await;
                     }
                     Err(err) => {
                         println!("Can't connect(无法连接): {}", err);
@@ -242,7 +242,7 @@ impl OkxSwapWs {
             } else {
                 match connect(request_url.clone()) {
                     Ok(ws) => {
-                        self.subscription(ws.0, subscription.clone());
+                        self.subscription(ws.0, subscription.clone()).await;
                     }
                     Err(err) => {
                         // 连接失败时执行的操作
@@ -257,7 +257,7 @@ impl OkxSwapWs {
     }
 
     //代理
-    fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
+    async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
     {
         let lable = self.label.clone();
         /*****登陆***/
@@ -279,7 +279,7 @@ impl OkxSwapWs {
                     if res_data.code == "-200" {
                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
                     } else {
-                        self.sender.send(res_data).unwrap();
+                        self.sender.send(res_data).await.unwrap();
                     }
                 }
                 Ok(Message::Ping(s)) => {
@@ -307,7 +307,7 @@ impl OkxSwapWs {
     }
 
     //非代理
-    fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
+    async fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
                     subscription: String)
     {
         let lable = self.label.clone();
@@ -320,11 +320,11 @@ impl OkxSwapWs {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    let res_data = Self::ok_text(lable.to_string(),text);
+                    let res_data = Self::ok_text(lable.to_string(), text);
                     if res_data.code == "-200" {
                         writeln!(stdout, "订阅成功:{:?}", res_data.data).unwrap();
                     } else {
-                        self.sender.send(res_data).unwrap();
+                        self.sender.send(res_data).await.unwrap();
                     }
                 }
                 Ok(Message::Ping(s)) => {

+ 109 - 48
exchanges/tests/test.rs

@@ -14,6 +14,10 @@ use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
 
 use std::io::{Read, Write};
 use std::net::{TcpStream, ToSocketAddrs};
+use std::time::Duration;
+use tokio::sync::mpsc::channel;
+use tokio::try_join;
+use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
 
 
 #[tokio::test]
@@ -27,12 +31,14 @@ async fn test_import() {
     // demo_get_http_proxy();
 
     //币安---深度socket-公共频道订阅
-    // demo_pub_ws_ba();
+    // demo_pub_ws_ba().await;
     // 币安-rest-获取账户信息
     // demo_rest_ba().await;
 
     //gate-rest -账户信息
     // demo_rest_gate().await;
+    //gate-ws-public-private频道
+    // demo_ws_gate().await;
 
 
     //kucoin_rest -账户信息
@@ -47,13 +53,45 @@ async fn test_import() {
     //okx - Business 频道
     // demo_ws_okx_bu().await;
     //okx - public 频道
-    // demo_ws_okx_pu().await;
+    demo_ws_okx_pu().await;
 
-    demo_so();
+
+    // demo_so();
+}
+
+async fn demo_ws_gate() {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), "4181c882718a95e72122ac1d52c88533".to_string());
+    btree_map.insert("secret_key".to_string(), "de82d1507b843ff08d81a0e9b878b721359f274937216b307834b570b676fa3c".to_string());
+    let (tx, mut rx) = channel(1024);
+
+    let mut gate_ws = GateSwapWs::new(false, btree_map,
+                                      GateWsType::PublicAndPrivate("usdt".to_string()),
+                                      tx);
+    gate_ws.set_subscribe(vec![
+        GateSubscribeType::PuFuturesOrderBook,
+        GateSubscribeType::PuFuturesCandlesticks,
+        // GateSubscribeType::PrFuturesOrders("".to_string()),
+        // GateSubscribeType::PrFuturesPositions("".to_string()),
+        GateSubscribeType::PrFuturesBalances("".to_string()),
+        // GateSubscribeType::PuFuturesTrades,
+    ]);
+    let t01 = tokio::spawn(async move {
+        gate_ws.custom_subscribe(vec!["BTC-USDT".to_string()]).await;
+    });
+    let t02 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
+        }
+    });
+    try_join!(t01,t02).unwrap();
 }
 
 
-  fn demo_so() {
+fn demo_so() {
     // 代理服务器地址和端口
     // let proxy_address = "127.0.0.1:7890";
     // // 目标服务器地址和端口
@@ -107,34 +145,40 @@ async fn test_import() {
 
 async fn demo_ws_okx_pu() {
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, rx) = mpsc::channel();
-    let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx).await;
+    let (tx, mut rx) = channel(1024);
+    let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx);
     ku_ws.set_subscribe(vec![OkxSubscribeType::PuIndexTickers]);
-    thread::spawn(move || {
-        ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]);
+    let t1 = tokio::spawn(async move {
+        ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
     });
-    let mut stdout = std::io::stdout();
-    loop {
-        if let Ok(received) = rx.try_recv() {
-            writeln!(stdout, "age: {:?}", received).unwrap();
+    let t2 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
         }
-    }
+    });
+    try_join!(t1,t2).unwrap();
 }
 
 async fn demo_ws_okx_bu() {
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, rx) = mpsc::channel();
-    let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Business, tx).await;
+    let (tx, mut rx) = channel(1024);
+    let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Business, tx);
     ku_ws.set_subscribe(vec![OkxSubscribeType::BuIndexCandle30m]);
-    thread::spawn(move || {
-        ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]);
+    let t1 = tokio::spawn(async move {
+        ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
     });
-    let mut stdout = std::io::stdout();
-    loop {
-        if let Ok(received) = rx.try_recv() {
-            writeln!(stdout, "age: {:?}", received).unwrap();
+    let t2 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
         }
-    }
+    });
+    try_join!(t1,t2).unwrap();
 }
 
 async fn demo_ws_kucoin_pr() {
@@ -143,35 +187,46 @@ async fn demo_ws_kucoin_pr() {
     btree_map.insert("secret_key".to_string(), "9c0df8b7-daaa-493e-a53a-82703067f7dd".to_string());
     btree_map.insert("pass_key".to_string(), "b87d055f".to_string());
     println!("----------------------btree_map{:?}", btree_map.clone());
-    let (tx, rx) = mpsc::channel();
+    let (tx, mut rx) = channel(1024);
     let mut ku_ws = KuconinSwapWs::new(false, btree_map.clone(),
                                        KuconinWsType::Private, tx).await;
     ku_ws.set_subscribe(vec![KuconinSubscribeType::PrContractMarketTradeOrdersSys]);
-    thread::spawn(move || {
-        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]);
+
+    let t1 = tokio::spawn(async move {
+        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
     });
-    let mut stdout = std::io::stdout();
-    loop {
-        if let Ok(received) = rx.try_recv() {
-            writeln!(stdout, "age: {:?}", received).unwrap();
+
+    let t2 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
         }
-    }
+    });
+
+    try_join!(t1,t2).unwrap();
 }
 
 async fn demo_ws_kucoin_pu() {
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, rx) = mpsc::channel();
+    let (tx, mut rx) = channel(1024);
     let mut ku_ws = KuconinSwapWs::new(false, btree_map, KuconinWsType::Public, tx).await;
     ku_ws.set_subscribe(vec![KuconinSubscribeType::PuContractMarketLevel2Depth50]);
-    thread::spawn(move || {
-        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]);
+
+    let t1 = tokio::spawn(async move {
+        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
     });
-    let mut stdout = std::io::stdout();
-    loop {
-        if let Ok(received) = rx.try_recv() {
-            writeln!(stdout, "age: {:?}", received).unwrap();
+    let t2 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
         }
-    }
+    });
+
+    try_join!(t1,t2).unwrap();
 }
 
 async fn demo_rest_kucoin() {
@@ -284,21 +339,27 @@ async fn demo_rest_ba() {
     println!("币安-rest-获取账户信息{:?}", res_data);
 }
 
-fn demo_pub_ws_ba() {
+async fn demo_pub_ws_ba() {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     btree_map.insert("lable".parse().unwrap(), "binance".parse().unwrap());//交易行名称
-    let (tx, rx) = mpsc::channel();
-    thread::spawn(move || {
-        let ba_exc = BinanceUsdtSwapWs::new(false, btree_map, tx);
-        ba_exc.custom_subscribe(vec![&"BTCUSDT", "ROSEUSDT"], 1, 0);
+    let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
+    let ba_exc = BinanceUsdtSwapWs::new(false, btree_map, tx);
+
+    let t1 = tokio::spawn(async move {
+        ba_exc.custom_subscribe(vec!["BTCUSDT".to_string(),
+                                     "ROSEUSDT".to_string()], 1, 1).await;
     });
 
-    let mut stdout = std::io::stdout();
-    loop {
-        if let Ok(received) = rx.try_recv() {
-            writeln!(stdout, "age: {:?}", received).unwrap();
+
+    let t2 = tokio::spawn(async move {
+        let mut stdout = std::io::stdout();
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                writeln!(stdout, "age: {:?}", received).unwrap();
+            }
         }
-    }
+    });
+    try_join!(t1,t2).unwrap();
 }
 
 fn demo_get_http_proxy() {