|
|
@@ -3,12 +3,12 @@ use std::sync::Arc;
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
use std::time::Duration;
|
|
|
use rust_decimal::Decimal;
|
|
|
-use tracing::{error};
|
|
|
use tokio::spawn;
|
|
|
use tokio::sync::mpsc::channel;
|
|
|
use tokio::sync::Mutex;
|
|
|
use tokio::time::sleep;
|
|
|
use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
use global::trace_stack::TraceStack;
|
|
|
use standard::exchange::ExchangeEnum::KucoinSwap;
|
|
|
use crate::model::{OrderInfo, OriginalTradeGa};
|
|
|
@@ -70,100 +70,101 @@ pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc:
|
|
|
let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
|
|
|
loop {
|
|
|
sleep(Duration::from_millis(1)).await;
|
|
|
- let mut trace_stack = TraceStack::default();
|
|
|
|
|
|
- match rx.recv().await {
|
|
|
- Some(data) => {
|
|
|
- trace_stack.on_network(data.time);
|
|
|
- trace_stack.on_before_quant();
|
|
|
-
|
|
|
- if data.code != "200".to_string() {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if data.channel == "level2" {
|
|
|
- trace_stack.on_before_format();
|
|
|
- let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
|
|
|
- trace_stack.on_after_format();
|
|
|
- {
|
|
|
- let mut quant = bot_arc_clone.lock().await;
|
|
|
- // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
- quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
|
|
|
- quant.local_depths.insert(depth.name, depth.depth);
|
|
|
- }
|
|
|
- } else if data.channel == "tickerV2" {
|
|
|
- let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
|
|
|
- {
|
|
|
- let mut quant = bot_arc_clone.lock().await;
|
|
|
- quant._update_ticker(ticker.ticker, ticker.name);
|
|
|
- }
|
|
|
- } else if data.channel == "availableBalance.change" {
|
|
|
- // 取消原有推送解析,因为推送的信息不准确
|
|
|
- // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
|
|
|
- // {
|
|
|
- // let mut quant = bot_arc_clone.lock().await;
|
|
|
- // quant.update_equity(account);
|
|
|
- // }
|
|
|
- } else if data.channel == "symbolOrderChange" {
|
|
|
- trace_stack.on_before_format();
|
|
|
- let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
|
|
|
- trace_stack.on_after_format();
|
|
|
- let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
- for order in orders.order {
|
|
|
- if order.status == "NULL" {
|
|
|
- continue;
|
|
|
- }
|
|
|
- let order_info = OrderInfo {
|
|
|
- symbol: "".to_string(),
|
|
|
- amount: order.amount.abs(),
|
|
|
- side: "".to_string(),
|
|
|
- price: order.price,
|
|
|
- client_id: order.custom_id,
|
|
|
- filled_price: order.avg_price,
|
|
|
- filled: order.deal_amount.abs(),
|
|
|
- order_id: order.id,
|
|
|
- local_time: 0,
|
|
|
- create_time: 0,
|
|
|
- status: order.status,
|
|
|
- fee: Default::default(),
|
|
|
- trace_stack: Default::default(),
|
|
|
- };
|
|
|
- order_infos.push(order_info);
|
|
|
- }
|
|
|
-
|
|
|
- {
|
|
|
- let mut quant = bot_arc_clone.lock().await;
|
|
|
- quant.update_order(order_infos, trace_stack);
|
|
|
- }
|
|
|
- } else if data.channel == "position.change" {
|
|
|
- let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
|
|
|
- {
|
|
|
- let mut quant = bot_arc_clone.lock().await;
|
|
|
- quant.update_position(positions);
|
|
|
- }
|
|
|
- } else if data.channel == "match" {
|
|
|
- let mut quant = bot_arc_clone.lock().await;
|
|
|
- let str = data.label.clone();
|
|
|
- if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
|
|
|
- max_buy = Decimal::ZERO;
|
|
|
- min_sell = Decimal::ZERO;
|
|
|
- quant.is_update.remove(str.as_str());
|
|
|
- }
|
|
|
- let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
- if trade.price > max_buy || max_buy == Decimal::ZERO {
|
|
|
- max_buy = trade.price
|
|
|
- }
|
|
|
- if trade.price < min_sell || min_sell == Decimal::ZERO {
|
|
|
- min_sell = trade.price
|
|
|
- }
|
|
|
- quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
|
|
|
- }
|
|
|
+ match rx.try_recv() {
|
|
|
+ Ok(data) => {
|
|
|
+ on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
|
|
|
},
|
|
|
- None => {
|
|
|
- error!("交易所通道错误");
|
|
|
- break;
|
|
|
- }
|
|
|
+ Err(_e) => { }
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
-}
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
|
|
|
+ let mut trace_stack = TraceStack::default();
|
|
|
+ trace_stack.on_network(data.time);
|
|
|
+ trace_stack.on_before_quant();
|
|
|
+
|
|
|
+ if data.code != "200".to_string() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if data.channel == "level2" {
|
|
|
+ trace_stack.on_before_format();
|
|
|
+ let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
|
|
|
+ quant.local_depths.insert(depth.name, depth.depth);
|
|
|
+ }
|
|
|
+ } else if data.channel == "tickerV2" {
|
|
|
+ let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant._update_ticker(ticker.ticker, ticker.name);
|
|
|
+ }
|
|
|
+ } else if data.channel == "availableBalance.change" {
|
|
|
+ // 取消原有推送解析,因为推送的信息不准确
|
|
|
+ // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
|
|
|
+ // {
|
|
|
+ // let mut quant = bot_arc_clone.lock().await;
|
|
|
+ // quant.update_equity(account);
|
|
|
+ // }
|
|
|
+ } else if data.channel == "symbolOrderChange" {
|
|
|
+ trace_stack.on_before_format();
|
|
|
+ let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
+ for order in orders.order {
|
|
|
+ if order.status == "NULL" {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let order_info = OrderInfo {
|
|
|
+ symbol: "".to_string(),
|
|
|
+ amount: order.amount.abs(),
|
|
|
+ side: "".to_string(),
|
|
|
+ price: order.price,
|
|
|
+ client_id: order.custom_id,
|
|
|
+ filled_price: order.avg_price,
|
|
|
+ filled: order.deal_amount.abs(),
|
|
|
+ order_id: order.id,
|
|
|
+ local_time: 0,
|
|
|
+ create_time: 0,
|
|
|
+ status: order.status,
|
|
|
+ fee: Default::default(),
|
|
|
+ trace_stack: Default::default(),
|
|
|
+ };
|
|
|
+ order_infos.push(order_info);
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant.update_order(order_infos, trace_stack);
|
|
|
+ }
|
|
|
+ } else if data.channel == "position.change" {
|
|
|
+ let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant.update_position(positions);
|
|
|
+ }
|
|
|
+ } else if data.channel == "match" {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ let str = data.label.clone();
|
|
|
+ if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
|
|
|
+ *max_buy = Decimal::ZERO;
|
|
|
+ *min_sell = Decimal::ZERO;
|
|
|
+ quant.is_update.remove(str.as_str());
|
|
|
+ }
|
|
|
+ let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
+ if trade.price > *max_buy || *max_buy == Decimal::ZERO {
|
|
|
+ *max_buy = trade.price
|
|
|
+ }
|
|
|
+ if trade.price < *min_sell || *min_sell == Decimal::ZERO {
|
|
|
+ *min_sell = trade.price
|
|
|
+ }
|
|
|
+ quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
|
|
|
+ }
|
|
|
+}
|