|
|
@@ -0,0 +1,208 @@
|
|
|
+use std::collections::BTreeMap;
|
|
|
+use std::sync::Arc;
|
|
|
+use std::sync::atomic::AtomicBool;
|
|
|
+use futures_util::StreamExt;
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use tokio::sync::Mutex;
|
|
|
+use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
+use global::trace_stack::TraceStack;
|
|
|
+use standard::exchange::ExchangeEnum::OkxSwap;
|
|
|
+use crate::exchange_disguise::on_special_depth;
|
|
|
+use crate::model::{OrderInfo, OriginalTradeOK};
|
|
|
+use crate::quant::Quant;
|
|
|
+
|
|
|
+pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
|
|
|
+ is_trade: bool,
|
|
|
+ _quant_arc: Arc<Mutex<Quant>>,
|
|
|
+ name: String,
|
|
|
+ symbols: Vec<String>,
|
|
|
+ is_colo: bool,
|
|
|
+ exchange_params: BTreeMap<String, String>) {
|
|
|
+ // 启动公共频道
|
|
|
+ let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
|
|
|
+ let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
|
|
|
+
|
|
|
+ let mut ws_public = OkxSwapWs::new_label(name.clone(), is_colo, None, OkxSwapWsType::Public);
|
|
|
+ ws_public.set_symbols(symbols.clone());
|
|
|
+ if is_trade {
|
|
|
+ ws_public.set_subscribe(vec![
|
|
|
+ OkxSwapSubscribeType::PuBooks5
|
|
|
+ ])
|
|
|
+ } else {
|
|
|
+ ws_public.set_subscribe(vec![
|
|
|
+ OkxSwapSubscribeType::PuBooks50L2tbt
|
|
|
+ ])
|
|
|
+ }
|
|
|
+ // 挂起公共ws
|
|
|
+ let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
|
|
|
+ let bool_clone_public = Arc::clone(&bool_v1);
|
|
|
+ tokio::spawn(async move {
|
|
|
+ ws_public.ws_connect_async(bool_clone_public,
|
|
|
+ &write_tx_am_public,
|
|
|
+ write_rx_public,
|
|
|
+ read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ });
|
|
|
+ // 消费数据
|
|
|
+ let bot_arc_clone = _quant_arc.clone();
|
|
|
+ // 接收public数据
|
|
|
+ tokio::spawn(async move {
|
|
|
+ // ticker
|
|
|
+ let mut update_flag_u = Decimal::ZERO;
|
|
|
+ let mut max_buy = Decimal::ZERO;
|
|
|
+ let mut min_sell = Decimal::ZERO;
|
|
|
+
|
|
|
+ loop {
|
|
|
+ if let Some(public_data) = read_rx_public.next().await {
|
|
|
+ on_public_data(bot_arc_clone.clone(),
|
|
|
+ &mut update_flag_u,
|
|
|
+ &mut max_buy,
|
|
|
+ &mut min_sell,
|
|
|
+ public_data).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // 交易交易所需要启动私有ws
|
|
|
+ if is_trade {
|
|
|
+ let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
|
|
|
+ let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
|
|
|
+ let auth = Some(parse_btree_map_to_okx_swap_login(exchange_params));
|
|
|
+
|
|
|
+ let mut ws_private = OkxSwapWs::new_label(name.clone(), is_colo, auth, OkxSwapWsType::Private);
|
|
|
+ ws_private.set_symbols(symbols.clone());
|
|
|
+ ws_private.set_subscribe(vec![
|
|
|
+ OkxSwapSubscribeType::PrBalanceAndPosition,
|
|
|
+ OkxSwapSubscribeType::PrAccount("USDT".to_string()),
|
|
|
+ OkxSwapSubscribeType::PrOrders
|
|
|
+ ]);
|
|
|
+
|
|
|
+
|
|
|
+ // 挂起私有ws
|
|
|
+ let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
|
|
|
+ let bool_clone_private = Arc::clone(&bool_v1);
|
|
|
+ tokio::spawn(async move {
|
|
|
+ ws_private.ws_connect_async(bool_clone_private,
|
|
|
+ &write_tx_am_private,
|
|
|
+ write_rx_private,
|
|
|
+ read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ });
|
|
|
+
|
|
|
+ // 消费数据
|
|
|
+ let bot_arc_clone = _quant_arc.clone();
|
|
|
+ // 接收private信息
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let ct_val = _quant_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
|
|
|
+ let run_symbol = symbols.clone()[0].clone();
|
|
|
+ loop {
|
|
|
+ if let Some(private_data) = read_rx_private.next().await {
|
|
|
+ on_private_data(bot_arc_clone.clone(),
|
|
|
+ ct_val,
|
|
|
+ private_data,
|
|
|
+ run_symbol.clone()).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
|
|
|
+ let mut trace_stack = TraceStack::default();
|
|
|
+
|
|
|
+ trace_stack.on_after_network(data.time);
|
|
|
+ trace_stack.on_before_quant();
|
|
|
+
|
|
|
+ if data.code != "200".to_string() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if data.channel == "orders" {
|
|
|
+ trace_stack.on_before_format();
|
|
|
+ let orders = standard::handle_info::HandleSwapInfo::handle_order(OkxSwap, data.clone(), ct_val);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
+ for order in orders.order {
|
|
|
+ if order.status == "NULL" {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let order_info = OrderInfo {
|
|
|
+ symbol: "".to_string(),
|
|
|
+ amount: order.amount.abs(),
|
|
|
+ side: "".to_string(),
|
|
|
+ price: order.price,
|
|
|
+ client_id: order.custom_id,
|
|
|
+ filled_price: order.avg_price,
|
|
|
+ filled: order.deal_amount.abs(),
|
|
|
+ order_id: order.id,
|
|
|
+ local_time: 0,
|
|
|
+ create_time: 0,
|
|
|
+ status: order.status,
|
|
|
+ fee: Default::default(),
|
|
|
+ trace_stack: Default::default(),
|
|
|
+ };
|
|
|
+ order_infos.push(order_info);
|
|
|
+ }
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant.update_order(order_infos, trace_stack);
|
|
|
+ }
|
|
|
+ } else if data.channel == "balance_and_position" {
|
|
|
+ let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant.update_position(positions);
|
|
|
+ }
|
|
|
+ } else if data.channel == "account" {
|
|
|
+ let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
|
|
|
+ {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ quant.update_equity(account);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
|
|
|
+ let mut trace_stack = TraceStack::default();
|
|
|
+ trace_stack.on_after_network(data.time);
|
|
|
+ trace_stack.on_before_quant();
|
|
|
+
|
|
|
+ if data.code != "200".to_string() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if data.channel == "tickers" {
|
|
|
+ trace_stack.on_before_format();
|
|
|
+ let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
|
|
|
+ } else if data.channel == "trades" {
|
|
|
+ let mut quant = bot_arc_clone.lock().await;
|
|
|
+ let str = data.label.clone();
|
|
|
+ if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
|
|
|
+ *max_buy = Decimal::ZERO;
|
|
|
+ *min_sell = Decimal::ZERO;
|
|
|
+ quant.is_update.remove(str.as_str());
|
|
|
+ }
|
|
|
+ let trades: Vec<OriginalTradeOK> = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
+ for trade in trades {
|
|
|
+ if trade.px > *max_buy || *max_buy == Decimal::ZERO{
|
|
|
+ *max_buy = trade.px
|
|
|
+ }
|
|
|
+ if trade.px < *min_sell || *min_sell == Decimal::ZERO{
|
|
|
+ *min_sell = trade.px
|
|
|
+ }
|
|
|
+ }
|
|
|
+ quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
|
|
|
+ } else if data.channel == "books5" {
|
|
|
+ trace_stack.on_before_format();
|
|
|
+ let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
|
|
|
+ OkxSwapLogin {
|
|
|
+ api_key: exchange_params.get("access_key").unwrap().clone(),
|
|
|
+ secret_key: exchange_params.get("secret_key").unwrap().clone(),
|
|
|
+ passphrase: exchange_params.get("pass_key").unwrap().clone(),
|
|
|
+ }
|
|
|
+}
|