|
|
@@ -1,27 +1,34 @@
|
|
|
use std::collections::BTreeMap;
|
|
|
use std::sync::Arc;
|
|
|
-use std::sync::atomic::AtomicBool;
|
|
|
+use std::sync::atomic::{AtomicBool};
|
|
|
use std::time::Duration;
|
|
|
+use rust_decimal::Decimal;
|
|
|
use tokio::{spawn, time};
|
|
|
use tokio::sync::Mutex;
|
|
|
+use tokio::time::Instant;
|
|
|
+use tracing::info;
|
|
|
use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
+use global::trace_stack::TraceStack;
|
|
|
+use standard::exchange::ExchangeEnum::BybitSwap;
|
|
|
use crate::core::Core;
|
|
|
+use crate::model::OrderInfo;
|
|
|
|
|
|
// 1交易、0参考 bybit 合约 启动
|
|
|
-pub async fn bybit_swap_run(_is_shutdown_arc: Arc<AtomicBool>,
|
|
|
- is_trade: bool,
|
|
|
- _core_arc: Arc<Mutex<Core>>,
|
|
|
- name: String,
|
|
|
- symbols: Vec<String>,
|
|
|
- is_colo: bool,
|
|
|
- exchange_params: BTreeMap<String, String>) {
|
|
|
+pub async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+ is_trade: bool,
|
|
|
+ core_arc: Arc<Mutex<Core>>,
|
|
|
+ name: String,
|
|
|
+ symbols: Vec<String>,
|
|
|
+ is_colo: bool,
|
|
|
+ exchange_params: BTreeMap<String, String>) {
|
|
|
// 启动公共频道
|
|
|
- // let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
|
|
|
+ let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
|
|
|
|
|
|
let mut ws_public = BybitSwapWs::new_label(name.clone(), is_colo, None, BybitSwapWsType::Public);
|
|
|
ws_public.set_symbols(symbols.clone());
|
|
|
ws_public.set_subscribe(vec![
|
|
|
- BybitSwapSubscribeType::PuOrderBook50
|
|
|
+ BybitSwapSubscribeType::PuTickers
|
|
|
]);
|
|
|
if is_trade {
|
|
|
ws_public.set_subscribe(vec![
|
|
|
@@ -29,32 +36,32 @@ pub async fn bybit_swap_run(_is_shutdown_arc: Arc<AtomicBool>,
|
|
|
]);
|
|
|
}
|
|
|
// 挂起公共ws
|
|
|
- // let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
|
|
|
- // let bool_clone_public = Arc::clone(&is_shutdown_arc);
|
|
|
+ let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
|
|
|
+ let is_shutdown_clone_public = Arc::clone(&is_shutdown_arc);
|
|
|
+ let core_arc_clone_public = core_arc.clone();
|
|
|
spawn(async move {
|
|
|
- // ws_public.ws_connect_async(bool_clone_public,
|
|
|
- // &write_tx_am_public,
|
|
|
- // write_rx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
- });
|
|
|
- // 消费数据
|
|
|
- // let core_arc_clone = _core_arc.clone();
|
|
|
- // 接收public数据
|
|
|
- spawn(async move {
|
|
|
- // ticker
|
|
|
- // let mut update_flag_u = Decimal::ZERO;
|
|
|
- // let mut max_buy = Decimal::ZERO;
|
|
|
- // let mut min_sell = Decimal::ZERO;
|
|
|
- // let mut depth_asks: Vec<MarketOrder> = Vec::new();
|
|
|
- // let mut depth_bids: Vec<MarketOrder> = Vec::new();
|
|
|
+ // 消费数据
|
|
|
+ let mut update_flag_u = Decimal::ZERO;
|
|
|
|
|
|
- loop {
|
|
|
+ let fun = move |data: ResponseData| {
|
|
|
+ let core_arc = core_arc_clone_public.clone();
|
|
|
|
|
|
- }
|
|
|
+ async move {
|
|
|
+ on_public_data(core_arc,
|
|
|
+ &mut update_flag_u,
|
|
|
+ data).await;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ ws_public.ws_connect_async(is_shutdown_clone_public,
|
|
|
+ fun,
|
|
|
+ &write_tx_am_public,
|
|
|
+ write_rx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
});
|
|
|
let trade_symbols = symbols.clone();
|
|
|
// 交易交易所需要启动私有ws
|
|
|
if is_trade {
|
|
|
- // let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
|
|
|
+ let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
|
|
|
let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
|
|
|
|
|
|
let mut ws_private = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
|
|
|
@@ -65,35 +72,31 @@ pub async fn bybit_swap_run(_is_shutdown_arc: Arc<AtomicBool>,
|
|
|
BybitSwapSubscribeType::PrWallet
|
|
|
]);
|
|
|
|
|
|
-
|
|
|
// 挂起私有ws
|
|
|
- // let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
|
|
|
- // let bool_clone_private = Arc::clone(&is_shutdown_arc);
|
|
|
+ let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
|
|
|
+ let is_shutdown_clone_private = Arc::clone(&is_shutdown_arc);
|
|
|
+ let core_arc_clone_private = core_arc.clone();
|
|
|
spawn(async move {
|
|
|
- // ws_private.ws_connect_async(bool_clone_private,
|
|
|
- // &write_tx_am_private,
|
|
|
- // write_rx_private,
|
|
|
- // read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
- });
|
|
|
+ let ct_val = core_arc_clone_private.lock().await.platform_rest.get_self_market().ct_val;
|
|
|
+ let run_symbol = symbols.clone()[0].clone();
|
|
|
|
|
|
- // 消费数据
|
|
|
- // let core_arc_clone = _core_arc.clone();
|
|
|
- // 接收private信息
|
|
|
- spawn(async move {
|
|
|
- // let ct_val = core_arc_clone.clone().lock().await.platform_rest.get_self_market().ct_val;
|
|
|
- // let run_symbol = symbols.clone()[0].clone();
|
|
|
- // loop {
|
|
|
- // if let Some(private_data) = read_rx_private.next().await {
|
|
|
- // // on_private_data(core_arc_clone.clone(),
|
|
|
- // // ct_val,
|
|
|
- // // private_data,
|
|
|
- // // run_symbol.clone()).await;
|
|
|
- // }
|
|
|
- // }
|
|
|
+ let fun = move |data: ResponseData| {
|
|
|
+ let core_arc_clone = core_arc_clone_private.clone();
|
|
|
+ let run_symbol_clone = run_symbol.clone();
|
|
|
+
|
|
|
+ async move {
|
|
|
+ on_private_data(core_arc_clone.clone(), &ct_val, data, &run_symbol_clone).await;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ ws_private.ws_connect_async(is_shutdown_clone_private,
|
|
|
+ fun,
|
|
|
+ &write_tx_am_private,
|
|
|
+ write_rx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
});
|
|
|
|
|
|
// 定时获取仓位信息
|
|
|
- let position_core_clone = _core_arc.clone();
|
|
|
+ let position_core_clone = core_arc.clone();
|
|
|
spawn(async move {
|
|
|
let mut interval = time::interval(Duration::from_secs(30));
|
|
|
loop {
|
|
|
@@ -107,114 +110,116 @@ pub async fn bybit_swap_run(_is_shutdown_arc: Arc<AtomicBool>,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
|
|
|
-// let mut trace_stack = TraceStack::new(0, Instant::now());
|
|
|
-//
|
|
|
-// trace_stack.on_after_network(data.time);
|
|
|
-// trace_stack.on_before_unlock_core();
|
|
|
-//
|
|
|
-// if data.code != "200".to_string() {
|
|
|
-// return;
|
|
|
-// }
|
|
|
-// if data.channel == "wallet" {
|
|
|
-// let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
|
|
|
-// {
|
|
|
-// let mut core = core_arc_clone.lock().await;
|
|
|
-// core.update_equity(account).await;
|
|
|
-// }
|
|
|
-// } else if data.channel == "order" {
|
|
|
-// trace_stack.on_before_format();
|
|
|
-// let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
|
|
|
-// trace_stack.on_after_format();
|
|
|
-//
|
|
|
-// let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
-// for order in orders.order {
|
|
|
-// if order.status == "NULL" {
|
|
|
-// continue;
|
|
|
-// }
|
|
|
-// let order_info = OrderInfo {
|
|
|
-// symbol: "".to_string(),
|
|
|
-// amount: order.amount.abs(),
|
|
|
-// side: "".to_string(),
|
|
|
-// price: order.price,
|
|
|
-// client_id: order.custom_id,
|
|
|
-// filled_price: order.avg_price,
|
|
|
-// filled: order.deal_amount.abs(),
|
|
|
-// order_id: order.id,
|
|
|
-// local_time: 0,
|
|
|
-// create_time: 0,
|
|
|
-// status: order.status,
|
|
|
-// fee: Default::default(),
|
|
|
-// trace_stack: Default::default(),
|
|
|
-// };
|
|
|
-// order_infos.push(order_info);
|
|
|
-// }
|
|
|
-//
|
|
|
-// {
|
|
|
-// let mut core = core_arc_clone.lock().await;
|
|
|
-// core.update_order(order_infos, trace_stack);
|
|
|
-// }
|
|
|
-// } else if data.channel == "position" {
|
|
|
-// let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
|
|
|
-// {
|
|
|
-// let mut core = core_arc_clone.lock().await;
|
|
|
-// core.update_position(positions).await;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>) {
|
|
|
-// let mut trace_stack = TraceStack::new(0, Instant::now());
|
|
|
-// trace_stack.on_after_network(data.time);
|
|
|
-// trace_stack.on_before_unlock_core();
|
|
|
-//
|
|
|
-// if data.code != "200".to_string() {
|
|
|
-// return;
|
|
|
-// }
|
|
|
-// if data.channel == "orderbook" {
|
|
|
-// let mut is_update = false;
|
|
|
-// let data_type = data.data_type.clone();
|
|
|
-// let label = data.label.clone();
|
|
|
-// if data_type == "delta" {
|
|
|
-// is_update = true;
|
|
|
-// }
|
|
|
-// trace_stack.on_before_format();
|
|
|
-// let mut depth_format: DepthParam = format_depth(BybitSwap, data);
|
|
|
-// // 是增量更新
|
|
|
-// if is_update {
|
|
|
-// update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
|
|
|
-// } else { // 全量
|
|
|
-// depth_asks.clear();
|
|
|
-// depth_asks.append(&mut depth_format.depth_asks);
|
|
|
-// depth_bids.clear();
|
|
|
-// depth_bids.append(&mut depth_format.depth_bids);
|
|
|
-//
|
|
|
-// }
|
|
|
-// let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
|
|
|
-// trace_stack.on_before_network(depth_format.create_at.clone());
|
|
|
-// trace_stack.on_after_format();
|
|
|
-//
|
|
|
-// on_special_depth(core_arc_clone, update_flag_u, label, trace_stack, depth).await;
|
|
|
-// } else if data.channel == "trade" {
|
|
|
-// let mut core = core_arc_clone.lock().await;
|
|
|
-// let str = data.label.clone();
|
|
|
-// if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
|
|
|
-// *max_buy = Decimal::ZERO;
|
|
|
-// *min_sell = Decimal::ZERO;
|
|
|
-// core.is_update.remove(str.as_str());
|
|
|
-// }
|
|
|
-// let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
-// for trade in trades {
|
|
|
-// if trade.p > *max_buy || *max_buy == Decimal::ZERO{
|
|
|
-// *max_buy = trade.p
|
|
|
-// }
|
|
|
-// if trade.p < *min_sell || *min_sell == Decimal::ZERO{
|
|
|
-// *min_sell = trade.p
|
|
|
-// }
|
|
|
-// }
|
|
|
-// core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
|
|
|
-// }
|
|
|
-// }
|
|
|
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
|
|
|
+ ct_val: &Decimal,
|
|
|
+ data: ResponseData,
|
|
|
+ run_symbol: &String) {
|
|
|
+ let mut trace_stack = TraceStack::new(data.time, data.ins);
|
|
|
+ trace_stack.on_after_span_line();
|
|
|
+
|
|
|
+ if data.code != "200".to_string() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if data.channel == "wallet" {
|
|
|
+ let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &data, run_symbol);
|
|
|
+ {
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_equity(account).await;
|
|
|
+ }
|
|
|
+ } else if data.channel == "order" {
|
|
|
+ let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
+ for order in orders.order {
|
|
|
+ if order.status == "NULL" {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let order_info = OrderInfo {
|
|
|
+ symbol: "".to_string(),
|
|
|
+ amount: order.amount.abs(),
|
|
|
+ side: "".to_string(),
|
|
|
+ price: order.price,
|
|
|
+ client_id: order.custom_id,
|
|
|
+ filled_price: order.avg_price,
|
|
|
+ filled: order.deal_amount.abs(),
|
|
|
+ order_id: order.id,
|
|
|
+ local_time: 0,
|
|
|
+ create_time: 0,
|
|
|
+ status: order.status,
|
|
|
+ fee: Default::default(),
|
|
|
+ trace_stack: TraceStack::new(0, Instant::now()),
|
|
|
+ };
|
|
|
+ order_infos.push(order_info);
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_order(order_infos, trace_stack).await;
|
|
|
+ }
|
|
|
+ } else if data.channel == "position" {
|
|
|
+ let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap, &data, ct_val);
|
|
|
+ {
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_position(positions).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_public_data(_core_arc_clone: Arc<Mutex<Core>>,
|
|
|
+ _update_flag_u: &mut Decimal,
|
|
|
+ data: ResponseData) {
|
|
|
+ let mut trace_stack = TraceStack::new(data.time, data.ins);
|
|
|
+ trace_stack.on_after_span_line();
|
|
|
+
|
|
|
+ if data.code != "200".to_string() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ info!(?data);
|
|
|
+
|
|
|
+ if data.channel == "orderbook" {
|
|
|
+ // let mut is_update = false;
|
|
|
+ // let data_type = data.data_type.clone();
|
|
|
+ // let label = data.label.clone();
|
|
|
+ // if data_type == "delta" {
|
|
|
+ // is_update = true;
|
|
|
+ // }
|
|
|
+ // let mut depth_format: DepthParam = format_depth(BybitSwap, &data);
|
|
|
+ // // 是增量更新
|
|
|
+ // if is_update {
|
|
|
+ // update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
|
|
|
+ // } else { // 全量
|
|
|
+ // depth_asks.clear();
|
|
|
+ // depth_asks.append(&mut depth_format.depth_asks);
|
|
|
+ // depth_bids.clear();
|
|
|
+ // depth_bids.append(&mut depth_format.depth_bids);
|
|
|
+ // }
|
|
|
+ // let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
|
|
|
+ // trace_stack.on_before_network(depth_format.create_at.clone());
|
|
|
+ // trace_stack.on_after_format();
|
|
|
+ //
|
|
|
+ // on_special_depth(core_arc_clone, update_flag_u, &label, &mut trace_stack, &depth).await;
|
|
|
+ } else if data.channel == "trade" {
|
|
|
+ // let mut core = core_arc_clone.lock().await;
|
|
|
+ // let str = data.label.clone();
|
|
|
+ // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
|
|
|
+ // *max_buy = Decimal::ZERO;
|
|
|
+ // *min_sell = Decimal::ZERO;
|
|
|
+ // core.is_update.remove(str.as_str());
|
|
|
+ // }
|
|
|
+ // let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
+ // for trade in trades {
|
|
|
+ // if trade.p > *max_buy || *max_buy == Decimal::ZERO{
|
|
|
+ // *max_buy = trade.p
|
|
|
+ // }
|
|
|
+ // if trade.p < *min_sell || *min_sell == Decimal::ZERO{
|
|
|
+ // *min_sell = trade.p
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
|
|
|
BybitSwapLogin {
|
|
|
@@ -222,6 +227,7 @@ fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>
|
|
|
secret_key: exchange_params.get("secret_key").unwrap().clone(),
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
// fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
|
|
|
// for i in asks {
|
|
|
// let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
|
|
|
@@ -259,4 +265,4 @@ fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>
|
|
|
// // 限制总长度100
|
|
|
// depth_asks.truncate(100);
|
|
|
// depth_bids.truncate(100);
|
|
|
-// }
|
|
|
+// }
|