Browse Source

测试 示例

875428575@qq.com 2 years ago
parent
commit
f5c551a307
1 changed files with 242 additions and 93 deletions
  1. 242 93
      exchanges/tests/test.rs

+ 242 - 93
exchanges/tests/test.rs

@@ -1,27 +1,22 @@
-use std::{fs, thread};
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_rest::GateSwapRest;
 use std::collections::BTreeMap;
 use std::collections::BTreeMap;
-use std::io::{BufRead, BufReader};
-use std::net::{TcpListener};
-use std::sync::{mpsc};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use exchanges::binance_usdt_swap_rest::BinanceUsdtSwapRest;
+use tokio::io::{AsyncReadExt};
 use exchanges::kucoin_swap_rest::KucoinSwapRest;
 use exchanges::kucoin_swap_rest::KucoinSwapRest;
 use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
 use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
 use exchanges::{proxy};
 use exchanges::{proxy};
-use exchanges::binance_usdt_swap_ws::BinanceUsdtSwapWs;
 use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
 use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
 
 
 use std::io::{Read, Write};
 use std::io::{Read, Write};
-use std::net::{TcpStream, ToSocketAddrs};
-use std::time::Duration;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use tokio::sync::mpsc::channel;
 use tokio::sync::mpsc::channel;
 use tokio::try_join;
 use tokio::try_join;
-use tracing::{debug, trace};
-use tracing::field::debug;
+use tracing::{trace};
+use tracing::instrument::WithSubscriber;
+use tungstenite::{connect, Message};
 use exchanges::binance_swap_rest::BinanceSwapRest;
 use exchanges::binance_swap_rest::BinanceSwapRest;
