Browse Source

※※※※※ update部分更新过,记得测试 ※※※※※

非阻塞式处理消费信息。
skyfffire 2 years ago
parent
commit
ed8fc3c8bd
3 changed files with 252 additions and 248 deletions
  1. 71 71
      strategy/src/binance_spot.rs
  2. 87 84
      strategy/src/gate_swap.rs
  3. 94 93
      strategy/src/kucoin_swap.rs

+ 71 - 71
strategy/src/binance_spot.rs

@@ -4,12 +4,12 @@ use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
 use serde_json::Value;
-use tracing::{error};
 use tokio::spawn;
 use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
 use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::BinanceSpot;
 use standard::SpecialTicker;
@@ -39,78 +39,78 @@ pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc
         loop {
             sleep(Duration::from_millis(1)).await;
 
-            match rx.recv().await {
-                Some(data) => {
-                    let mut trace_stack = TraceStack::default();
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "aggTrade" {
-                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-                        let str = data.label.clone();
-                        let mut quant = bot_arc_clone.lock().await;
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        if trade.p > max_buy || max_buy == Decimal::ZERO{
-                            max_buy = trade.p
-                        }
-                        if trade.p < min_sell || min_sell == Decimal::ZERO{
-                            min_sell = trade.p
-                        }
-                        {
-                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                        }
-                    } else if data.channel == "bookTicker" {
-                        trace_stack.on_before_format();
-                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-                        if ticker.u > update_flag_u {
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                // time_delay.quant_start = Utc::now().timestamp_micros();
-                                quant._update_ticker(SpecialTicker{
-                                    sell: ticker.a.clone(),
-                                    buy: ticker.b.clone(),
-                                    mid_price: Default::default(),
-                                }, data.label.clone());
-                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-                                trace_stack.on_after_format();
-                                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
-                                quant.local_depths.insert(data.label.clone(), depth);
-                            }
-                        } else {
-                            update_flag_u = ticker.u;
-                        }
-                    } else if data.channel == "depth" {
-                        trace_stack.on_before_format();
-                        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
-                        let u = v["lastUpdateId"].as_i64().unwrap();
-                        if u > update_flag_u {
-                            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
-                            trace_stack.on_after_format();
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                                quant.local_depths.insert(depth.name, depth.depth);
-                            }
-                        } else {
-                            update_flag_u = u;
-                        }
-
-                    }
+            match rx.try_recv() {
+                Ok(data) => {
+                    on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
                 },
-                None => {
-                    error!("币安现货参考交易所通道错误");
-                    break;
-                }
+                Err(_e) => { }
             }
 
         }
     });
-}
+}
+
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "aggTrade" {
+        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+        let str = data.label.clone();
+        let mut quant = bot_arc_clone.lock().await;
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        if trade.p > *max_buy || *max_buy == Decimal::ZERO{
+            *max_buy = trade.p
+        }
+        if trade.p < *min_sell || *min_sell == Decimal::ZERO{
+            *min_sell = trade.p
+        }
+        {
+            quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+        }
+    } else if data.channel == "bookTicker" {
+        trace_stack.on_before_format();
+        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
+        if ticker.u > *update_flag_u {
+            {
+                let mut quant = bot_arc_clone.lock().await;
+                // time_delay.quant_start = Utc::now().timestamp_micros();
+                quant._update_ticker(SpecialTicker{
+                    sell: ticker.a.clone(),
+                    buy: ticker.b.clone(),
+                    mid_price: Default::default(),
+                }, data.label.clone());
+                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
+                trace_stack.on_after_format();
+                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
+                quant.local_depths.insert(data.label.clone(), depth);
+            }
+        } else {
+            *update_flag_u = ticker.u;
+        }
+    } else if data.channel == "depth" {
+        trace_stack.on_before_format();
+        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
+        let u = v["lastUpdateId"].as_i64().unwrap();
+        if u > *update_flag_u {
+            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
+            trace_stack.on_after_format();
+            {
+                let mut quant = bot_arc_clone.lock().await;
+                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                quant.local_depths.insert(depth.name, depth.depth);
+            }
+        } else {
+            *update_flag_u = u;
+        }
+    }
+}

+ 87 - 84
strategy/src/gate_swap.rs

@@ -4,13 +4,14 @@ use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
 use serde_json::Value;
-use tracing::{error, info};
+use tracing::{info};
 use tokio::spawn;
 use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
+use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::GateSwap;
 use crate::model::{OrderInfo, OriginalTradeGa};
