|
|
@@ -13,8 +13,9 @@ use global::trace_stack::TraceStack;
|
|
|
use standard::exchange::ExchangeEnum::BybitSwap;
|
|
|
use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
use standard::{Depth, OrderBook};
|
|
|
+use standard::exchange::ExchangeEnum;
|
|
|
use crate::core::Core;
|
|
|
-use crate::exchange_disguise::on_depth;
|
|
|
+use crate::exchange_disguise::{on_depth, on_trade};
|
|
|
use crate::model::OrderInfo;
|
|
|
|
|
|
// 参考 币安 合约 启动
|
|
|
@@ -28,7 +29,8 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
|
|
|
let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
|
|
|
ws.set_subscribe(vec![
|
|
|
- BybitSwapSubscribeType::PuOrderBook1
|
|
|
+ BybitSwapSubscribeType::PuOrderBook1,
|
|
|
+ BybitSwapSubscribeType::PuTrade
|
|
|
]);
|
|
|
|
|
|
// 读取数据
|
|
|
@@ -102,18 +104,20 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
|
|
|
+async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
|
|
|
let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
trace_stack.on_after_span_line();
|
|
|
|
|
|
match response.channel.as_str() {
|
|
|
"orderbook" => {
|
|
|
+ trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
|
|
|
+
|
|
|
let mut is_update = false;
|
|
|
let data_type = response.data_type.clone();
|
|
|
if data_type == "delta" {
|
|
|
is_update = true;
|
|
|
}
|
|
|
- let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, ct_val);
|
|
|
+ let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul);
|
|
|
// 是增量更新
|
|
|
if is_update {
|
|
|
update_order_book(depth_asks, depth_bids, depth.asks, depth.bids);
|
|
|
@@ -134,7 +138,19 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, resp
|
|
|
};
|
|
|
trace_stack.on_after_format();
|
|
|
|
|
|
- on_depth(core_arc_clone, &response.label, &mut trace_stack, &result_depth).await;
|
|
|
+ on_depth(core_arc, &response.label, &mut trace_stack, &result_depth).await;
|
|
|
+ }
|
|
|
+ "trade" => {
|
|
|
+ trace_stack.set_source("bybit_usdt_swap.trade".to_string());
|
|
|
+
|
|
|
+ let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BybitSwap, response, mul);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ for trade in trades.iter_mut() {
|
|
|
+ let core_arc_clone = core_arc.clone();
|
|
|
+
|
|
|
+ on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
|
|
|
+ }
|
|
|
}
|
|
|
_ => {
|
|
|
error!("未知推送类型");
|