ソースを参照

bybit完善(待测)

JiahengHe 1 年間 前
コミット
3d13d3a43a

+ 18 - 3
standard/src/bybit_swap_handle.rs

@@ -1,11 +1,12 @@
 use std::str::FromStr;
 use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
 use serde_json::{from_value, Value};
 use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -157,7 +158,21 @@ pub fn format_order_item(order: Value, ct_val: &Decimal) -> Order {
 //     }
 // }
 
-pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
+pub fn handle_book_ticker(res_data: &ResponseData, mul: &Decimal) -> Depth {
+    let asks = format_depth_items(res_data.data["a"].clone(), mul);
+    let bids = format_depth_items(res_data.data["b"].clone(), mul);
+    let t = Decimal::from_i64(res_data.reach_time).unwrap();
+    let s = res_data.data["s"].as_str().unwrap().replace("USDT", "_USDT");
+
+    Depth {
+        time: t,
+        symbol: s.to_string(),
+        asks,
+        bids,
+    }
+}
+
+pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for val in value.as_array().unwrap() {
         let arr = val.as_array().unwrap();
@@ -166,7 +181,7 @@ pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
         depth_items.push(OrderBook {
             price,
             size,
-            value: price * size,
+            value: price * size * mul,
         })
     }
     depth_items

+ 4 - 9
standard/src/exchange_struct_handler.rs

@@ -52,8 +52,8 @@ impl ExchangeStructHandler {
             //     t = Decimal::from_str(res_data.data[0]["ts"].as_str().unwrap()).unwrap();
             // }
             ExchangeEnum::BybitSwap => {
-                depth_asks = bybit_swap_handle::format_depth_items(res_data.data["a"].clone());
-                depth_bids = bybit_swap_handle::format_depth_items(res_data.data["b"].clone());
+                depth_asks = bybit_swap_handle::format_depth_items(res_data.data["a"].clone(), mul);
+                depth_bids = bybit_swap_handle::format_depth_items(res_data.data["b"].clone(), mul);
                 t = Decimal::from_i64(res_data.reach_time).unwrap();
 
                 res_data.data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT")
@@ -168,13 +168,8 @@ impl ExchangeStructHandler {
             ExchangeEnum::BinanceSwap => {
                 binance_swap_handle::handle_book_ticker(&res_data, mul)
             }
-            ExchangeEnum::BybitSwap => {  // 未使用暂不实现
-                Depth {
-                    time: Decimal::ZERO,
-                    symbol: "".to_string(),
-                    asks: vec![],
-                    bids: vec![],
-                }
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_book_ticker(&res_data, mul)
             }
             _ => {
                 error!("未找到该交易所!book_ticker_handle: {:?}", exchange);

+ 90 - 64
strategy/src/bybit_usdt_swap.rs

@@ -1,3 +1,4 @@
+use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool};
@@ -11,7 +12,9 @@ use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::BybitSwap;
 use standard::exchange_struct_handler::ExchangeStructHandler;
+use standard::{Depth, OrderBook};
 use crate::core::Core;
+use crate::exchange_disguise::on_depth;
 use crate::model::OrderInfo;
 
 // 参考 币安 合约 启动
@@ -31,17 +34,22 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         // 读取数据
         let core_arc_clone = Arc::clone(&core_arc);
         let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
-        let run_symbol = symbols.clone()[0].clone();
+        let depth_asks = Arc::new(Mutex::new(Vec::new()));
+        let depth_bids = Arc::new(Mutex::new(Vec::new()));
 
         let fun = move |data: ResponseData| {
             // 在 async 块之前克隆 Arc
             let core_arc_cc = core_arc_clone.clone();
             let mul = multiplier.clone();
-            let rs = run_symbol.clone();
+
+            let depth_asks = Arc::clone(&depth_asks);
+            let depth_bids = Arc::clone(&depth_bids);
 
             async move {
+                let mut depth_asks = depth_asks.lock().await;
+                let mut depth_bids = depth_bids.lock().await;
                 // 使用克隆后的 Arc,避免 move 语义
-                on_data(core_arc_cc, &mul, &rs, &data).await
+                on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
             }
         };
 
@@ -83,7 +91,7 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 
             async move {
                 // 使用克隆后的 Arc,避免 move 语义
-                on_data(core_arc_cc, &mul, &rs, &data).await;
+                on_private_data(core_arc_cc, &mul, &rs, &data).await;
             }
         };
 
@@ -94,34 +102,52 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
     });
 }
 
