Prechádzať zdrojové kódy

ticker数据格式化完毕,兼容了bybit的部分推送逻辑。

skyffire 1 rok pred
rodič
commit
8762a445ca

+ 2 - 1
standard/Cargo.toml

@@ -19,4 +19,5 @@ futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 toml = "0.5.11"
-futures-channel = "0.3.29"
+futures-channel = "0.3.29"
+lazy_static = "1.4.0"

+ 41 - 20
standard/src/bybit_swap_handle.rs

@@ -1,8 +1,11 @@
 use std::str::FromStr;
+use std::sync::Arc;
 use chrono::Utc;
-use rust_decimal::Decimal;
+use lazy_static::lazy_static;
+use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::{from_value, Value};
+use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
@@ -194,27 +197,45 @@ pub fn handle_book_ticker(res_data: &ResponseData, mul: &Decimal) -> Depth {
     }
 }
 
-pub fn handle_ticker(res_data: &ResponseData) -> Ticker {
-    let time = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
-    let high = Decimal::from_str(res_data.data["highPrice24h"].as_str().unwrap()).unwrap();
-    let low = Decimal::from_str(res_data.data["lowPrice24h"].as_str().unwrap()).unwrap();
-    let sell = Decimal::from_str(res_data.data["ask1Price"].as_str().unwrap()).unwrap();
-    let buy = Decimal::from_str(res_data.data["bid1Price"].as_str().unwrap()).unwrap();
-    let last = Decimal::from_str(res_data.data["lastPrice"].as_str().unwrap()).unwrap();
-    let volume = Decimal::from_str(res_data.data["volume24h"].as_str().unwrap()).unwrap();
-    let open_interest = Decimal::from_str(res_data.data["openInterest"].as_str().unwrap()).unwrap();
+lazy_static! {
+    static ref TICKER: Arc<Mutex<Ticker>> = Arc::new(Mutex::new(Ticker::new()));
+}
+
+pub async fn handle_ticker(res_data: &ResponseData) -> Ticker {
+    let mut ticker = TICKER.lock().await;
+
+    ticker.time = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
+    ticker.high = match res_data.data["highPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.high }
+    };
+    ticker.low = match res_data.data["lowPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.low }
+    };
+    ticker.sell = match res_data.data["ask1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.sell }
+    };
+    ticker.buy = match res_data.data["bid1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.buy }
+    };
+    ticker.last = match res_data.data["lastPrice"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.last }
+    };
+    ticker.volume = match res_data.data["volume24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.volume }
+    };
+    ticker.open_interest = match res_data.data["openInterest"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.open_interest }
+    };
     // let s = res_data.data["symbol"].as_str().unwrap().replace("USDT", "_USDT");
 
-    Ticker {
-        time,
-        high,
-        low,
-        sell,
-        buy,
-        last,
-        volume,
-        open_interest,
-    }
+    ticker.clone()
 }
 
 pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {

+ 2 - 2
standard/src/exchange_struct_handler.rs

@@ -230,10 +230,10 @@ impl ExchangeStructHandler {
         }
     }
     // 处理ticker信息
-    pub fn ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Ticker {
+    pub async fn ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Ticker {
         match exchange {
             ExchangeEnum::BybitSwap => {
-                bybit_swap_handle::handle_ticker(res_data)
+                bybit_swap_handle::handle_ticker(res_data).await
             },
             _ => {
                 error!("未找到该交易所!trades_handle: {:?}", exchange);

+ 6 - 1
strategy/src/bybit_usdt_swap.rs

@@ -28,7 +28,8 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuOrderBook1,
-            BybitSwapSubscribeType::PuTrade
+            BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PuTickers
         ]);
 
         // 读取数据
@@ -158,6 +159,10 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
                 on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
             }
         }
+        "tickers" => {
+            let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
+            info!(?ticker)
+        }
         _ => {
             error!("未知推送类型");
             error!(?response);

+ 1 - 1
strategy/src/strategy.rs

@@ -1110,7 +1110,7 @@ impl Strategy {
         }
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter