Эх сурвалжийг харах

编译成功通过,等待打通,工程化。

skyfffire 1 жил өмнө
parent
commit
ee4f77b298

+ 6 - 6
exchanges/src/binance_spot_ws.rs

@@ -223,7 +223,7 @@ impl BinanceSpotWs {
                                 subscription: String)
     {
         // info!("走代理-链接成功!开始数据读取");
-        // let lable = self.label.clone();
+        // let label = self.label.clone();
         // /*****订阅消息**/
         // if subscription.len() > 0 {
         //     web_socket.write_message(Message::Text(subscription)).unwrap();
@@ -235,7 +235,7 @@ impl BinanceSpotWs {
         //     match msg {
         //         Ok(Message::Text(text)) => {
         //             // trace!("获取推送:{}",text.clone());
-        //             let mut res_data = Self::ok_text(lable.to_string(), text);
+        //             let mut res_data = Self::ok_text(label.to_string(), text);
         //             res_data.time = get_time_microsecond();
         //             if res_data.code == "-200" {
         //                 trace!("订阅成功:{:?}", res_data.data);
@@ -282,7 +282,7 @@ impl BinanceSpotWs {
                           subscription: String)
     {
         // info!("链接成功!开始数据读取");
-        // let lable = self.label.clone();
+        // let label = self.label.clone();
         // /*****订阅消息**/
         // if subscription.len() > 0 {
         //     web_socket.write_message(Message::Text(subscription.clone())).unwrap();
@@ -294,7 +294,7 @@ impl BinanceSpotWs {
         //     match msg {
         //         Ok(Message::Text(text)) => {
         //             // trace!("获取推送:{}",text.clone());
-        //             let mut res_data = Self::ok_text(lable.to_string(), text);
+        //             let mut res_data = Self::ok_text(label.to_string(), text);
         //             res_data.time = get_time_microsecond();
         //             if res_data.code == "-200" {
         //                 trace!("订阅成功:{:?}", subscription.clone());
@@ -336,11 +336,11 @@ impl BinanceSpotWs {
     }
 
     //数据解析
-    pub fn ok_text(lable: String, text: String) -> ResponseData
+    pub fn ok_text(label: String, text: String) -> ResponseData
     {
         // trace!("原始数据");
         // trace!(?text);
-        let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new(label, "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
         if json_value.get("result").is_some() &&

+ 1 - 1
exchanges/src/binance_swap_ws.rs

@@ -327,7 +327,7 @@
 //     }
 //
 //     //数据解析
-//     pub fn ok_text(lable: String, text: String) -> ResponseData
+//     pub fn ok_text(label: String, text: String) -> ResponseData
 //     {
 //         // trace!("原始数据");
 //         // trace!(?text);

+ 5 - 5
exchanges/src/binance_swap_ws_async.rs

@@ -32,7 +32,7 @@ pub struct BinanceSwapLogin {
 #[derive(Clone)]
 pub struct BinanceSwapWs {
     //类型
-    lable: String,
+    label: String,
     //地址
     address_url: String,
     //账号
@@ -52,7 +52,7 @@ impl BinanceSwapWs {
     pub fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
         return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
     }
-    pub fn new_label(lable: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
             BinanceSwapWsType::PublicAndPrivate => {
@@ -67,7 +67,7 @@ impl BinanceSwapWs {
             info!("走普通通道:{}",address_url);
         }
         BinanceSwapWs {
-            lable,
+            label,
             address_url,
             login_param,
             symbol_s: vec![],
@@ -217,7 +217,7 @@ impl BinanceSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let lable = self.lable.clone();
+        let label = self.label.clone();
         let heartbeat_time = self.heartbeat_time.clone();
 
 
@@ -243,7 +243,7 @@ impl BinanceSwapWs {
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
             AbstractWsMode::ws_connect_async(address_url.clone(),
-                                             lable.clone(), subscribe_array,
+                                             label.clone(), subscribe_array,
                                              write_rx, read_tx, BinanceSwapWs::analysis_message,
             ).await.expect("币安");
             trace!("线程-异步链接-结束");

+ 1 - 1
exchanges/src/kucoin_spot_ws.rs

@@ -469,7 +469,7 @@
 //     }
 //
 //     //数据解析
-//     pub fn ok_text(lable: String, text: String) -> ResponseData
+//     pub fn ok_text(label: String, text: String) -> ResponseData
 //     {
 //         let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
 //         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();

+ 1 - 1
exchanges/src/kucoin_swap_ws.rs

@@ -479,7 +479,7 @@
 //     }
 //
 //     //数据解析
-//     pub fn ok_text(lable: String, text: String) -> ResponseData
+//     pub fn ok_text(label: String, text: String) -> ResponseData
 //     {
 //         let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
 //         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();

+ 5 - 5
exchanges/src/kucoin_swap_ws_async.rs

@@ -49,7 +49,7 @@ pub struct KucoinSwapLogin {
 #[derive(Clone)]
 pub struct KucoinSwapWs {
     //类型
-    lable: String,
+    label: String,
     //地址
     address_url: String,
     //账号信息
@@ -71,7 +71,7 @@ impl KucoinSwapWs {
     pub async fn new(is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
         return Self::new_label("default-KucoinSwapWs".to_string(), is_colo, login_param, ws_type).await;
     }
-    pub async fn new_label(lable: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
+    pub async fn new_label(label: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
         /*******公共频道-私有频道数据组装*/
         let mut ws_param = KucoinSwapWsParam {
             token: "".to_string(),
@@ -101,7 +101,7 @@ impl KucoinSwapWs {
         }
 
         KucoinSwapWs {
-            lable,
+            label,
             address_url,
             login_param,
             ws_param,
@@ -354,7 +354,7 @@ impl KucoinSwapWs {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let lable = self.lable.clone();
+        let label = self.label.clone();
         let heartbeat_time = self.heartbeat_time.clone();
 
         //心跳-- 方法内部线程启动
@@ -378,7 +378,7 @@ impl KucoinSwapWs {
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
             AbstractWsMode::ws_connect_async(address_url.clone(),
-                                             lable.clone(), subscribe_array,
+                                             label.clone(), subscribe_array,
                                              write_rx, read_tx, KucoinSwapWs::analysis_message,
             ).await.expect("kucoin");
             trace!("线程-异步链接-结束");

+ 1 - 1
exchanges/src/okx_swap_ws.rs

@@ -442,7 +442,7 @@
 //     }
 //
 //     //数据解析
-//     pub fn ok_text(lable: String, text: String) -> ResponseData
+//     pub fn ok_text(label: String, text: String) -> ResponseData
 //     {
 //         // trace!("元数据:{}",text);
 //         let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());

+ 3 - 2
strategy/src/binance_usdt_swap.rs

@@ -28,8 +28,9 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_a
 
     tokio::spawn(async move {
         //创建读写通道
-        let (_write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
         let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(std::sync::Mutex::new(write_tx));
         let mut ws = BinanceSwapWs::new_label(name, false, None, BinanceSwapWsType::PublicAndPrivate);
 
         //读取数据
@@ -49,7 +50,7 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_a
         });
 
         // 链接
-        ws.ws_connect_async(bool_v1, write_rx, &read_tx).await.expect("链接失败");
+        ws.ws_connect_async(bool_v1, &write_tx_am, write_rx, read_tx).await.expect("链接失败");
     });
 }
 

+ 27 - 27
strategy/src/bitget_spot.rs

@@ -7,7 +7,7 @@ use tokio::spawn;
 use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
-use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
+// use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::BitgetSpot;
@@ -24,32 +24,32 @@ pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
     let t0 = tx.clone();
     let b0 = bool_v1.clone();
     let s0 = symbols_1.clone();
-    spawn(async move {
-        let mut bit_exc_public = BitgetSpotWs::new_label(name0, false, ep0, BitgetWsType::Public, t0);
-        bit_exc_public.set_subscribe(vec![
-            BitgetSubscribeType::PuTrade,
-            BitgetSubscribeType::PuBooks5
-        ]);
-        bit_exc_public.custom_subscribe(b0, s0).await;
-    });
-
-    // 交易交易所需要再开一个私有频道
-    if type_num == 1 {
-        let name1 = name.clone();
-        let ep1 = exchange_params.clone();
-        let t1 = tx.clone();
-        let b1 = bool_v1.clone();
-        let s1 = symbols_1.clone();
-        spawn( async move {
-            let mut bit_exc_private = BitgetSpotWs::new_label(name1, false, ep1, BitgetWsType::Private, t1);
-            // 交易
-            bit_exc_private.set_subscribe(vec![
-                BitgetSubscribeType::PrAccount,
-                BitgetSubscribeType::PrOrders
-            ]);
-            bit_exc_private.custom_subscribe(b1, s1).await;
-        });
-    }
+    // spawn(async move {
+    //     let mut bit_exc_public = BitgetSpotWs::new_label(name0, false, ep0, BitgetWsType::Public, t0);
+    //     bit_exc_public.set_subscribe(vec![
+    //         BitgetSubscribeType::PuTrade,
+    //         BitgetSubscribeType::PuBooks5
+    //     ]);
+    //     bit_exc_public.custom_subscribe(b0, s0).await;
+    // });
+    //
+    // // 交易交易所需要再开一个私有频道
+    // if type_num == 1 {
+    //     let name1 = name.clone();
+    //     let ep1 = exchange_params.clone();
+    //     let t1 = tx.clone();
+    //     let b1 = bool_v1.clone();
+    //     let s1 = symbols_1.clone();
+    //     spawn( async move {
+    //         let mut bit_exc_private = BitgetSpotWs::new_label(name1, false, ep1, BitgetWsType::Private, t1);
+    //         // 交易
+    //         bit_exc_private.set_subscribe(vec![
+    //             BitgetSubscribeType::PrAccount,
+    //             BitgetSubscribeType::PrOrders
+    //         ]);
+    //         bit_exc_private.custom_subscribe(b1, s1).await;
+    //     });
+    // }
 
     // 新增获取余额的协程
     let account_quant_arc = quant_arc.clone();

+ 37 - 40
strategy/src/gate_swap.rs

@@ -3,14 +3,11 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
-use serde_json::Value;
-use tracing::{info};
 use tokio::spawn;
-use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
-use exchanges::gate_swap_rest::GateSwapRest;
-use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
+use tokio::sync::mpsc::channel;
+// use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::GateSwap;
@@ -20,41 +17,41 @@ use crate::quant::Quant;
 // 1交易、0参考 gate 合约 启动
 pub async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     let (tx, mut rx) = channel(100);
-    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
-    let mut user_id= "".to_string();
-    let symbols_one = symbols.clone();
-
-    // 交易
-    if type_num == 1{
-        // 获取user_id
-        let res_data = gate_exc.wallet_fee().await;
-        assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
-
-        let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
-        info!(?wallet_obj);
-        user_id = wallet_obj["user_id"].to_string();
-    }
-
-    spawn( async move {
-        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
-                                                 GateWsType::PublicAndPrivate("usdt".to_string()), tx);
-        // 交易
-        if type_num == 1 {
-            gate_exc.set_subscribe(vec![
-                GateSubscribeType::PuFuturesTrades,
-                GateSubscribeType::PuFuturesOrderBook,
-                GateSubscribeType::PrFuturesOrders(user_id.clone()),
-                GateSubscribeType::PrFuturesPositions(user_id.clone()),
-                GateSubscribeType::PrFuturesBalances(user_id.clone()),
-            ]);
-        } else { // 参考
-            gate_exc.set_subscribe(vec![
-                GateSubscribeType::PuFuturesTrades,
-                GateSubscribeType::PuFuturesOrderBook
-            ]);
-        }
-        gate_exc.custom_subscribe(bool_v1,symbols_one).await;
-    });
+    // let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    // let mut user_id= "".to_string();
+    // let symbols_one = symbols.clone();
+    //
+    // // 交易
+    // if type_num == 1{
+    //     // 获取user_id
+    //     let res_data = gate_exc.wallet_fee().await;
+    //     assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
+    //
+    //     let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+    //     info!(?wallet_obj);
+    //     user_id = wallet_obj["user_id"].to_string();
+    // }
+    //
+    // spawn( async move {
+    //     let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
+    //                                              GateWsType::PublicAndPrivate("usdt".to_string()), tx);
+    //     // 交易
+    //     if type_num == 1 {
+    //         gate_exc.set_subscribe(vec![
+    //             GateSubscribeType::PuFuturesTrades,
+    //             GateSubscribeType::PuFuturesOrderBook,
+    //             GateSubscribeType::PrFuturesOrders(user_id.clone()),
+    //             GateSubscribeType::PrFuturesPositions(user_id.clone()),
+    //             GateSubscribeType::PrFuturesBalances(user_id.clone()),
+    //         ]);
+    //     } else { // 参考
+    //         gate_exc.set_subscribe(vec![
+    //             GateSubscribeType::PuFuturesTrades,
+    //             GateSubscribeType::PuFuturesOrderBook
+    //         ]);
+    //     }
+    //     gate_exc.custom_subscribe(bool_v1,symbols_one).await;
+    // });
     spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
         let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;

+ 12 - 12
strategy/src/kucoin_spot.rs

@@ -7,7 +7,7 @@ use tokio::spawn;
 use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
-use exchanges::kucoin_spot_ws::{KucoinSubscribeType, KucoinSpotWs, KucoinWsType};
+// use exchanges::kucoin_spot_ws::{KucoinSubscribeType, KucoinSpotWs, KucoinWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::KucoinSpot;
@@ -23,17 +23,17 @@ pub async fn kucoin_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
         let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
         symbol_arr.push(new_symbol);
     }
-    spawn( async move {
-        let mut kucoin_exc;
-        kucoin_exc = KucoinSpotWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
-        if type_num == 0 {
-            kucoin_exc.set_subscribe(vec![
-                KucoinSubscribeType::PuSpotMarketLevel2Depth50,
-                KucoinSubscribeType::PuMarketTicker,
-            ]);
-            kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
-        }
-    });
+    // spawn( async move {
+    //     let mut kucoin_exc;
+    //     kucoin_exc = KucoinSpotWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
+    //     if type_num == 0 {
+    //         kucoin_exc.set_subscribe(vec![
+    //             KucoinSubscribeType::PuSpotMarketLevel2Depth50,
+    //             KucoinSubscribeType::PuMarketTicker,
+    //         ]);
+    //         kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
+    //     }
+    // });
 
     spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);

+ 36 - 35
strategy/src/kucoin_swap.rs

@@ -66,7 +66,7 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
         }
     });
     //创建读写通道
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
     let mut ws = KucoinSwapWs::new_label(name, false, None, KucoinSwapWsType::Public).await;
     ws.set_symbols(vec!["xbt_usdtM".to_string()]);
@@ -79,45 +79,46 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
 
     //模拟业务场景 开启链接
     let bool_v1_clone = Arc::clone(&bool_v1);
+    let write_tx_am = Arc::new(std::sync::Mutex::new(write_tx));
     let t2 = tokio::spawn(async move {
-        ws.ws_connect_async(bool_v1_clone, write_rx, &read_tx).await.unwrap();
+        ws.ws_connect_async(bool_v1_clone, &write_tx_am, write_rx, read_tx).await.unwrap();
         info!("ws_connect_async 完成");
     });
 
     //模拟用户主动写入数据
-    let bool_v2_clone = Arc::clone(&bool_v1);
-    let t3 = tokio::spawn(async move {
-        //模拟 链接之后 服务器响应,可以开始订阅
-        tokio::time::sleep(Duration::from_millis(5000)).await;
-        let bool_v2_v = bool_v2_clone.load(Ordering::SeqCst);
-        bool_v2_clone.store(!bool_v2_v, Ordering::SeqCst);
-        for sub in &subscription_v {
-            info!("--发起订阅:{:?}", sub);
-            write_tx.unbounded_send(Message::Text(sub.parse().unwrap())).unwrap();
-        }
-        tokio::time::sleep(Duration::from_millis(3000)).await;
-
-        //模拟心跳
-        loop {
-            tokio::time::sleep(Duration::from_millis(5000)).await;
-            write_tx.unbounded_send(Message::Ping(Vec::from("ping"))).unwrap();
-        }
-    });
-
-    let t1 = spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        // let run_symbol = symbols.clone()[0].clone();
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-
-        loop {
-            if let Some(data) = read_rx.next().await {
-                on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
-            }
-        }
-    });
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let t3 = tokio::spawn(async move {
+    //     //模拟 链接之后 服务器响应,可以开始订阅
+    //     tokio::time::sleep(Duration::from_millis(5000)).await;
+    //     let bool_v2_v = bool_v2_clone.load(Ordering::SeqCst);
+    //     bool_v2_clone.store(!bool_v2_v, Ordering::SeqCst);
+    //     for sub in &subscription_v {
+    //         info!("--发起订阅:{:?}", sub);
+    //         write_tx.unbounded_send(Message::Text(sub.parse().unwrap())).unwrap();
+    //     }
+    //     tokio::time::sleep(Duration::from_millis(3000)).await;
+    //
+    //     //模拟心跳
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(5000)).await;
+    //         write_tx.unbounded_send(Message::Ping(Vec::from("ping"))).unwrap();
+    //     }
+    // });
+    //
+    // let t1 = spawn(async move {
+    //     let bot_arc_clone = Arc::clone(&quant_arc);
+    //     // let run_symbol = symbols.clone()[0].clone();
+    //     // trade
+    //     let mut max_buy = Decimal::ZERO;
+    //     let mut min_sell = Decimal::ZERO;
+    //     let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+    //
+    //     loop {
+    //         if let Some(data) = read_rx.next().await {
+    //             on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
+    //         }
+    //     }
+    // });
 }
 
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {