Przeglądaj źródła

币安ok 粗版本,待逻辑测试

skyfffire 1 rok temu
rodzic
commit
52ae36507a

+ 1 - 1
exchanges/src/binance_swap_ws.rs

@@ -411,7 +411,7 @@ impl BinanceSwapWs {
             }
         } else if json_value.get("e").is_some() {
             //推送私有数据
-            res_data.data = json_value["data"].clone();
+            res_data.data = json_value.clone();
             res_data.code = 200;
 
             let channel = json_value["e"].as_str().unwrap();

+ 91 - 72
exchanges/tests/binance_swap_test.rs

@@ -1,7 +1,7 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-
+use std::time::Duration;
 use serde_json::json;
 use tokio::sync::Mutex;
 use tracing::trace;
@@ -20,86 +20,105 @@ async fn ws_custom_subscribe() {
     global::log_utils::init_log_with_trace();
 
 
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
-
-    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
-    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+    let tcc = tokio::spawn(async move {
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
+        // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+        // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
 
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
-    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
 
-    //读取
-    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
-    let _tr = tokio::spawn(async move {
-        trace!("线程-数据读取-开启");
-        loop {
-            // 从通道中接收并丢弃所有的消息,直到通道为空
-            while let Ok(Some(_)) = read_rx.try_next() {
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let is_shutdown_arc = Arc::new(AtomicBool::new(true));
 
+        //读取
+        let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+        let _tr = tokio::spawn(async move {
+            trace!("线程-数据读取-开启");
+            loop {
                 // 从通道中接收并丢弃所有的消息,直到通道为空
                 while let Ok(Some(_)) = read_rx.try_next() {
-                    // 消息被忽略
+
+                    // 从通道中接收并丢弃所有的消息,直到通道为空
+                    while let Ok(Some(_)) = read_rx.try_next() {
+                        // 消息被忽略
+                    }
                 }
             }
-        }
-        // trace!("线程-数据读取-结束");
+            // trace!("线程-数据读取-结束");
+        });
+
+        //写数据
+        // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+        // let write_tx_clone = Arc::clone(&write_tx_am);
+        // let su = ws.get_subscription();
+        // let tw = tokio::spawn(async move {
+        //     trace!("线程-数据写入-开始");
+        //     loop {
+        //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+        //         // let close_frame = CloseFrame {
+        //         //     code: CloseCode::Normal,
+        //         //     reason: Cow::Borrowed("Bye bye"),
+        //         // };
+        //         // let message = Message::Close(Some(close_frame));
+        //
+        //
+        //         let message = Message::Text(su.clone());
+        //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+        //         trace!("发送指令成功");
+        //     }
+        //     trace!("线程-数据写入-结束");
+        // });
+
+        let fun = move |data: ResponseData| {
+            async move {
+                trace!("---传入的方法~~~~{:?}", data);
+            }
+        };
+        let param = BinanceSwapLogin {
+            api_key: ACCESS_KEY.to_string(),
+            api_secret: SECRET_KEY.to_string(),
+        };
+        let t1 = tokio::spawn(async move {
+            let mut ws = get_ws(Option::from(param), BinanceSwapWsType::Private).await;
+            ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
+            ws.set_subscribe(vec![
+                // BinanceSwapSubscribeType::PuBookTicker,
+                // BinanceSwapSubscribeType::PuAggTrade,
+                // BinanceSwapSubscribeType::PuDepth20levels100ms,
+
+                BinanceSwapSubscribeType::PrAccount,
+                BinanceSwapSubscribeType::PrBalance,
+                BinanceSwapSubscribeType::PrPosition
+            ]);
+            //链接
+            let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+            ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            trace!("test 唯一线程结束--");
+        });
+        tokio::try_join!(t1).unwrap();
+        trace!("当此结束");
+        trace!("重启!");
+        trace!("参考交易所关闭");
+        return;
     });
 
-    //写数据
-    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
-    // let write_tx_clone = Arc::clone(&write_tx_am);
-    // let su = ws.get_subscription();
-    // let tw = tokio::spawn(async move {
-    //     trace!("线程-数据写入-开始");
-    //     loop {
-    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-    //         // let close_frame = CloseFrame {
-    //         //     code: CloseCode::Normal,
-    //         //     reason: Cow::Borrowed("Bye bye"),
-    //         // };
-    //         // let message = Message::Close(Some(close_frame));
-    //
-    //
-    //         let message = Message::Text(su.clone());
-    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-    //         trace!("发送指令成功");
-    //     }
-    //     trace!("线程-数据写入-结束");
-    // });
-
-    let fun = move |data: ResponseData| {
-        async move {
-            trace!("---传入的方法~~~~{:?}", data);
-        }
-    };
-    let param = BinanceSwapLogin {
-        api_key: ACCESS_KEY.to_string(),
-        api_secret: SECRET_KEY.to_string(),
-    };
-    let t1 = tokio::spawn(async move {
-        let mut ws = get_ws(Option::from(param), BinanceSwapWsType::Public).await;
-        ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
-        ws.set_subscribe(vec![
-            BinanceSwapSubscribeType::PuBookTicker,
-            // BinanceSwapSubscribeType::PuAggTrade,
-            // BinanceSwapSubscribeType::PuDepth20levels100ms,
-
-            // BinanceSwapSubscribeType::PrAccount,
-            // BinanceSwapSubscribeType::PrBalance,
-            // BinanceSwapSubscribeType::PrPosition
-        ]);
-        //链接
-        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        trace!("test 唯一线程结束--");
-    });
-    tokio::try_join!(t1).unwrap();
-    trace!("当此结束");
-    trace!("重启!");
-    trace!("参考交易所关闭");
-    return;
+
+    tokio::time::sleep(Duration::from_millis(10 * 1000)).await;
+    let mut rest = get_rest();
+    let ttt = chrono::Utc::now().timestamp_millis() + 6000000;
+    let rep_data = rest.swap_order(json!({
+          "symbol":"CFXUSDT",
+          "side":"BUY",
+          "positionSide":"LONG",
+          "type":"LIMIT",
+          "quantity":50,
+          "price":0.11,
+          "timeInForce":"GTD",
+         "goodtilldate":ttt
+    })).await;
+    trace!(?rep_data);
+    trace!("1111111111");
 }
 
 //rest-获取服务器时间
@@ -251,7 +270,7 @@ async fn rest_swap_order_test() {
     global::log_utils::init_log_with_trace();
 
     let mut rest = get_rest();
-    let ttt =  chrono::Utc::now().timestamp_millis()+6000000;
+    let ttt = chrono::Utc::now().timestamp_millis() + 6000000;
     let rep_data = rest.swap_order(json!({
           "symbol":"CFXUSDT",
           "side":"BUY",