Bladeren bron

chore: micro optimizations on local depth cache handling

Berke 1 jaar geleden
bovenliggende
commit
afcbc9490f
3 gewijzigde bestanden met toevoegingen van 29 en 67 verwijderingen
  1. 12 46
      src/data_providers.rs
  2. 9 11
      src/data_providers/binance/market_data.rs
  3. 8 10
      src/data_providers/bybit/market_data.rs

+ 12 - 46
src/data_providers.rs

@@ -56,17 +56,20 @@ impl LocalDepthCache {
         self.asks = new_depth.asks;
     }
 
-    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
-        for order in new_bids {
+    pub fn update_depth_cache(&mut self, new_depth: LocalDepthCache) {
+        self.last_update_id = new_depth.last_update_id;
+        self.time = new_depth.time;
+
+        for order in new_depth.bids.iter() {
             if order.qty == 0.0 {
                 self.bids.retain(|x| x.price != order.price);
             } else if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
-                    existing_order.qty = order.qty;
+                existing_order.qty = order.qty;
             } else {
                 self.bids.push(*order);
             }
         }
-        for order in new_asks {
+        for order in new_depth.asks.iter() {
             if order.qty == 0.0 {
                 self.asks.retain(|x| x.price != order.price);
             } else if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
@@ -77,44 +80,12 @@ impl LocalDepthCache {
         }
     }
 
-    pub fn update_levels(&mut self, new_depth: LocalDepthCache) -> (Vec<Order>, Vec<Order>) {
-        self.last_update_id = new_depth.last_update_id;
-        self.time = new_depth.time;
-
-        let mut best_ask_price = f32::MAX;
-        let mut best_bid_price = 0.0f32;
-
-        self.bids.iter().for_each(|order| {
-            if order.price > best_bid_price {
-                best_bid_price = order.price;
-            }
-        });
-        self.asks.iter().for_each(|order| {
-            if order.price < best_ask_price {
-                best_ask_price = order.price;
-            }
-        });
-
-        let highest: f32 = best_ask_price * 1.01;
-        let lowest: f32 = best_bid_price * 0.99;
-
-        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
-
-        let mut local_bids: Vec<Order> = Vec::new();
-        let mut local_asks: Vec<Order> = Vec::new();
-
-        for order in &self.bids {
-            if order.price >= lowest {
-                local_bids.push(*order);
-            }
+    pub fn get_depth(&self) -> Depth {
+        Depth {
+            time: self.time,
+            bids: self.bids.clone(),
+            asks: self.asks.clone(),
         }
-        for order in &self.asks {
-            if order.price <= highest {
-                local_asks.push(*order);
-            }
-        }
-        
-        (local_bids, local_asks)
     }
 
     pub fn get_fetch_id(&self) -> i64 {
@@ -147,11 +118,6 @@ pub struct FeedLatency {
     pub trade_latency: Option<i64>,
 }
 
-pub trait DataProvider {
-    fn get_orderbook(&self, symbol: &str) -> Result<Depth, Box<dyn std::error::Error>>;
-
-    fn get_trades(&self, symbol: &str) -> Result<Vec<Trade>, Box<dyn std::error::Error>>;
-}
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
 pub struct TickMultiplier(pub u16);

+ 9 - 11
src/data_providers/binance/market_data.rs

@@ -447,17 +447,15 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                     let depth_update = LocalDepthCache {
                                                         last_update_id: de_depth.final_id as i64,
                                                         time,
-                                                        bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
-                                                        asks: de_depth.asks.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                        bids: de_depth.bids.iter().map(
+                                                            |x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }
+                                                        ).collect(),
+                                                        asks: de_depth.asks.iter().map(
+                                                            |x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }
+                                                        ).collect(),
                                                     };
     
-                                                    let (local_bids, local_asks) = orderbook.update_levels(depth_update);
-    
-                                                    let current_depth = Depth {
-                                                        time,
-                                                        bids: local_bids,
-                                                        asks: local_asks,
-                                                    };
+                                                    orderbook.update_depth_cache(depth_update);
                                                     
                                                     let avg_trade_latency = if !trade_latencies.is_empty() {
                                                         let avg = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
@@ -477,7 +475,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                             selected_ticker,
                                                             feed_latency,
                                                             time, 
-                                                            current_depth,
+                                                            orderbook.get_depth(),
                                                             std::mem::take(&mut trades_buffer)
                                                         )
                                                     ).await;
@@ -672,7 +670,7 @@ pub async fn fetch_depth(ticker: Ticker) -> Result<FetchedDepth, reqwest::Error>
         Ticker::LTCUSDT => "ltcusdt",
     };
 
-    let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={symbol_str}&limit=500");
+    let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={symbol_str}&limit=1000");
 
     let response = reqwest::get(&url).await?;
     let text = response.text().await?;

+ 8 - 10
src/data_providers/bybit/market_data.rs

@@ -390,21 +390,19 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                 let depth_update = LocalDepthCache {
                                                     last_update_id: de_depth.update_id as i64,
                                                     time,
-                                                    bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
-                                                    asks: de_depth.asks.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                    bids: de_depth.bids.iter().map(
+                                                        |x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }
+                                                    ).collect(),
+                                                    asks: de_depth.asks.iter().map(
+                                                        |x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }
+                                                    ).collect(),
                                                 };
 
                                                 if (data_type == "snapshot") || (depth_update.last_update_id == 1) {
                                                     orderbook.fetched(depth_update);
 
                                                 } else if data_type == "delta" {
-                                                    let (local_bids, local_asks) = orderbook.update_levels(depth_update);
-
-                                                    let current_depth = Depth {
-                                                        time,
-                                                        bids: local_bids,
-                                                        asks: local_asks,
-                                                    };
+                                                    orderbook.update_depth_cache(depth_update);
 
                                                     let avg_trade_latency = if !trade_latencies.is_empty() {
                                                         let avg = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
@@ -424,7 +422,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                             selected_ticker,
                                                             feed_latency,
                                                             time, 
-                                                            current_depth,
+                                                            orderbook.get_depth(),
                                                             std::mem::take(&mut trades_buffer)
                                                         )
                                                     ).await;