Browse Source

※※※※※ 记得测试 ※※※※※

修复耗时追踪功能可能会导致协程锁的bug
skyfffire 2 years ago
parent
commit
97c0687ccf

+ 1 - 0
exchanges/src/kucoin_spot_ws.rs

@@ -307,6 +307,7 @@ impl KucoinSpotWs {
                                 subscription: Vec<String>)
     {
         info!("走代理-链接成功!开始数据读取");
+        info!(?subscription);
         let label = self.label.clone();
         /*****消息溜***/
         let mut ping_interval = chrono::Utc::now().timestamp_millis();

+ 2 - 2
strategy/src/binance_spot.rs

@@ -90,7 +90,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_
                 }, data.label.clone());
                 let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
                 trace_stack.on_after_format();
-                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
+                quant._update_depth(depth.clone(), data.label.clone(), trace_stack.clone());
                 quant.local_depths.insert(data.label.clone(), depth);
             }
         } else {
@@ -106,7 +106,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_
             {
                 let mut quant = bot_arc_clone.lock().await;
                 quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
                 quant.local_depths.insert(depth.name, depth.depth);
             }
         } else {

+ 2 - 2
strategy/src/binance_usdt_swap.rs

@@ -88,7 +88,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_
                 }, data.label.clone());
                 let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
                 trace_stack.on_after_format();
-                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
+                quant._update_depth(depth.clone(), data.label.clone(), trace_stack.clone());
                 quant.local_depths.insert(data.label.clone(), depth);
             }
         } else {
@@ -104,7 +104,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_
             {
                 let mut quant = bot_arc_clone.lock().await;
                 quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
                 quant.local_depths.insert(depth.name, depth.depth);
             }
         } else {

+ 1 - 1
strategy/src/bitget_spot.rs

@@ -100,7 +100,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, max_buy: &mu
         {
             let mut quant = bot_arc_clone.lock().await;
             quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
             quant.local_depths.insert(depth.name, depth.depth);
         }
     } else if data.channel == "trade" {

+ 1 - 1
strategy/src/gate_swap.rs

@@ -92,7 +92,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, run_symb
         {
             let mut quant = bot_arc_clone.lock().await;
             quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
             quant.local_depths.insert(depth.name, depth.depth);
         }
     } else if data.channel == "futures.balances" {

+ 82 - 80
strategy/src/kucoin_spot.rs

@@ -20,7 +20,7 @@ pub async fn kucoin_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
     let mut symbol_arr = Vec::new();
     for symbol in symbols_clone {
         let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot,symbol.as_str());
-        let new_symbol = symbol_mapper.replace("_", "").to_uppercase() + "M";
+        let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
         symbol_arr.push(new_symbol);
     }
     spawn( async move {
@@ -29,7 +29,7 @@ pub async fn kucoin_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
         if type_num == 0 {
             kucoin_exc.set_subscribe(vec![
                 KucoinSubscribeType::PuSpotMarketLevel2Depth50,
-                KucoinSubscribeType::PuMarketMatch
+                KucoinSubscribeType::PuMarketTicker,
             ]);
             kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
         }
@@ -41,7 +41,8 @@ pub async fn kucoin_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
         // trade
         let mut max_buy = Decimal::ZERO;
         let mut min_sell = Decimal::ZERO;
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        let multiplier = Decimal::ONE;
         loop {
             sleep(Duration::from_millis(1)).await;
 
@@ -55,7 +56,7 @@ pub async fn kucoin_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
     });
 }
 
