Explorar el Código

okx 逻辑接入

JiahengHe hace 1 año
padre
commit
c143249cc7
Se han modificado 2 ficheros con 126 adiciones y 14 borrados
  1. 8 0
      strategy/src/model.rs
  2. 118 14
      strategy/src/okx_usdt_swap.rs

+ 8 - 0
strategy/src/model.rs

@@ -107,6 +107,14 @@ pub struct OriginalTradeGa {
     pub price: Decimal
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct OriginalTradeOK {
+    // 数量
+    pub sz: Decimal,
+    // 价格
+    pub px: Decimal
+}
+
 #[allow(non_snake_case)]
 #[derive(Serialize, Deserialize, Debug)]
 pub struct OriginalTicker {

+ 118 - 14
strategy/src/okx_usdt_swap.rs

@@ -2,11 +2,14 @@ use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use futures_util::StreamExt;
-// use rust_decimal::Decimal;
+use rust_decimal::Decimal;
 use tokio::sync::Mutex;
-use tracing::info;
 use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
-// use exchanges::response_base::ResponseData;
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::OkxSwap;
+use crate::exchange_disguise::on_special_depth;
+use crate::model::{OrderInfo, OriginalTradeOK};
 use crate::quant::Quant;
 
 pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
@@ -40,11 +43,22 @@ pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
                                    write_rx_public,
                                    read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
     });
+    // 消费数据
+    let bot_arc_clone = _quant_arc.clone();
     // 接收public数据
     tokio::spawn(async move {
+        // ticker
+        let mut update_flag_u = Decimal::ZERO;
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+
         loop {
             if let Some(public_data) = read_rx_public.next().await {
-                info!(?public_data);
+                on_public_data(bot_arc_clone.clone(),
+                               &mut update_flag_u,
+                               &mut max_buy,
+                               &mut min_sell,
+                               public_data).await;
             }
         }
     });
@@ -74,26 +88,116 @@ pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
                                 read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
 
+        // 消费数据
+        let bot_arc_clone = _quant_arc.clone();
         // 接收private信息
         tokio::spawn(async move {
+            let ct_val = _quant_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
+            let run_symbol = symbols.clone()[0].clone();
             loop {
                 if let Some(private_data) = read_rx_private.next().await {
-                    info!(?private_data);
+                    on_private_data(bot_arc_clone.clone(),
+                                    ct_val,
+                                    private_data,
+                                    run_symbol.clone()).await;
                 }
             }
         });
     }
 }
 
-// async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
-//                  update_flag_u: &mut Decimal,
-//                  multiplier: Decimal,
-//                  run_symbol: String,
-//                  max_buy: &mut Decimal,
-//                  min_sell: &mut Decimal,
-//                  data: ResponseData) {
-//
-// }
+async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
+    let mut trace_stack = TraceStack::default();
+
+    trace_stack.on_after_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "orders" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(OkxSwap, data.clone(), ct_val);
+        trace_stack.on_after_format();
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            if order.status == "NULL" {
+                continue;
+            }
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "balance_and_position" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_position(positions);
+        }
+    } else if data.channel == "account" {
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_equity(account);
+        }
+    }
+}
+
+async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_after_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "tickers" {
+        trace_stack.on_before_format();
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+        trace_stack.on_after_format();
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+    } else if data.channel == "trades" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trades: Vec<OriginalTradeOK> = serde_json::from_str(data.data.as_str()).unwrap();
+        for trade in trades {
+            if trade.px > *max_buy || *max_buy == Decimal::ZERO{
+                *max_buy = trade.px
+            }
+            if trade.px < *min_sell || *min_sell == Decimal::ZERO{
+                *min_sell = trade.px
+            }
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    } else if data.channel == "books" {
+        trace_stack.on_before_format();
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+        trace_stack.on_after_format();
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+    }
+}
 
 fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
     OkxSwapLogin {