-async fn on_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
+async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
     match response.channel.as_str() {
         "orderbook" => {
-            // let mut is_update = false;
-            // let data_type = data.data_type.clone();
-            // let label = data.label.clone();
-            // if data_type == "delta"  {
-            //     is_update = true;
-            // }
-            // let mut depth_format: DepthParam = format_depth(BybitSwap, &data);
-            // // 是增量更新
-            // if is_update {
-            //     update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
-            // } else { // 全量
-            //     depth_asks.clear();
-            //     depth_asks.append(&mut depth_format.depth_asks);
-            //     depth_bids.clear();
-            //     depth_bids.append(&mut depth_format.depth_bids);
-            // }
-            // let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
-            // trace_stack.on_before_network(depth_format.create_at.clone());
-            // trace_stack.on_after_format();
-            //
-            // on_special_depth(core_arc_clone, update_flag_u, &label, &mut trace_stack, &depth).await;
+            let mut is_update = false;
+            let data_type = response.data_type.clone();
+            if data_type == "delta"  {
+                is_update = true;
+            }
+            let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, ct_val);
+            // 是增量更新
+            if is_update {
+                update_order_book(depth_asks, depth_bids, depth.asks, depth.bids);
+            } else { // 全量
+                depth_asks.clear();
+                depth_asks.append(&mut depth.asks);
+                depth_bids.clear();
+                depth_bids.append(&mut depth.bids);
+            }
+
+            let ask_one = depth_asks[0].clone();
+            let bid_one = depth_bids[0].clone();
+            let result_depth = Depth {
+                time: depth.time,
+                symbol: depth.symbol,
+                asks: vec![ask_one],
+                bids: vec![bid_one],
+            };
+            trace_stack.on_after_format();
+
+            on_depth(core_arc_clone, &response.label, &mut trace_stack, &result_depth).await;
         }
+        _ => {
+            error!("未知推送类型");
+            error!(?response);
+        }
+    }
+}
+
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+
+    match response.channel.as_str() {
         "wallet" => {
             let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
             let mut core = core_arc_clone.lock().await;
@@ -169,41 +195,41 @@ fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>
     }
 }
 
-// fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
-//     for i in asks {
-//         let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
-//         match index_of_value {
-//             Some(index) => {
-//                 if i.amount == Decimal::ZERO {
-//                     depth_asks.remove(index);
-//                 } else {
-//                     depth_asks[index].amount = i.amount.clone();
-//                 }
-//             },
-//             None => {
-//                 depth_asks.push(i.clone());
-//             },
-//         }
-//     }
-//     for i in bids {
-//         let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
-//         match index_of_value {
-//             Some(index) => {
-//                 if i.amount == Decimal::ZERO {
-//                     depth_bids.remove(index);
-//                 } else {
-//                     depth_bids[index].amount = i.amount.clone();
-//                 }
-//             },
-//             None => {
-//                 depth_bids.push(i.clone());
-//             },
-//         }
-//     }
-//     depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
-//     depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
-//
-//     // 限制总长度100
-//     depth_asks.truncate(100);
-//     depth_bids.truncate(100);
-// }
+fn update_order_book(depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, asks : Vec<OrderBook>, bids: Vec<OrderBook>) {
+    for i in asks {
+        let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.size == Decimal::ZERO {
+                    depth_asks.remove(index);
+                } else {
+                    depth_asks[index].size = i.size.clone();
+                }
+            },
+            None => {
+                depth_asks.push(i.clone());
+            },
+        }
+    }
+    for i in bids {
+        let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.size == Decimal::ZERO {
+                    depth_bids.remove(index);
+                } else {
+                    depth_bids[index].size = i.size.clone();
+                }
+            },
+            None => {
+                depth_bids.push(i.clone());
+            },
+        }
+    }
+    depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
+    depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
+
+    // 限制总长度100
+    depth_asks.truncate(100);
+    depth_bids.truncate(100);
+}