@@ -63,93 +64,95 @@ pub async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Ar
         let mut min_sell = Decimal::ZERO;
         loop {
             sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
 
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
+            match rx.try_recv() {
+                Ok(data) => {
+                    on_data(bot_arc_clone.clone(), multiplier, run_symbol.clone(), &mut max_buy, &mut min_sell, data).await;
+                },
+                Err(_e) => {}
+            }
+        }
+    });
+}
 
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "futures.order_book" {
-                        trace_stack.on_before_format();
-                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
-                        trace_stack.on_after_format();
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, run_symbol: String, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_network(data.time);
+    trace_stack.on_before_quant();
 
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "futures.balances" {
-                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_equity(account);
-                        }
-                    } else if data.channel == "futures.orders" {
-                        trace_stack.on_before_format();
-                        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
-                        trace_stack.on_after_format();
+    if data.code != "200".to_string() {
+        return;
+    }
 
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            let order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                                trace_stack: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
+    if data.channel == "futures.order_book" {
+        trace_stack.on_before_format();
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
+        trace_stack.on_after_format();
 
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos, trace_stack);
-                        }
-                    } else if data.channel == "futures.positions" {
-                        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_position(positions);
-                        }
-                    } else if data.channel == "futures.trades" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-                        for trade in trades {
-                            if trade.price > max_buy || max_buy == Decimal::ZERO{
-                                max_buy = trade.price
-                            }
-                            if trade.price < min_sell || min_sell == Decimal::ZERO{
-                                min_sell = trade.price
-                            }
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    }
-                },
-                None => {
-                    error!("交易所通道错误");
-                    break;
-                }
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+            quant.local_depths.insert(depth.name, depth.depth);
+        }
+    } else if data.channel == "futures.balances" {
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_equity(account);
+        }
+    } else if data.channel == "futures.orders" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
+        trace_stack.on_after_format();
+
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "futures.positions" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_position(positions);
+        }
+    } else if data.channel == "futures.trades" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+        for trade in trades {
+            if trade.price > *max_buy || *max_buy == Decimal::ZERO{
+                *max_buy = trade.price
+            }
+            if trade.price < *min_sell || *min_sell == Decimal::ZERO{
+                *min_sell = trade.price
             }
         }
-    });
-}
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    }
+}

+ 94 - 93
strategy/src/kucoin_swap.rs

@@ -3,12 +3,12 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
-use tracing::{error};
 use tokio::spawn;
 use tokio::sync::mpsc::channel;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
 use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
+use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::KucoinSwap;
 use crate::model::{OrderInfo, OriginalTradeGa};
@@ -70,100 +70,101 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
         let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
         loop {
             sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
 
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "level2" {
-                        trace_stack.on_before_format();
-                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
-                        trace_stack.on_after_format();
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            // time_delay.quant_start = Utc::now().timestamp_micros();
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "tickerV2" {
-                        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(ticker.ticker, ticker.name);
-                        }
-                    } else if data.channel == "availableBalance.change" {
-                        // 取消原有推送解析,因为推送的信息不准确
-                        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-                        // {
-                        //     let mut quant = bot_arc_clone.lock().await;
-                        //     quant.update_equity(account);
-                        // }
-                    } else if data.channel == "symbolOrderChange" {
-                        trace_stack.on_before_format();
-                        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-                        trace_stack.on_after_format();
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            if order.status == "NULL" {
-                                continue;
-                            }
-                            let order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                                trace_stack: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
-
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos, trace_stack);
-                        }
-                    } else if data.channel == "position.change" {
-                        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_position(positions);
-                        }
-                    } else if data.channel == "match" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-                        if trade.price > max_buy || max_buy == Decimal::ZERO {
-                            max_buy = trade.price
-                        }
-                        if trade.price < min_sell || min_sell == Decimal::ZERO {
-                            min_sell = trade.price
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    }
+            match rx.try_recv() {
+                Ok(data) => {
+                    on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
                 },
-                None => {
-                    error!("交易所通道错误");
-                    break;
-                }
+                Err(_e) => { }
             }
         }
     });
-}
+}
+
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "level2" {
+        trace_stack.on_before_format();
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
+        trace_stack.on_after_format();
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            // time_delay.quant_start = Utc::now().timestamp_micros();
+            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+            quant.local_depths.insert(depth.name, depth.depth);
+        }
+    } else if data.channel == "tickerV2" {
+        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant._update_ticker(ticker.ticker, ticker.name);
+        }
+    } else if data.channel == "availableBalance.change" {
+        // 取消原有推送解析,因为推送的信息不准确
+        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_equity(account);
+        // }
+    } else if data.channel == "symbolOrderChange" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+        trace_stack.on_after_format();
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            if order.status == "NULL" {
+                continue;
+            }
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "position.change" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_position(positions);
+        }
+    } else if data.channel == "match" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+        if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+            *max_buy = trade.price
+        }
+        if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+            *min_sell = trade.price
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    }
+}