فهرست منبع

修复colo高速通道无法访问的问题,以及移除老的交易所类型判断。

skyfffire 1 سال پیش
والد
کامیت
5033460b3a

+ 14 - 2
src/quant_libs.rs

@@ -33,9 +33,21 @@ pub async fn init(params: Params, ws_running: Arc<AtomicBool>, running: Arc<Atom
     let quant_arc = Arc::new(Mutex::new(quant_obj));
 
     // 参考交易所
-    exchange_disguise::run_reference_exchange(ws_running.clone(),params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+    exchange_disguise::run_reference_exchange(ws_running.clone(),
+                                              params.ref_exchange.get(0).unwrap().clone(),
+                                              quant_arc.clone(),
+                                              ref_name,
+                                              params.ref_pair.clone(),
+                                              params.colo != 0i8,
+                                              exchange_params.clone()).await;
     // 交易交易所
-    exchange_disguise::run_transactional_exchange(ws_running.clone(), params.exchange, quant_arc.clone(),  trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
+    exchange_disguise::run_transactional_exchange(ws_running.clone(),
+                                                  params.exchange,
+                                                  quant_arc.clone(),
+                                                  trade_name,
+                                                  vec![params.pair.clone()],
+                                                  params.colo != 0i8,
+                                                  exchange_params.clone()).await;
     // 启动定期触发的系统逻辑
     quant::on_timer(quant_arc.clone());
     // 启动策略逻辑

+ 2 - 3
strategy/src/binance_spot.rs

@@ -26,9 +26,8 @@ pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>,
     let mut ws = BinanceSpotWs::new_label(name.clone(), is_colo, None, BinanceSpotWsType::PublicAndPrivate);
     ws.set_symbols(symbols.clone());
     ws.set_subscribe(vec![
-        // TODO 此处应该使用BookTicker,等标准层格式化完成。
-        // BinanceSpotSubscribeType::PuBookTicker,
-        BinanceSpotSubscribeType::PuDepth20levels100ms
+        BinanceSpotSubscribeType::PuBookTicker,
+        // BinanceSpotSubscribeType::PuDepth20levels100ms
     ]);
 
     // 开启数据读取线程

+ 7 - 3
strategy/src/binance_usdt_swap.rs

@@ -13,13 +13,17 @@ use futures_util::StreamExt;
 use crate::exchange_disguise::on_special_depth;
 
 // 参考 币安 合约 启动
-pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>,
-                                               name: String, symbols: Vec<String>, _exchange_params: BTreeMap<String, String>) {
+pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
+                                               quant_arc: Arc<Mutex<Quant>>,
+                                               name: String,
+                                               symbols: Vec<String>,
+                                               is_colo: bool,
+                                               _exchange_params: BTreeMap<String, String>) {
     tokio::spawn(async move {
         //创建读写通道
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
         let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-        let mut ws = BinanceSwapWs::new_label(name, false, None, BinanceSwapWsType::PublicAndPrivate);
+        let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::PublicAndPrivate);
         ws.set_symbols(symbols);
         ws.set_subscribe(vec![
             // BinanceSwapSubscribeType::PuDepth20levels100ms,

+ 10 - 8
strategy/src/bitget_spot.rs

@@ -16,27 +16,29 @@ use crate::model::{OrderInfo, OriginalTradeGa};
 use crate::quant::Quant;
 
 pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
-                             type_num: i8,
+                             is_trade: bool,
                              quant_arc: Arc<Mutex<Quant>>,
                              name: String,
                              symbols: Vec<String>,
+                             is_colo: bool,
                              exchange_params: BTreeMap<String, String>) {
     // 开启公共频道
     let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
     let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
     let mut ku_public = BitgetSpotWs::new_label(name.clone(),
-                                                false,
+                                                is_colo,
                                                 None,
                                                 BitgetSpotWsType::Public);
     ku_public.set_symbols(symbols.clone());
-    // 参考交易所还要订阅实时订单流数据
-    if type_num == 0 {
+    // 交易交易所只用订阅深度数据
+    if is_trade {
         ku_public.set_subscribe(vec![
-            BitgetSpotSubscribeType::PuTrade,
             BitgetSpotSubscribeType::PuBooks5
         ]);
-    } else { // 交易交易所只用订阅深度数据
+    } else {
+        // 参考交易所还要订阅实时订单流数据
         ku_public.set_subscribe(vec![
+            BitgetSpotSubscribeType::PuTrade,
             BitgetSpotSubscribeType::PuBooks5
         ]);
     }
@@ -70,7 +72,7 @@ pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
     });
 
     // 开启私有频道
-    if type_num == 1 {
+    if is_trade {
         // 新增获取余额的协程
         let account_quant_arc = quant_arc.clone();
         spawn(async move {
@@ -93,7 +95,7 @@ pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
         spawn(async move {
             let login_params = parse_btree_map_to_bitget_spot_login(exchange_params);
             let mut ku_private = BitgetSpotWs::new_label(name.clone(),
-                                                         false,
+                                                         is_colo,
                                                          Some(login_params),
                                                          BitgetSpotWsType::Private);
             ku_private.set_symbols(symbols.clone());

+ 7 - 6
strategy/src/gate_swap.rs

@@ -18,19 +18,20 @@ use crate::exchange_disguise::on_special_depth;
 
 // 1交易、0参考 gate 合约 启动
 pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
-                           type_num: i8,
+                           is_trade: bool,
                            quant_arc: Arc<Mutex<Quant>>,
                            name: String,
                            symbols: Vec<String>,
+                           is_colo: bool,
                            exchange_params: BTreeMap<String, String>) {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
     let mut user_id= "".to_string();
 
     // 交易
-    if type_num == 1 {
+    if is_trade {
         // 获取user_id
         let res_data = gate_exc.wallet_fee().await;
         assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
@@ -45,9 +46,9 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
     spawn(async move {
         let mut ws;
         // 交易
-        if type_num == 1 {
+        if is_trade {
             let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
-            ws = GateSwapWs::new_label(name.clone(), false, Some(login_param),
+            ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
                                        GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
                 // GateSwapSubscribeType::PuFuturesTrades,
@@ -57,7 +58,7 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
                 GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
             ]);
         } else { // 参考
-            ws = GateSwapWs::new_label(name.clone(), false, None,
+            ws = GateSwapWs::new_label(name.clone(), is_colo, None,
                                        GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
                 GateSwapSubscribeType::PuFuturesTrades,

+ 3 - 2
strategy/src/kucoin_spot.rs

@@ -14,10 +14,11 @@ use crate::quant::Quant;
 
 // 1交易、0参考 kucoin 现货 启动
 pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
-                             _type_num: i8,
+                             _is_trade: bool,
                              quant_arc: Arc<Mutex<Quant>>,
                              name: String,
                              symbols: Vec<String>,
+                             is_colo: bool,
                              _exchange_params: BTreeMap<String, String>) {
     let mut symbol_arr = Vec::new();
     for symbol in symbols {
@@ -29,7 +30,7 @@ pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    let mut ws = KucoinSpotWs::new_label(name.clone(), false, None, KucoinSpotWsType::Public).await;
+    let mut ws = KucoinSpotWs::new_label(name.clone(), is_colo, None, KucoinSpotWsType::Public).await;
     ws.set_symbols(symbol_arr);
     ws.set_subscribe(vec![
         KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,

+ 10 - 5
strategy/src/kucoin_swap.rs

@@ -20,8 +20,13 @@ use crate::model::{OrderInfo, OriginalTradeGa};
 use crate::quant::Quant;
 
 // 1交易、0参考 kucoin 合约 启动
-pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, 
-                             name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>,
+                             is_trade: bool,
+                             quant_arc: Arc<Mutex<Quant>>,
+                             name: String,
+                             symbols: Vec<String>,
+                             is_colo: bool,
+                             exchange_params: BTreeMap<String, String>) {
     let symbols_clone = symbols.clone();
     let mut symbol_arr = Vec::new();
     for symbol in symbols_clone {
@@ -56,9 +61,9 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
             let write_tx_am = Arc::new(Mutex::new(write_tx));
 
             // 交易
-            if type_num == 1 {
+            if is_trade {
                 let login_params = parse_btree_map_to_kucoin_swap_login(exchange_params);
-                kucoin_exc = KucoinSwapWs::new_label(name.clone(), false, Option::from(login_params), KucoinSwapWsType::Private).await;
+                kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, Option::from(login_params), KucoinSwapWsType::Private).await;
                 kucoin_exc.set_subscribe(vec![
                     KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
                     // KucoinSwapSubscribeType::PuContractMarkettickerV2,
@@ -67,7 +72,7 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
                     KucoinSwapSubscribeType::PrContractMarketTradeOrders
                 ]);
             } else { // 参考
-                kucoin_exc = KucoinSwapWs::new_label(name.clone(), false, None, KucoinSwapWsType::Public).await;
+                kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, None, KucoinSwapWsType::Public).await;
                 kucoin_exc.set_subscribe(vec![
                     KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
                     // python注释掉了