Преглед на файлове

修改bybit获取ticker格式化信息

gepangpang преди 1 година
родител
ревизия
dce40ed880

+ 2 - 1
standard/Cargo.toml

@@ -19,4 +19,5 @@ futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 toml = "0.5.11"
-futures-channel = "0.3.29"
+futures-channel = "0.3.29"
+lazy_static = "1.4.0"

+ 51 - 14
standard/src/bybit_swap_handle.rs

@@ -1,8 +1,11 @@
 use std::str::FromStr;
+use std::sync::Arc;
+use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
 use serde_json::{from_value, Value};
+use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
@@ -15,13 +18,13 @@ pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account
 }
 
 pub fn format_account_info(data: Vec<Value>, symbol: &String) -> Account {
-    let account = data.iter().find(| &item | item["accountType"] == "UNIFIED");
+    let account = data.iter().find(|&item| item["accountType"] == "UNIFIED");
     match account {
         None => {
             error!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data);
             panic!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data)
         }
-        Some(val) =>{
+        Some(val) => {
             let arr: Vec<Value> = from_value(val["coin"].clone()).unwrap();
             let upper_str = symbol.to_uppercase();
             let symbol_array: Vec<&str> = upper_str.split("_").collect();
@@ -64,7 +67,7 @@ pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
             panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
         }
     };
-    let symbol_mapper =  position["symbol"].as_str().unwrap().to_string();
+    let symbol_mapper = position["symbol"].as_str().unwrap().to_string();
     let currency = "USDT";
     let coin = &symbol_mapper[..symbol_mapper.find(currency).unwrap_or(0)];
     let size_str: String = from_value(position["size"].clone()).unwrap();
@@ -81,7 +84,7 @@ pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
         _ => {}
     }
     Position {
-        symbol: format!{"{}_{}", coin, currency},
+        symbol: format! {"{}_{}", coin, currency},
         margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
         amount,
         frozen_amount: Decimal::ZERO,
@@ -137,23 +140,57 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
     return rst_order;
 }
 
+
+lazy_static! {
+    static ref DEPTH: Arc<Mutex<Vec<Decimal>>> = Arc::new(Mutex::new(vec![dec!(0),dec!(0),dec!(0),dec!(0)]));
+    static ref TICKER: Arc<Mutex<SpecialTicker>> = Arc::new(Mutex::new(SpecialTicker::new()));
+}
+
 // 处理特殊Ticket信息
-pub fn handle_ticker(res_data: &ResponseData) -> SpecialDepth {
-    let ap = Decimal::from_str(res_data.data["ask1Price"].as_str().unwrap()).unwrap();
-    let bp = Decimal::from_str(res_data.data["bid1Price"].as_str().unwrap()).unwrap();
-    let aq = Decimal::from_str(res_data.data["ask1Size"].as_str().unwrap()).unwrap();
-    let bq = Decimal::from_str(res_data.data["bid1Size"].as_str().unwrap()).unwrap();
-    let mp = (bp + ap) * dec!(0.5);
+pub async fn handle_ticker(res_data: &ResponseData) -> SpecialDepth {
+    let mut depth = DEPTH.lock().await;
+    let mut ticker = TICKER.lock().await;
 
     let t = Decimal::from_i64(res_data.data["ts"].as_i64().unwrap()).unwrap();
     let create_at = t.to_i64().unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at: 0 };
-    let depth_info = vec![bp, bq, ap, aq];
+    match res_data.data["ask1Price"].as_str() {
+        Some(str) => {
+            let ap = Decimal::from_str(str).unwrap();
+            ticker.sell = ap;
+            depth[2] = ap;
+        }
+        _ => {}
+    }
+    match res_data.data["bid1Price"].as_str() {
+        Some(str) => {
+            let bp = Decimal::from_str(str).unwrap();
+            ticker.buy = bp;
+            depth[0] = bp;
+        }
+        _ => {}
+    }
+    match res_data.data["ask1Size"].as_str() {
+        Some(str) => {
+            depth[3] = Decimal::from_str(str).unwrap();
+        }
+        _ => {}
+    }
+    match res_data.data["bid1Size"].as_str() {
+        Some(str) => {
+            depth[1] = Decimal::from_str(str).unwrap();
+        }
+        _ => {}
+    }
+
+    ticker.mid_price = (ticker.buy + ticker.sell) * dec!(0.5);
+    ticker.t = t;
+    ticker.create_at = 0;
+
     SpecialDepth {
         name: res_data.label.clone(),
-        depth: depth_info,
-        ticker: ticker_info,
+        depth: depth.clone(),
+        ticker: ticker.clone(),
         t,
         create_at,
     }

+ 2 - 2
standard/src/handle_info.rs

@@ -64,7 +64,7 @@ impl HandleSwapInfo {
         }
     }
     // 处理特殊Ticket信息
-    pub fn handle_book_ticker(exchange: ExchangeEnum, res_data: &ResponseData) -> SpecialDepth {
+    pub async fn handle_book_ticker(exchange: ExchangeEnum, res_data: &ResponseData) -> SpecialDepth {
         match exchange {
             // ExchangeEnum::BinanceSpot => {
             //     binance_spot_handle::handle_special_ticker(res_data)
@@ -96,7 +96,7 @@ impl HandleSwapInfo {
                 // bitget_swap_handle::handle_special_ticker(res_data)
             },
             ExchangeEnum::BybitSwap => {
-                bybit_swap_handle::handle_ticker(res_data)
+                bybit_swap_handle::handle_ticker(res_data).await
             }
             ExchangeEnum::CoinexSwap => {
                 coinex_swap_handle::handle_ticker(res_data)

+ 1 - 1
strategy/src/binance_usdt_swap.rs

@@ -83,7 +83,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         "bookTicker" => {
             trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
             // 将ticker数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &response);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &response).await;
             trace_stack.on_after_format();
 
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;

+ 1 - 1
strategy/src/bybit_usdt_swap.rs

@@ -154,7 +154,7 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut De
             trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
 
             // 处理ticker信息
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BybitSwap, &response);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BybitSwap, &response).await;
             trace_stack.on_after_format();
 
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;

+ 1 - 1
strategy/src/gate_swap.rs

@@ -111,7 +111,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         "futures.book_ticker" => {
             trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
             // 将ticker数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response).await;
             trace_stack.on_after_format();
 
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;

+ 1 - 1
strategy/src/kucoin_swap.rs

@@ -122,7 +122,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         "tickerV2" => {
             trace_stack.set_source("kucoin_swap.tickerV2".to_string());
 
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(KucoinSwap, &response);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(KucoinSwap, &response).await;
             trace_stack.on_before_network(special_depth.create_at.clone());
 
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await