-async fn on_data(_bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_buy: &mut Decimal, _min_sell: &mut Decimal, data: ResponseData) {
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_buy: &mut Decimal, _min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_network(data.time);
     trace_stack.on_before_quant();
@@ -63,82 +64,83 @@ async fn on_data(_bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_b
     if data.code != "200".to_string() {
         return;
     }
+
     if data.channel == "level2" {
-        // trace_stack.on_before_format();
-        // let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
-        // trace_stack.on_after_format();
-        // {
-        //     let mut quant = bot_arc_clone.lock().await;
-        //     // time_delay.quant_start = Utc::now().timestamp_micros();
-        //     quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-        //     quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-        //     quant.local_depths.insert(depth.name, depth.depth);
-        // }
-    } else if data.channel == "tickerV2" {
-        // let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
-        // {
-        //     let mut quant = bot_arc_clone.lock().await;
-        //     quant._update_ticker(ticker.ticker, ticker.name);
-        // }
-    } else if data.channel == "availableBalance.change" {
-        // 取消原有推送解析,因为推送的信息不准确
-        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-        // {
-        //     let mut quant = bot_arc_clone.lock().await;
-        //     quant.update_equity(account);
-        // }
-    } else if data.channel == "symbolOrderChange" {
-        // trace_stack.on_before_format();
-        // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-        // trace_stack.on_after_format();
-        // let mut order_infos:Vec<OrderInfo> = Vec::new();
-        // for order in orders.order {
-        //     if order.status == "NULL" {
-        //         continue;
-        //     }
-        //     let order_info = OrderInfo {
-        //         symbol: "".to_string(),
-        //         amount: order.amount.abs(),
-        //         side: "".to_string(),
-        //         price: order.price,
-        //         client_id: order.custom_id,
-        //         filled_price: order.avg_price,
-        //         filled: order.deal_amount.abs(),
-        //         order_id: order.id,
-        //         local_time: 0,
-        //         create_time: 0,
-        //         status: order.status,
-        //         fee: Default::default(),
-        //         trace_stack: Default::default(),
-        //     };
-        //     order_infos.push(order_info);
-        // }
-        //
-        // {
-        //     let mut quant = bot_arc_clone.lock().await;
-        //     quant.update_order(order_infos, trace_stack);
-        // }
-    } else if data.channel == "position.change" {
-        // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-        // {
-        //     let mut quant = bot_arc_clone.lock().await;
-        //     quant.update_position(positions);
-        // }
-    } else if data.channel == "match" {
-        // let mut quant = bot_arc_clone.lock().await;
-        // let str = data.label.clone();
-        // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-        //     *max_buy = Decimal::ZERO;
-        //     *min_sell = Decimal::ZERO;
-        //     quant.is_update.remove(str.as_str());
-        // }
-        // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-        // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-        //     *max_buy = trade.price
-        // }
-        // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-        //     *min_sell = trade.price
-        // }
-        // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+        trace_stack.on_before_format();
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot,data);
+        trace_stack.on_after_format();
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
+            quant.local_depths.insert(depth.name, depth.depth);
+        }
+    } else if data.channel == "trade.ticker" {
+        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant._update_ticker(ticker.ticker, ticker.name);
+        }
     }
+    // else if data.channel == "availableBalance.change" {
+    //     // 取消原有推送解析,因为推送的信息不准确
+    //     // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+    //     // {
+    //     //     let mut quant = bot_arc_clone.lock().await;
+    //     //     quant.update_equity(account);
+    //     // }
+    // } else if data.channel == "symbolOrderChange" {
+    //     // trace_stack.on_before_format();
+    //     // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+    //     // trace_stack.on_after_format();
+    //     // let mut order_infos:Vec<OrderInfo> = Vec::new();
+    //     // for order in orders.order {
+    //     //     if order.status == "NULL" {
+    //     //         continue;
+    //     //     }
+    //     //     let order_info = OrderInfo {
+    //     //         symbol: "".to_string(),
+    //     //         amount: order.amount.abs(),
+    //     //         side: "".to_string(),
+    //     //         price: order.price,
+    //     //         client_id: order.custom_id,
+    //     //         filled_price: order.avg_price,
+    //     //         filled: order.deal_amount.abs(),
+    //     //         order_id: order.id,
+    //     //         local_time: 0,
+    //     //         create_time: 0,
+    //     //         status: order.status,
+    //     //         fee: Default::default(),
+    //     //         trace_stack: Default::default(),
+    //     //     };
+    //     //     order_infos.push(order_info);
+    //     // }
+    //     //
+    //     // {
+    //     //     let mut quant = bot_arc_clone.lock().await;
+    //     //     quant.update_order(order_infos, trace_stack);
+    //     // }
+    // } else if data.channel == "position.change" {
+    //     // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+    //     // {
+    //     //     let mut quant = bot_arc_clone.lock().await;
+    //     //     quant.update_position(positions);
+    //     // }
+    // } else if data.channel == "match" {
+    //     // let mut quant = bot_arc_clone.lock().await;
+    //     // let str = data.label.clone();
+    //     // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+    //     //     *max_buy = Decimal::ZERO;
+    //     //     *min_sell = Decimal::ZERO;
+    //     //     quant.is_update.remove(str.as_str());
+    //     // }
+    //     // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+    //     // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+    //     //     *max_buy = trade.price
+    //     // }
+    //     // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+    //     //     *min_sell = trade.price
+    //     // }
+    //     // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    // }
 }

+ 1 - 1
strategy/src/kucoin_swap.rs

@@ -98,7 +98,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, max_buy:
             let mut quant = bot_arc_clone.lock().await;
             // time_delay.quant_start = Utc::now().timestamp_micros();
             quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
             quant.local_depths.insert(depth.name, depth.depth);
         }
     } else if data.channel == "tickerV2" {

+ 1 - 1
strategy/src/quant.rs

@@ -577,7 +577,7 @@ impl Quant {
         }
     }
 
-    pub fn _update_depth(&mut self, depth: Vec<Decimal>, name: String, trace_stack: &mut TraceStack) {
+    pub fn _update_depth(&mut self, depth: Vec<Decimal>, name: String, mut trace_stack: TraceStack) {
         trace_stack.on_depth();
 
         // 要从回调传入的深度信息中获取data.name