-use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
-
+use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::okx_swap_rest::OkxSwapRest;
 
 
 #[tokio::test]
 #[tokio::test]
 async fn test_import() {
 async fn test_import() {
@@ -42,15 +37,13 @@ async fn test_import() {
     // demo_rest_ba().await;
     // demo_rest_ba().await;
 
 
     //gate-rest -账户信息
     //gate-rest -账户信息
-    // demo_rest_gate().await;
+    demo_rest_gate().await;
     //gate-ws-public-private频道
     //gate-ws-public-private频道
     // demo_ws_gate().await;
     // demo_ws_gate().await;
 
 
 
 
     //kucoin_rest -账户信息
     //kucoin_rest -账户信息
-    demo_rest_kucoin().await;
-
-
+    // demo_rest_kucoin().await;
     //Kucoin-ws--公共频道
     //Kucoin-ws--公共频道
     // demo_ws_kucoin_pu().await;
     // demo_ws_kucoin_pu().await;
     //Kucoin-ws--私有频道
     //Kucoin-ws--私有频道
@@ -60,9 +53,134 @@ async fn test_import() {
     // demo_ws_okx_bu().await;
     // demo_ws_okx_bu().await;
     //okx - public 频道
     //okx - public 频道
     // demo_ws_okx_pu().await;
     // demo_ws_okx_pu().await;
-
+    //okx - rest 频道
+    // demo_okx_rest().await;
 
 
     // demo_so();
     // demo_so();
+
+
+    // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
+    //                                   KucoinWsType::Private, tx).await;
+    // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
+    //
+    // let t1 = tokio::spawn(async move {
+    //     ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
+    // });
+    //
+    // let t2 = tokio::spawn(async move {
+    //     let mut stdout = std::io::stdout();
+    //     loop {
+    //         if let Ok(received) = rx.try_recv() {
+    //             writeln!(stdout, "age: {:?}", received).unwrap();
+    //         }
+    //     }
+    // let t01 = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(2)).await;
+    //         trace!( "发送-指令: ");
+    //     }
+    // });
+    // let t02 = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(3)).await;
+    //         trace!( "接收 -指令: ");
+    //     }
+    // });
+    // try_join!(t01,t02).unwrap();
+
+
+    // let (mut tx_end, mut rx_end) = channel::<String>(1024);
+    // let (mut tx_read, mut rx_read) = channel::<String>(1024);
+    // let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await.unwrap();
+    // stream.write_all( b"Hello, server!").await.unwrap();
+    // stream.flush().await.unwrap();
+    // let mutex_stream = Arc::new(Mutex::new(stream));
+    //
+    // //捕捉用户的主动发送的订阅指令
+    // let stream_clone = Arc::clone(&mutex_stream);
+    // let t_1 = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(1)).await;
+    //         if let Ok(received) = rx_end.try_recv() {
+    //             trace!("动态订阅内容: {:?}", received);
+    //             let mut stream_lock = stream_clone.lock().await;
+    //             // stream_lock.write_all( b"1111!").await.unwrap();
+    //             // stream_lock.flush().await.unwrap();
+    //
+    //             // stream_lock.write_all(b"Hello, server!").await.unwrap();
+    //         }
+    //     }
+    // });
+    //
+    //
+    // //socket数据获取,装入回显通道
+    // let stream_clone = Arc::clone(&mutex_stream);
+    // let t_2 = tokio::spawn(async move {
+    //     // 创建一个用于存储服务器响应的缓冲区
+    //     let mut buffer = [0; 512];
+    //     loop {
+    //         let mut stream_lock = stream_clone.lock().await;
+    //         tokio::time::sleep(Duration::from_secs(1)).await;
+    //
+    //         let _ = match stream_lock.read(&mut buffer).await {
+    //             Ok(n) => {
+    //                 tokio::time::sleep(Duration::from_secs(1)).await;
+    //                 if n == 0 {
+    //                     // 没有读取到数据
+    //                     trace!("没有数据,主动发送");
+    //                 } else {
+    //                     // 打印服务器的响应
+    //                     trace!("有数据: {}", String::from_utf8_lossy(&buffer[..n]));
+    //                     tx_read.send(format!("{}", String::from_utf8_lossy(&buffer[..n]))).await.unwrap()
+    //                 }
+    //             }
+    //             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+    //                 // 如果读取操作会阻塞,则等待一会儿再试
+    //                 trace!("Would block, sleeping会阻碍睡眠??");
+    //                 tokio::time::sleep(Duration::from_secs(3)).await;
+    //                 continue;
+    //             }
+    //             Err(e) => {
+    //                 trace!("Err:{:?}",e);
+    //                 break;
+    //             }
+    //         };
+    //     }
+    // });
+    //
+    //
+    // //socket 数据回显
+    // let t02 = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(1)).await;
+    //         // tx.send("hai!!!".to_string()).await.unwrap();
+    //         if let Ok(received) = rx_read.try_recv() {
+    //             trace!("拿到 socket 的数据: {:?}", received);
+    //         }
+    //     }
+    // });
+    //
+    // //模拟用户动态发送
+    // let t03 = tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(1)).await;
+    //         tx_end.send("这里是 动态订阅".to_string()).await.unwrap();
+    //     }
+    // });
+    //
+    // try_join!(t_1,t_2,t02,t03).unwrap();
+}
+
+
+async fn demo_okx_rest() {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), "".to_string());
+    btree_map.insert("secret_key".to_string(), "".to_string());
+    btree_map.insert("pass_key".to_string(), "".to_string());
+
+    let mut okx_rest = OkxSwapRest::new(false, btree_map);
+    let res_data = okx_rest.get_account("BTC".to_string()).await;
+    trace!("okx_rest-rest - get_account- {:?}", res_data);
 }
 }
 
 
 async fn demo_ws_gate() {
 async fn demo_ws_gate() {
@@ -123,6 +241,7 @@ fn demo_so() {
 
 
 
 
 async fn demo_ws_okx_pu() {
 async fn demo_ws_okx_pu() {
+
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let (tx, mut rx) = channel(1024);
     let (tx, mut rx) = channel(1024);
     let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx);
     let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx);
@@ -161,6 +280,7 @@ async fn demo_ws_okx_bu() {
 }
 }
 
 
 async fn demo_ws_kucoin_pr() {
 async fn demo_ws_kucoin_pr() {
+    let mut bool_v1 = Arc::new(AtomicBool::new(true));
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     btree_map.insert("access_key".to_string(), "".to_string());
     btree_map.insert("access_key".to_string(), "".to_string());
     btree_map.insert("secret_key".to_string(), "".to_string());
     btree_map.insert("secret_key".to_string(), "".to_string());
@@ -168,18 +288,17 @@ async fn demo_ws_kucoin_pr() {
     trace!("----------------------btree_map{:?}", btree_map.clone());
     trace!("----------------------btree_map{:?}", btree_map.clone());
     let (tx, mut rx) = channel(1024);
     let (tx, mut rx) = channel(1024);
     let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
     let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
-                                       KucoinWsType::Private, tx).await;
+                                      KucoinWsType::Private, tx).await;
     ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
     ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
 
 
     let t1 = tokio::spawn(async move {
     let t1 = tokio::spawn(async move {
-        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
+        ku_ws.custom_subscribe(bool_v1,vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
     });
     });
 
 
     let t2 = tokio::spawn(async move {
     let t2 = tokio::spawn(async move {
-        let mut stdout = std::io::stdout();
         loop {
         loop {
             if let Ok(received) = rx.try_recv() {
             if let Ok(received) = rx.try_recv() {
-                writeln!(stdout, "age: {:?}", received).unwrap();
+                trace!( "age: {:?}", received);
             }
             }
         }
         }
     });
     });
@@ -188,19 +307,22 @@ async fn demo_ws_kucoin_pr() {
 }
 }
 
 
 async fn demo_ws_kucoin_pu() {
 async fn demo_ws_kucoin_pu() {
+    let mut bool_v1 = Arc::new(AtomicBool::new(true));
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let btree_map: BTreeMap<String, String> = BTreeMap::new();
     let (tx, mut rx) = channel(1024);
     let (tx, mut rx) = channel(1024);
     let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
     let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
-    ku_ws.set_subscribe(vec![KucoinSubscribeType::PuContractMarketLevel2Depth50]);
+    ku_ws.set_subscribe(vec![
+        KucoinSubscribeType::PuContractMarketLevel2Depth50,
+        KucoinSubscribeType::PuContractMarkettickerV2,
+    ]);
 
 
     let t1 = tokio::spawn(async move {
     let t1 = tokio::spawn(async move {
-        ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
+        ku_ws.custom_subscribe(bool_v1,vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
     });
     });
     let t2 = tokio::spawn(async move {
     let t2 = tokio::spawn(async move {
-        let mut stdout = std::io::stdout();
         loop {
         loop {
             if let Ok(received) = rx.try_recv() {
             if let Ok(received) = rx.try_recv() {
-                writeln!(stdout, "age: {:?}", received).unwrap();
+                trace!("age: {:?}", received);
             }
             }
         }
         }
     });
     });
@@ -215,20 +337,21 @@ async fn demo_rest_kucoin() {
     btree_map.insert("pass_key".to_string(), "".to_string());
     btree_map.insert("pass_key".to_string(), "".to_string());
 
 
     let mut kucoin_exc = KucoinSwapRest::new(false, btree_map);
     let mut kucoin_exc = KucoinSwapRest::new(false, btree_map);
-    let res_data = kucoin_exc.get_server_time().await;
-    trace!("kucoin_exc-rest - get_server_time- {:?}", res_data);
-    trace!("kucoin_exc-rest-get_delays{:?}", kucoin_exc.get_delays() );
-    trace!("kucoin_exc-rest -get_avg_delay{:?}", kucoin_exc.get_avg_delay());
+    // let res_data = kucoin_exc.get_server_time().await;
+    // trace!("kucoin_exc-rest - get_server_time- {:?}", res_data);
+    // trace!("kucoin_exc-rest-get_delays{:?}", kucoin_exc.get_delays() );
+    // trace!("kucoin_exc-rest -get_avg_delay{:?}", kucoin_exc.get_avg_delay());
     // let res_data = kucoin_exc.get_account("USDT".to_string()).await;
     // let res_data = kucoin_exc.get_account("USDT".to_string()).await;
     // trace!("kucoin_exc-rest - get_account- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_account- {:?}", res_data);
-    // let res_data = kucoin_exc.get_position("XBTUSDM".to_string()).await;
+    // let res_data = kucoin_exc.get_position("XBT1USDM".to_string()).await;
     // trace!("kucoin_exc-rest - get_position- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_position- {:?}", res_data);
     // let res_data = kucoin_exc.get_market_details().await;
     // let res_data = kucoin_exc.get_market_details().await;
     // trace!("kucoin_exc-rest - get_market_details- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_market_details- {:?}", res_data);
     // let res_data = kucoin_exc.get_ticker("ROSEUSDTM".to_string()).await;
     // let res_data = kucoin_exc.get_ticker("ROSEUSDTM".to_string()).await;
     // trace!("kucoin_exc-rest - get_ticker- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_ticker- {:?}", res_data);
-    // let res_data = kucoin_exc.get_orders("active".to_string(), "ROSEUSDTM".to_string()).await;
-    // trace!("kucoin_exc-rest - get_orders- {:?}", res_data);
+    let res_data = kucoin_exc.get_orders("active".to_string(),
+                                         "ROSEUSDTM".to_string()).await;
+    trace!("kucoin_exc-rest - get_orders- {:?}", res_data);
     // let res_data = kucoin_exc.get_positions("USDT".to_string()).await;
     // let res_data = kucoin_exc.get_positions("USDT".to_string()).await;
     // trace!("kucoin_exc-rest - get_positions- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_positions- {:?}", res_data);
     // let res_data = kucoin_exc.get_orders_details("111".to_string(), "".to_string()).await;
     // let res_data = kucoin_exc.get_orders_details("111".to_string(), "".to_string()).await;
@@ -243,21 +366,19 @@ async fn demo_rest_kucoin() {
     //     "limit".to_string(),
     //     "limit".to_string(),
     // ).await;
     // ).await;
     // trace!("kucoin_exc-rest - swap_bazaar_order- {:?}
     // trace!("kucoin_exc-rest - swap_bazaar_order- {:?}
-    // let res_data = kucoin_exc.cancel_order("12312".to_string(), "".to_string()).await;
+    // let res_data = kucoin_exc.cancel_order("".to_string(), "999999".to_string()).await;
     // trace!("kucoin_exc-rest - cancel_order- {:?}", res_data);
     // trace!("kucoin_exc-rest - cancel_order- {:?}", res_data);
-
     // let res_data = kucoin_exc.get_public_token().await;
     // let res_data = kucoin_exc.get_public_token().await;
     // trace!("kucoin_exc-rest - get_public_token- {:?}", res_data);
     // trace!("kucoin_exc-rest - get_public_token- {:?}", res_data);
 }
 }
 
 
 async fn demo_rest_gate() {
 async fn demo_rest_gate() {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
-    btree_map.insert("access_key".to_string(), "4181c882718a95e72122ac1d52c88533".to_string());
-    btree_map.insert("secret_key".to_string(), "de82d1507b843ff08d81a0e9b878b721359f274937216b307834b570b676fa3c".to_string());
+    btree_map.insert("access_key".to_string(), "".to_string());
+    btree_map.insert("secret_key".to_string(), "".to_string());
 
 
     let mut gate_exc = GateSwapRest::new(false, btree_map);
     let mut gate_exc = GateSwapRest::new(false, btree_map);
     let res_data = gate_exc.get_account("usdt".to_string()).await;
     let res_data = gate_exc.get_account("usdt".to_string()).await;
-
     trace!("gate-rest -账户信息{:?}", res_data);
     trace!("gate-rest -账户信息{:?}", res_data);
     trace!("gate-rest -get_delays{:?}", gate_exc.get_delays() );
     trace!("gate-rest -get_delays{:?}", gate_exc.get_delays() );
     trace!("gate-rest -get_avg_delay{:?}", gate_exc.get_avg_delay());
     trace!("gate-rest -get_avg_delay{:?}", gate_exc.get_avg_delay());
@@ -271,7 +392,7 @@ async fn demo_rest_gate() {
     // trace!("gate-rest -get_server_time{:?}", res_data);
     // trace!("gate-rest -get_server_time{:?}", res_data);
     // let res_data = gate_exc.get_order_details("usdt".to_string(), "11335522".to_string()).await;
     // let res_data = gate_exc.get_order_details("usdt".to_string(), "11335522".to_string()).await;
     // trace!("gate-rest -get_order_details{:?}", res_data);
     // trace!("gate-rest -get_order_details{:?}", res_data);
-    // let res_data = gate_exc.get_orders("usdt".to_string(), "open".to_string()).await;
+    // let res_data = gate_exc.get_orders("usd1t".to_string(), "open".to_string()).await;
     // trace!("gate-rest -get_orders{:?}", res_data);
     // trace!("gate-rest -get_orders{:?}", res_data);
     // let params = serde_json::json!({
     // let params = serde_json::json!({
     //         "contract":"CYBER_USDT",
     //         "contract":"CYBER_USDT",
@@ -311,41 +432,69 @@ async fn demo_rest_gate() {
     //     "t-my-custom-id-001".to_string(),
     //     "t-my-custom-id-001".to_string(),
     // ).await;
     // ).await;
     // trace!("gate-rest -order{:?}", res_data);
     // trace!("gate-rest -order{:?}", res_data);
+
+
+    let res_data = gate_exc.my_trades("usdt".to_string()).await;
+    trace!("gate-rest -my_trades{:?}", res_data);
+    let res_data = gate_exc.account_book("usdt".to_string()).await;
+    trace!("gate-rest -account_book{:?}", res_data);
+
 }
 }
 
 
 async fn demo_rest_ba() {
 async fn demo_rest_ba() {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
-    btree_map.insert("access_key".to_string(), "Z2KWEFuiTR26d0OrRh0UVnALiKwftEGKdaz6AHU7xAqZqkKwhb8OPWlG5uAGqGBI".to_string());
-    btree_map.insert("secret_key".to_string(), "w3HvCw17OHEDjpMwlfywtl3tLAfSOql81nXgLk4HTA2yQ4Qph0ilPkKiLgOOY7tQ".to_string());
+    btree_map.insert("access_key".to_string(), "".to_string());
+    btree_map.insert("secret_key".to_string(), "".to_string());
 
 
     // let ba_exc = BinanceUsdtSwapRest::new(false, btree_map);
     // let ba_exc = BinanceUsdtSwapRest::new(false, btree_map);
     // let res_data = ba_exc.get_account().await;
     // let res_data = ba_exc.get_account().await;
     let mut ba_exc = BinanceSwapRest::new(false, btree_map);
     let mut ba_exc = BinanceSwapRest::new(false, btree_map);
-    let res_data = ba_exc.get_account().await;
+    // let res_data = ba_exc.get_server_time().await;
+    // trace!("币安-rest - get_server_time{:?}", res_data);
+    // let res_data = ba_exc.get_exchange_info().await;
+    // trace!("币安-restget_ get_exchange_info{:?}", res_data);
 
 
 
 
+    let res_data = ba_exc.get_account().await;
     trace!("币安-rest-获取账户信息{:?}", res_data);
     trace!("币安-rest-获取账户信息{:?}", res_data);
-    trace!("币安-rest- -get_delays{:?}", ba_exc.get_delays() );
-    trace!("币安-rest--get_avg_delay{:?}", ba_exc.get_avg_delay());
+    // trace!("币安-rest- -get_delays{:?}", ba_exc.get_delays() );
+    // trace!("币安-rest--get_avg_delay{:?}", ba_exc.get_avg_delay());
+
+    // let res_data = ba_exc.get_order("BTCUSDT".to_string(), 131, "".to_string()).await;
+    // trace!("币安-rest--get_order{:?}", res_data);
+    // let res_data = ba_exc.get_open_orders("BTCUSDT".to_string()).await;
+    // trace!("币安-rest--get_open_orders{:?}", res_data);
+    // let timestamp_start = chrono::Utc::now().timestamp_millis() - 60 * 100 * 1000;
+    // let timestamp_end = chrono::Utc::now().timestamp_millis() +  60 * 100 * 1000;
+    // let res_data = ba_exc.get_all_orders("BTCUSDT".to_string(), 1000, timestamp_start, timestamp_end).await;
+    // trace!("币安-rest--get_all_orders{:?}", res_data);
+    // let res_data = ba_exc.get_book_ticker("BTCUSDT".to_string()).await;
+    // trace!("币安-rest--get_book_ticker{:?}", res_data);
+    // let res_data = ba_exc.get_position_risk("BTCUS1DT".to_string()).await;
+    // trace!("币安-rest--get_position_risk{:?}", res_data);
+    // let res_data = ba_exc.change_pos_side(true).await;
+    // trace!("币安-rest--change_pos_side{:?}", res_data);
+    // let res_data = ba_exc.cancel_order("BTCUSDT".to_string(), 3312331, "".to_string()).await;
+    // trace!("币安-rest--cancel_order{:?}", res_data);
 }
 }
 
 
 async fn demo_pub_ws_ba() {
 async fn demo_pub_ws_ba() {
-    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
-    btree_map.insert("lable".parse().unwrap(), "binance".parse().unwrap());//交易行名称
-    let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
-    let ba_exc = BinanceUsdtSwapWs::new(false, btree_map, tx);
-
+    let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    let btree_map: BTreeMap<String, String> = BTreeMap::new();
+    let (tx, mut rx) = channel(1024);
+    let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx);
+    binance_ws.set_subscribe(vec![
+        BinanceSubscribeType::PuAggTrade,
+        BinanceSubscribeType::PuDepth20levels100ms,
+        BinanceSubscribeType::PuBookTicker,
+    ]);
     let t1 = tokio::spawn(async move {
     let t1 = tokio::spawn(async move {
-        ba_exc.custom_subscribe(vec!["BTCUSDT".to_string(),
-                                     "ROSEUSDT".to_string()], 1, 1).await;
+        binance_ws.custom_subscribe(bool_v1,vec!["BTCUSDT".to_string()]).await;
     });
     });
-
-
     let t2 = tokio::spawn(async move {
     let t2 = tokio::spawn(async move {
-        let mut stdout = std::io::stdout();
         loop {
         loop {
             if let Ok(received) = rx.try_recv() {
             if let Ok(received) = rx.try_recv() {
-                writeln!(stdout, "age: {:?}", received).unwrap();
+                trace!( "age: {:?}", received);
             }
             }
         }
         }
     });
     });
@@ -359,41 +508,41 @@ fn demo_get_http_proxy() {
 }
 }
 
 
 
 
-/*********************web服务*/
-fn demo_http() {
-    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
-    for stream in listener.incoming() {
-        let stream = stream.unwrap();
-
-        handle_connection(TcpStream::try_from(stream).unwrap());
-    }
-}
-
-
-fn handle_connection(mut stream: TcpStream) {
-    let buf_reader = BufReader::new(&mut stream);
-    let http_request: Vec<_> = buf_reader
-        .lines()
-        .map(|result| result.unwrap())
-        .take_while(|line| !line.is_empty())
-        .collect();
-    trace!("Request: {:#?}", http_request);
-    trace!("Request2: {:#?}", http_request[0]);
-    trace!("Request3: {:#?}", http_request[1]);
-
-    let (status_line, filename) = if http_request[0] == "GET / HTTP/1.1" {
-        ("HTTP/1.1 200 OK", "hello.html")
-    } else {
-        ("HTTP/1.1 404 NOT FOUND", "404.html")
-    };
-
-    let status_line = "HTTP/1.1 200 OK";
-    let contents = fs::read_to_string("src/404.html").unwrap();
-    let length = contents.len();
-
-    let response =
-        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
-    // let response = "HTTP/1.1 200 OK\r\n\r\nccccc";
-
-    stream.write_all(response.as_bytes()).unwrap();
-}
+// /*********************web服务*/
+// fn demo_http() {
+//     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
+//     for stream in listener.incoming() {
+//         let stream = stream.unwrap();
+//
+//         handle_connection(TcpStream::try_from(stream).unwrap());
+//     }
+// }
+//
+//
+// fn handle_connection(mut stream: TcpStream) {
+//     let buf_reader = BufReader::new(&mut stream);
+//     let http_request: Vec<_> = buf_reader
+//         .lines()
+//         .map(|result| result.unwrap())
+//         .take_while(|line| !line.is_empty())
+//         .collect();
+//     trace!("Request: {:#?}", http_request);
+//     trace!("Request2: {:#?}", http_request[0]);
+//     trace!("Request3: {:#?}", http_request[1]);
+//
+//     let (status_line, filename) = if http_request[0] == "GET / HTTP/1.1" {
+//         ("HTTP/1.1 200 OK", "hello.html")
+//     } else {
+//         ("HTTP/1.1 404 NOT FOUND", "404.html")
+//     };
+//
+//     let status_line = "HTTP/1.1 200 OK";
+//     let contents = fs::read_to_string("src/404.html").unwrap();
+//     let length = contents.len();
+//
+//     let response =
+//         format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
+//     // let response = "HTTP/1.1 200 OK\r\n\r\nccccc";
+//
+//     stream.write_all(response.as_bytes()).unwrap();
+// }