Ver Fonte

加入kucoin,可以与py进行完整对比测试。

skyfffire há 2 anos atrás
pai
commit
51b6a30f39
3 ficheiros alterados com 78 adições e 43 exclusões
  1. 2 1
      strategy/Cargo.toml
  2. 1 1
      strategy/src/binance_usdt_swap.rs
  3. 75 41
      strategy/src/kucoin_swap.rs

+ 2 - 1
strategy/Cargo.toml

@@ -21,4 +21,5 @@ global = { path = "../global" }
 exchanges = { path = "../exchanges" }
 
 futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
-futures-channel = "0.3.28"
+futures-channel = "0.3.28"
+tokio-tungstenite= { git = "https://github.com/HonestHouLiang/tokio-tungstenite.git",rev = "208fc9b09bcc2e2c8cb52e1cde5087446464fc91"  }

+ 1 - 1
strategy/src/binance_usdt_swap.rs

@@ -49,7 +49,7 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_a
         });
 
         // 链接
-        ws.ws_connect_async(write_rx, read_tx).await.expect("链接失败");
+        ws.ws_connect_async(bool_v1, write_rx, &read_tx).await.expect("链接失败");
     });
 }
 

+ 75 - 41
strategy/src/kucoin_swap.rs

@@ -1,53 +1,56 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
+use futures_util::StreamExt;
+
 use rust_decimal::Decimal;
+use tokio_tungstenite::tungstenite::Message;
 use tokio::spawn;
-use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
-use tracing::{error};
-use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
+use tracing::info;
+
+use exchanges::kucoin_swap_ws_async::{KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::KucoinSwap;
+
 use crate::model::{OrderInfo, OriginalTradeGa};
 use crate::quant::Quant;
 
 // 1交易、0参考 kucoin 合约 启动
 pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
-    let (tx, mut rx) = channel(100);
     let symbols_clone = symbols.clone();
     let mut symbol_arr = Vec::new();
-    for symbol in symbols_clone{
+    for symbol in symbols_clone {
         let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSwap,symbol.as_str());
         let new_symbol = symbol_mapper.replace("_", "").to_uppercase() + "M";
         symbol_arr.push(new_symbol);
     }
-    spawn( async move {
-        let mut kucoin_exc;
-        // 交易
-        if type_num == 1 {
-            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Private, tx).await;
-            kucoin_exc.set_subscribe(vec![
-                KucoinSubscribeType::PuContractMarketLevel2Depth50,
-                // KucoinSubscribeType::PuContractMarkettickerV2,
-                KucoinSubscribeType::PrContractAccountWallet,
-                KucoinSubscribeType::PrContractPosition,
-                KucoinSubscribeType::PrContractMarketTradeOrders
-            ]);
-        } else { // 参考
-            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
-            kucoin_exc.set_subscribe(vec![
-                KucoinSubscribeType::PuContractMarketLevel2Depth50,
-                // TODO: python注释掉了
-                // KucoinSubscribeType::PuContractMarkettickerV2,
-                KucoinSubscribeType::PuContractMarketExecution
-            ]);
-        }
-        kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
-    });
+    // spawn( async move {
+    //     let mut kucoin_exc;
+    //     // 交易
+    //     if type_num == 1 {
+    //         kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Private, tx).await;
+    //         kucoin_exc.set_subscribe(vec![
+    //             KucoinSubscribeType::PuContractMarketLevel2Depth50,
+    //             // KucoinSubscribeType::PuContractMarkettickerV2,
+    //             KucoinSubscribeType::PrContractAccountWallet,
+    //             KucoinSubscribeType::PrContractPosition,
+    //             KucoinSubscribeType::PrContractMarketTradeOrders
+    //         ]);
+    //     } else { // 参考
+    //         kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
+    //         kucoin_exc.set_subscribe(vec![
+    //             KucoinSubscribeType::PuContractMarketLevel2Depth50,
+    //             // TODO: python注释掉了
+    //             // KucoinSubscribeType::PuContractMarkettickerV2,
+    //             KucoinSubscribeType::PuContractMarketExecution
+    //         ]);
+    //     }
+    //     kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
+    // });
 
     // 新增获取余额的协程
     let account_quant_arc = quant_arc.clone();
@@ -62,25 +65,56 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
             }
         }
     });
+    //创建读写通道
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    let mut ws = KucoinSwapWs::new_label(name, false, None, KucoinSwapWsType::Public).await;
+    ws.set_symbols(vec!["xbt_usdtM".to_string()]);
+    ws.set_subscribe(vec![
+        KucoinSwapSubscribeType::PuContractMarketLevel2Depth50
+    ]);
+    let subscription = ws.get_subscription();
+    let subscription_v: Vec<String> = subscription.into_iter().collect::<HashSet<String>>().into_iter().collect();
 
-    spawn(async move {
+
+    //模拟业务场景 开启链接
+    let bool_v1_clone = Arc::clone(&bool_v1);
+    let t2 = tokio::spawn(async move {
+        ws.ws_connect_async(bool_v1_clone, write_rx, &read_tx).await.unwrap();
+        info!("ws_connect_async 完成");
+    });
+
+    //模拟用户主动写入数据
+    let bool_v2_clone = Arc::clone(&bool_v1);
+    let t3 = tokio::spawn(async move {
+        //模拟 链接之后 服务器响应,可以开始订阅
+        tokio::time::sleep(Duration::from_millis(5000)).await;
+        let bool_v2_v = bool_v2_clone.load(Ordering::SeqCst);
+        bool_v2_clone.store(!bool_v2_v, Ordering::SeqCst);
+        for sub in &subscription_v {
+            info!("--发起订阅:{:?}", sub);
+            write_tx.unbounded_send(Message::Text(sub.parse().unwrap())).unwrap();
+        }
+        tokio::time::sleep(Duration::from_millis(3000)).await;
+
+        //模拟心跳
+        loop {
+            tokio::time::sleep(Duration::from_millis(5000)).await;
+            write_tx.unbounded_send(Message::Ping(Vec::from("ping"))).unwrap();
+        }
+    });
+
+    let t1 = spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
         // let run_symbol = symbols.clone()[0].clone();
         // trade
         let mut max_buy = Decimal::ZERO;
         let mut min_sell = Decimal::ZERO;
         let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-        loop {
-            sleep(Duration::from_nanos(1)).await;
 
-            match rx.recv().await {
-                Some(data) => {
-                    on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
-                },
-                None => {
-                    error!("交易所通道错误");
-                    break;
-                }
+        loop {
+            if let Some(data) = read_rx.next().await {
+                on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
             }
         }
     });