Jelajahi Sumber

Full orderbook levels (#7)

* initial commit

* refactors to work with new orderbook structure, btreemap

* local depth tweaks to keep it in sync

* boxes, allowing more efficient storing the depth cache

* fixes to work with new boxed "order" structs

* fix ordering fault when managing local depth cache

* impl. y-axis scaling, +tweaks on y-label render logic

* fix faulty filtering, +fix typo on a parameter

* fix faulty y-axis scaling

* replaced unnecessary vectors with fixed size structs

* iced master migrate

* systematic cleanup for unn. historical data

* iced master migrate

* always show pane controls

* y-axis scaling fix, +performant rendering with lines instead of circles

* sort orders before processing to render
Berke 1 tahun lalu
induk
melakukan
ade20459ed
4 mengubah file dengan 844 tambahan dan 395 penghapusan
  1. 264 142
      Cargo.lock
  2. 205 76
      src/charts/heatmap.rs
  3. 282 42
      src/data_providers/binance/market_data.rs
  4. 93 135
      src/main.rs

File diff ditekan karena terlalu besar
+ 264 - 142
Cargo.lock


+ 205 - 76
src/charts/heatmap.rs

@@ -4,7 +4,7 @@ use iced::{
     alignment, mouse, widget::{button, canvas::{self, event::{self, Event}, stroke::Stroke, Cache, Canvas, Geometry, Path}}, window, Border, Color, Element, Length, Point, Rectangle, Renderer, Size, Theme, Vector
 };
 use iced::widget::{Column, Row, Container, Text};
-use crate::data_providers::binance::market_data::{Trade, Depth};
+use crate::data_providers::binance::market_data::{LocalDepthCache, Trade};
 
 #[derive(Debug, Clone, Copy)]
 pub enum Message {
@@ -14,6 +14,7 @@ pub enum Message {
     AutoscaleToggle,
     CrosshairToggle,
     CrosshairMoved(Point),
+    YScaling(f32),
 }
 
 #[derive(Debug)]
@@ -26,8 +27,9 @@ pub struct Heatmap {
     x_crosshair_cache: Cache,
     translation: Vector,
     scaling: f32,
+    y_scaling: f32,
     
-    data_points: BTreeMap<i64, (Depth, Vec<Trade>, (f32, f32))>,
+    data_points: BTreeMap<i64, (LocalDepthCache, Box<[Trade]>)>,
     size_filter: f32,
 
     autoscale: bool,
@@ -40,9 +42,12 @@ pub struct Heatmap {
     bounds: Rectangle,
 }
 impl Heatmap {
-    const MIN_SCALING: f32 = 0.4;
+    const MIN_SCALING: f32 = 0.6;
     const MAX_SCALING: f32 = 3.6;
 
+    const THREE_MIN: i64 = 3 * 60 * 1000;
+    const ONE_MIN: i64 = 1 * 60 * 1000;
+
     pub fn new() -> Heatmap {
         let _size = window::Settings::default().size;
     
@@ -59,6 +64,7 @@ impl Heatmap {
 
             translation: Vector::default(),
             scaling: 1.0,
+            y_scaling: 0.0001,
             autoscale: true,
             crosshair: false,
             crosshair_position: Point::new(0.0, 0.0),
@@ -74,22 +80,11 @@ impl Heatmap {
         self.size_filter = size_filter;
     }
 
-    pub fn insert_datapoint(&mut self, mut trades_buffer: Vec<Trade>, depth_update: i64, depth: Depth) {
+    pub fn insert_datapoint(&mut self, trades_buffer: Vec<Trade>, depth_update: i64, depth: LocalDepthCache) {
         let aggregate_time = 100; // 100 ms
         let rounded_depth_update = (depth_update / aggregate_time) * aggregate_time;
-
-        self.data_points.entry(rounded_depth_update).or_insert((depth, vec![], (0.0, 0.0)));
         
-        for trade in trades_buffer.drain(..) {
-            if let Some((_, trades, volume)) = self.data_points.get_mut(&rounded_depth_update) {
-                if trade.is_sell {
-                    volume.1 += trade.qty;
-                } else {
-                    volume.0 += trade.qty;
-                }
-                trades.push(trade);
-            }
-        }
+        self.data_points.entry(rounded_depth_update).or_insert((depth, trades_buffer.into_boxed_slice()));
 
         self.render_start();
     }
@@ -97,7 +92,7 @@ impl Heatmap {
     pub fn get_raw_trades(&mut self) -> Vec<Trade> {
         let mut trades_source = vec![];
 
-        for (_, (_, trades, _)) in &self.data_points {
+        for (_, (_, trades)) in &self.data_points {
             trades_source.extend(trades.iter().cloned());
         }
 
@@ -111,31 +106,44 @@ impl Heatmap {
 
         let latest: i64 = *timestamp_latest - (self.translation.x*80.0) as i64;
         let earliest: i64 = latest - (64000.0 / (self.scaling / (self.bounds.width/800.0))) as i64;
-    
-        let (mut lowest, mut highest) = (f32::MAX, 0.0f32);
-    
-        for (_, (depth, _, _)) in self.data_points.range(earliest..=latest) {
-            if let Some(max_price) = depth.asks.iter().map(|order| order.price).max_by(|a, b| a.partial_cmp(b).unwrap()) {
-                highest = highest.max(max_price);
-            }            
-            if let Some(min_price) = depth.bids.iter().map(|order| order.price).min_by(|a, b| a.partial_cmp(b).unwrap()) {
-                lowest = lowest.min(min_price);
-            }
-        }
-    
-        if earliest != self.x_min_time || latest != self.x_max_time {            
+            
+        if self.data_points.len() > 1 {
+            let mut max_ask_price = f32::MIN;
+            let mut min_bid_price = f32::MAX;
+
+            for (_, (depth, _)) in self.data_points.range(earliest..=latest) {
+                if depth.asks.len() > 20 && depth.bids.len() > 20 {
+                    let ask_price = depth.asks[20].price;
+                    let bid_price = depth.bids[20].price;
+
+                    if ask_price > max_ask_price {
+                        max_ask_price = ask_price;
+                    };
+                    if bid_price < min_bid_price {
+                        min_bid_price = bid_price;
+                    };
+                };
+            };
+
+            let lowest = min_bid_price - (min_bid_price * self.y_scaling);
+            let highest = max_ask_price + (max_ask_price * self.y_scaling);
+
+            if lowest != self.y_min_price || highest != self.y_max_price {   
+                self.y_min_price = lowest;
+                self.y_max_price = highest;
+
+                self.y_labels_cache.clear();
+                self.y_croshair_cache.clear();
+            };
+        };
+
+        if earliest != self.x_min_time || latest != self.x_max_time {         
+            self.x_min_time = earliest;
+            self.x_max_time = latest;
+
             self.x_labels_cache.clear();
             self.x_crosshair_cache.clear();
-        }
-        if lowest != self.y_min_price || highest != self.y_max_price {            
-            self.y_labels_cache.clear();
-            self.y_croshair_cache.clear();
-        }
-    
-        self.x_min_time = earliest;
-        self.x_max_time = latest;
-        self.y_min_price = lowest;
-        self.y_max_price = highest;
+        };
         
         self.crosshair_cache.clear();        
     }
@@ -183,6 +191,10 @@ impl Heatmap {
                     self.x_crosshair_cache.clear();
                 }
             }
+            Message::YScaling(scaling) => {
+                self.y_scaling = *scaling;
+                self.render_start();
+            }
         }
     }
 
@@ -210,7 +222,8 @@ impl Heatmap {
                 min: self.y_min_price,
                 max: self.y_max_price,
                 crosshair_position: self.crosshair_position, 
-                crosshair: self.crosshair
+                crosshair: self.crosshair,
+                y_scaling: self.y_scaling,
             })
             .width(Length::Fixed(60.0))
             .height(Length::FillPortion(10));
@@ -305,7 +318,7 @@ impl canvas::Program<Message> for Heatmap {
         if bounds != self.bounds {
             return (event::Status::Ignored, Some(Message::ChartBounds(bounds)));
         } 
-        
+    
         if let Event::Mouse(mouse::Event::ButtonReleased(_)) = event {
             *interaction = Interaction::None;
         }
@@ -442,26 +455,50 @@ impl canvas::Program<Message> for Heatmap {
             let mut max_depth_qty: f32 = 0.0;
 
             if self.data_points.len() > 1 {
-                for (_, (depth, trades, volume)) in self.data_points.range(earliest..=latest) {
-                    for trade in trades {
+                for (_, (depth, trades)) in self.data_points.range(earliest..=latest) {
+                    let mut buy_volume: f32 = 0.0;
+                    let mut sell_volume: f32 = 0.0;
+
+                    for trade in trades.iter() {
                         max_trade_qty = max_trade_qty.max(trade.qty);
                         min_trade_qty = min_trade_qty.min(trade.qty);
+
+                        if trade.is_sell {
+                            sell_volume += trade.qty;
+                        } else {
+                            buy_volume += trade.qty;
+                        }
                     }
+
+                    max_volume = max_volume.max(buy_volume).max(sell_volume);
             
-                    for bid in &depth.bids {
-                        max_depth_qty = max_depth_qty.max(bid.qty);
-                    } 
-                    for ask in &depth.asks {
+                    for ask in depth.asks.iter() {
+                        if ask.price > highest {
+                            continue;
+                        };
                         max_depth_qty = max_depth_qty.max(ask.qty);
                     }
-            
-                    max_volume = max_volume.max(volume.0).max(volume.1);
-                }
+                    for bid in depth.bids.iter() {
+                        if bid.price < lowest {
+                            continue;
+                        };
+                        max_depth_qty = max_depth_qty.max(bid.qty);
+                    }   
+                };
                 
-                for (time, (depth, trades, volume)) in self.data_points.range(earliest..=latest) {
+                for (time, (depth, trades)) in self.data_points.range(earliest..=latest) {
                     let x_position = ((time - earliest) as f64 / (latest - earliest) as f64) * bounds.width as f64;
 
-                    for trade in trades {
+                    let mut buy_volume: f32 = 0.0;
+                    let mut sell_volume: f32 = 0.0;
+
+                    for trade in trades.iter() {
+                        if trade.is_sell {
+                            sell_volume += trade.qty;
+                        } else {
+                            buy_volume += trade.qty;
+                        }
+
                         if trade.qty * trade.price > self.size_filter {
                             let x_position = ((time - earliest) as f64 / (latest - earliest) as f64) * bounds.width as f64;
                             let y_position = heatmap_area_height - ((trade.price - lowest) / y_range * heatmap_area_height);
@@ -482,24 +519,42 @@ impl canvas::Program<Message> for Heatmap {
                         }
                     }
 
-                    for order in &depth.bids {
-                        let y_position = heatmap_area_height - ((order.price - lowest) / y_range * heatmap_area_height);
-                        let color_alpha = (order.qty / max_depth_qty).min(1.0);
+                    for bid in depth.bids.iter() {
+                        if bid.price < lowest {
+                            continue;
+                        }
 
-                        let circle = Path::circle(Point::new(x_position as f32, y_position), 1.0);
-                        frame.fill(&circle, Color::from_rgba8(0, 144, 144, color_alpha));
-                    }
-                    for order in &depth.asks {
-                        let y_position = heatmap_area_height - ((order.price - lowest) / y_range * heatmap_area_height);
-                        let color_alpha = (order.qty / max_depth_qty).min(1.0);
+                        let y_position = heatmap_area_height - ((bid.price - lowest) / y_range * heatmap_area_height);
+                        let color_alpha = (bid.qty / max_depth_qty).min(1.0);
 
-                        let circle = Path::circle(Point::new(x_position as f32, y_position), 1.0);
-                        frame.fill(&circle, Color::from_rgba8(192, 0, 192, color_alpha));
+                        let path = Path::line(
+                            Point::new(x_position as f32, y_position), 
+                            Point::new(x_position as f32 + 1.0, y_position)
+                        );
+                        let stroke = Stroke::default().with_color(Color::from_rgba8(0, 144, 144, color_alpha)).with_width(1.0);
+
+                        frame.stroke(&path, stroke);
                     }
+                    for ask in depth.asks.iter() {
+                        if ask.price > highest {
+                            continue;
+                        }
+
+                        let y_position = heatmap_area_height - ((ask.price - lowest) / y_range * heatmap_area_height);
+                        let color_alpha = (ask.qty / max_depth_qty).min(1.0);
+
+                        let path = Path::line(
+                            Point::new(x_position as f32, y_position), 
+                            Point::new(x_position as f32 + 1.0, y_position)
+                        );
+                        let stroke = Stroke::default().with_color(Color::from_rgba8(192, 0, 192, color_alpha)).with_width(1.0);
+
+                        frame.stroke(&path, stroke);
+                    };
 
                     if max_volume > 0.0 {
-                        let buy_bar_height = (volume.0 / max_volume) * volume_area_height;
-                        let sell_bar_height = (volume.1 / max_volume) * volume_area_height;
+                        let buy_bar_height = (buy_volume / max_volume) * volume_area_height;
+                        let sell_bar_height = (sell_volume / max_volume) * volume_area_height;
 
                         let sell_bar = Path::rectangle(
                             Point::new(x_position as f32, bounds.height - sell_bar_height), 
@@ -513,15 +568,22 @@ impl canvas::Program<Message> for Heatmap {
                         );
                         frame.fill(&buy_bar, Color::from_rgb8(81, 205, 160));
                     }
-                } 
-            }
+                };
+            };
         
             // current orderbook as bars
             if let Some(latest_data_points) = self.data_points.iter().last() {
                 let latest_timestamp = latest_data_points.0 + 200;
 
-                let latest_bids: Vec<(f32, f32)> = latest_data_points.1.0.bids.iter().map(|order| (order.price, order.qty)).collect::<Vec<_>>();
-                let latest_asks: Vec<(f32, f32)> = latest_data_points.1.0.asks.iter().map(|order| (order.price, order.qty)).collect::<Vec<_>>();
+                let latest_bids: Vec<(f32, f32)> = latest_data_points.1.0.bids.iter()
+                    .filter(|order| (order.price) >= lowest)
+                    .map(|order| (order.price, order.qty))
+                    .collect::<Vec<_>>();
+
+                let latest_asks: Vec<(f32, f32)> = latest_data_points.1.0.asks.iter()
+                    .filter(|order| (order.price) <= highest)
+                    .map(|order| (order.price, order.qty))
+                    .collect::<Vec<_>>();
 
                 let max_qty = latest_bids.iter().map(|(_, qty)| qty).chain(latest_asks.iter().map(|(_, qty)| qty)).fold(f32::MIN, |arg0: f32, other: &f32| f32::max(arg0, *other));
 
@@ -839,18 +901,85 @@ pub struct AxisLabelYCanvas<'a> {
     max: f32,
     crosshair_position: Point,
     crosshair: bool,
+    y_scaling: f32,
 }
 impl canvas::Program<Message> for AxisLabelYCanvas<'_> {
     type State = Interaction;
 
     fn update(
         &self,
-        _interaction: &mut Interaction,
-        _event: Event,
-        _bounds: Rectangle,
-        _cursor: mouse::Cursor,
-    ) -> (event::Status, Option<Message>) {
-        (event::Status::Ignored, None)
+        interaction: &mut Interaction,
+        event: Event,
+        bounds: Rectangle,
+        cursor: mouse::Cursor,
+    ) -> (event::Status, Option<Message>) {        
+        if let Event::Mouse(mouse::Event::ButtonReleased(_)) = event {
+            *interaction = Interaction::None;
+        }
+
+        if !cursor.is_over(bounds) {
+            return (event::Status::Ignored, None);
+        };
+
+        match event {
+            Event::Mouse(mouse_event) => match mouse_event {
+                mouse::Event::ButtonPressed(button) => {
+                    let message = match button {
+                        mouse::Button::Right => {
+                            *interaction = Interaction::Drawing;
+                            None
+                        }
+                        mouse::Button::Left => {
+                            None
+                        }
+                        _ => None,
+                    };
+
+                    (event::Status::Captured, message)
+                }
+                mouse::Event::CursorMoved { .. } => {
+                    let message = match *interaction {
+                        Interaction::Drawing => None,
+                        Interaction::Erasing => None,
+                        Interaction::Panning { translation, start } => {
+                            None
+                        }
+                        Interaction::None => 
+                            None
+                    };
+
+                    let event_status = match interaction {
+                        Interaction::None => event::Status::Ignored,
+                        _ => event::Status::Captured,
+                    };
+
+                    (event_status, message)
+                }
+                mouse::Event::WheelScrolled { delta } => match delta {
+                    mouse::ScrollDelta::Lines { y, .. }
+                    | mouse::ScrollDelta::Pixels { y, .. } => {
+                        if y > 0.0 && self.y_scaling > 0.00001
+                            || y < 0.0 && self.y_scaling < 0.001
+                        {
+                            let scaling = (self.y_scaling * (1.0 - y / 30.0))
+                                .clamp(
+                                    0.00001, 
+                                    0.001,  
+                                );
+
+                            (
+                                event::Status::Captured,
+                                Some(Message::YScaling(scaling)),
+                            )
+                        } else {
+                            (event::Status::Captured, None)
+                        }
+                    }
+                },
+                _ => (event::Status::Ignored, None),
+            },
+            _ => (event::Status::Ignored, None),
+        }
     }
     
     fn draw(

+ 282 - 42
src/data_providers/binance/market_data.rs

@@ -1,13 +1,20 @@
+use hmac::digest::typenum::Or;
 use iced::futures;  
 use iced::subscription::{self, Subscription};
-use serde::{Deserialize, Deserializer};
+use serde::{de, Deserialize, Deserializer};
 use futures::channel::mpsc;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
 
 use async_tungstenite::tungstenite;
+use serde_json::Value;
 use crate::{Ticker, Timeframe};
 
+use tokio::time::{interval, Duration};
+use futures::FutureExt;
+use std::sync::{Arc, RwLock, Mutex};
+use std::collections::{BTreeMap, HashMap};
+
 #[derive(Debug)]
 #[allow(clippy::large_enum_variant)]
 enum State {
@@ -23,13 +30,149 @@ enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    DepthReceived(i64, Depth, Vec<Trade>),
+    DepthReceived(i64, LocalDepthCache, Vec<Trade>),
     KlineReceived(Kline, Timeframe),
 }
 
 #[derive(Debug, Clone)]
 pub struct Connection(mpsc::Sender<String>);
 
+impl<'de> Deserialize<'de> for Order {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let arr: Vec<&str> = Vec::<&str>::deserialize(deserializer)?;
+        let price: f32 = arr[0].parse::<f32>().map_err(serde::de::Error::custom)?;
+        let qty: f32 = arr[1].parse::<f32>().map_err(serde::de::Error::custom)?;
+        Ok(Order { price, qty })
+    }
+}
+#[derive(Debug, Deserialize, Clone)]
+pub struct FetchedDepth {
+    #[serde(rename = "lastUpdateId")]
+    pub update_id: i64,
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "bids")]
+    pub bids: Vec<Order>,
+    #[serde(rename = "asks")]
+    pub asks: Vec<Order>,
+}
+#[derive(Debug, Clone, Copy, Default)]
+pub struct Order {
+    pub price: f32,
+    pub qty: f32,
+}
+#[derive(Debug, Clone, Default)]
+pub struct LocalDepthCache {
+    pub time: i64,
+    pub bids: Box<[Order]>,
+    pub asks: Box<[Order]>,
+}
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct Depth {
+    #[serde(default)]
+    pub last_update_id: i64,
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "b")]
+    pub bids: Vec<Order>,
+    #[serde(rename = "a")]
+    pub asks: Vec<Order>,
+}
+
+impl Depth {
+    pub fn new() -> Self {
+        Self {
+            last_update_id: 0,
+            time: 0,
+            bids: Vec::new(),
+            asks: Vec::new(),
+        }
+    }
+
+    pub fn fetched(&mut self, new_depth: Depth) {
+        self.last_update_id = new_depth.last_update_id;        
+        self.time = new_depth.time;
+
+        self.bids = new_depth.bids;
+        self.asks = new_depth.asks;
+    }
+
+    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
+        for order in new_bids {
+            if order.qty == 0.0 {
+                self.bids.retain(|x| x.price != order.price);
+            } else {
+                if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
+                    existing_order.qty = order.qty;
+                } else {
+                    self.bids.push(*order);
+                }
+            }
+        }
+        for order in new_asks {
+            if order.qty == 0.0 {
+                self.asks.retain(|x| x.price != order.price);
+            } else {
+                if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
+                    existing_order.qty = order.qty;
+                } else {
+                    self.asks.push(*order);
+                }
+            }
+        }
+    }
+
+    pub fn update_levels(&mut self, new_depth: Depth) -> (Box<[Order]>, Box<[Order]>) {
+        self.last_update_id = new_depth.last_update_id;
+        self.time = new_depth.time;
+
+        let mut best_ask_price = f32::MAX;
+        let mut best_bid_price = 0.0f32;
+
+        self.bids.iter().for_each(|order| {
+            if order.price > best_bid_price {
+                best_bid_price = order.price;
+            }
+        });
+        self.asks.iter().for_each(|order| {
+            if order.price < best_ask_price {
+                best_ask_price = order.price;
+            }
+        });
+
+        let highest: f32 = best_ask_price * 1.001;
+        let lowest: f32 = best_bid_price * 0.999;
+
+        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
+
+        let mut local_bids: Vec<Order> = Vec::new();
+        let mut local_asks: Vec<Order> = Vec::new();
+
+        for order in &self.bids {
+            if order.price >= lowest {
+                local_bids.push(*order);
+            }
+        }
+        for order in &self.asks {
+            if order.price <= highest {
+                local_asks.push(*order);
+            }
+        }
+
+        // first sort by price
+        local_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
+        local_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
+
+        (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
+    }
+
+    pub fn get_fetch_id(&self) -> i64 {
+        self.last_update_id
+    }
+}
 
 pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     struct Connect;
@@ -49,8 +192,14 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
             };
 
             let stream_1 = format!("{symbol_str}@aggTrade");
-            let stream_2 = format!("{symbol_str}@depth20@100ms");
-            
+            let stream_2 = format!("{symbol_str}@depth@100ms");
+
+            let mut orderbook: Depth = Depth::new();
+
+            let mut already_fetching: bool = false;
+
+            let mut prev_id: i64 = 0;
+
             loop {
                 match &mut state {
                     State::Disconnected => {        
@@ -60,7 +209,33 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                             websocket_server,
                         )
                         .await {
-                           state = State::Connected(websocket);
+                            let (tx, rx) = tokio::sync::oneshot::channel();
+                                                
+                            tokio::spawn(async move {
+                                let fetched_depth = fetch_depth(selected_ticker).await;
+
+                                let depth: Depth = match fetched_depth {
+                                    Ok(depth) => {
+                                        Depth {
+                                            last_update_id: depth.update_id,
+                                            time: depth.time,
+                                            bids: depth.bids,
+                                            asks: depth.asks,
+                                        }
+                                    },
+                                    Err(_) => return,
+                                };
+
+                                let _ = tx.send(depth);
+                            });
+                            match rx.await {
+                                Ok(depth) => {
+                                    orderbook.fetched(depth);
+                                    state = State::Connected(websocket);
+                                },
+                                Err(_) => orderbook.fetched(Depth::default()),
+                            }
+                            
                         } else {
                             tokio::time::sleep(tokio::time::Duration::from_secs(1))
                            .await;
@@ -75,22 +250,94 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                 match received {
                                     Ok(tungstenite::Message::Text(message)) => {
                                         let stream: Stream = serde_json::from_str(&message).unwrap_or(Stream { stream: String::new() });
+                                        
                                         if stream.stream == stream_1 {
                                             let agg_trade: AggTrade = serde_json::from_str(&message).unwrap();
                                             trades_buffer.push(agg_trade.data);
+                                            
                                         } else if stream.stream == stream_2 {
-                                            let depth_update: DepthUpdate = serde_json::from_str(&message).unwrap();
-                                            let _ = output.send(
-                                                Event::DepthReceived(
-                                                    depth_update.data.time, 
-                                                    Depth {
-                                                        time: depth_update.data.time,
-                                                        bids: depth_update.data.bids,
-                                                        asks: depth_update.data.asks,
-                                                    }, 
-                                                    std::mem::take(&mut trades_buffer)
-                                                )
-                                            ).await;
+                                            if already_fetching {
+                                                println!("Already fetching...\n");
+
+                                                continue;
+                                            }
+
+                                            let depth_update: Value = serde_json::from_str(&message).unwrap();
+                                            let depth_data = depth_update.get("data").unwrap();
+
+                                            let first_update_id = depth_data.get("U").unwrap().as_i64().unwrap();
+                                            let final_update_id = depth_data.get("u").unwrap().as_i64().unwrap();
+
+                                            let last_final_update_id = depth_data.get("pu").unwrap().as_i64().unwrap();
+
+                                            let last_update_id = orderbook.get_fetch_id();
+
+                                            if (final_update_id <= last_update_id) || last_update_id == 0 {
+                                                continue;
+                                            }
+
+                                            if prev_id == 0 && (first_update_id > last_update_id + 1) || (last_update_id + 1 > final_update_id) {
+                                                println!("Out of sync on first event...\nU: {first_update_id}, last_id: {last_update_id}, u: {final_update_id}, pu: {last_final_update_id}\n");
+
+                                                let (tx, rx) = tokio::sync::oneshot::channel();
+                                                already_fetching = true;
+
+                                                tokio::spawn(async move {
+                                                    let fetched_depth = fetch_depth(selected_ticker).await;
+
+                                                    let depth: Depth = match fetched_depth {
+                                                        Ok(depth) => {
+                                                            Depth {
+                                                                last_update_id: depth.update_id,
+                                                                time: depth.time,
+                                                                bids: depth.bids,
+                                                                asks: depth.asks,
+                                                            }
+                                                        },
+                                                        Err(_) => return,
+                                                    };
+
+                                                    let _ = tx.send(depth);
+                                                });
+                                                match rx.await {
+                                                    Ok(depth) => {
+                                                        orderbook.fetched(depth)
+                                                    },
+                                                    Err(_) => orderbook.fetched(Depth::default()),
+                                                }
+                                                already_fetching = false;
+                                            }
+                                    
+                                            if (prev_id == 0) || (prev_id == last_final_update_id) {
+                                                let depth_update: DepthUpdate = serde_json::from_str(&message).unwrap();
+
+                                                let time = depth_update.data.time;
+                                                let bids = depth_update.data.bids;
+                                                let asks = depth_update.data.asks;
+
+                                                let depth = Depth { last_update_id: final_update_id, time, bids, asks };
+
+                                                let (local_bids, local_asks) = orderbook.update_levels(depth);
+
+                                                let local_depth_cache = LocalDepthCache {
+                                                    time: time,
+                                                    bids: local_bids,
+                                                    asks: local_asks,
+                                                };
+
+                                                let _ = output.send(
+                                                    Event::DepthReceived(
+                                                        time, 
+                                                        local_depth_cache,
+                                                        std::mem::take(&mut trades_buffer)
+                                                    )
+                                                ).await;
+
+                                                prev_id = final_update_id;
+                                            } else {
+                                                println!("Out of sync...\n");
+                                            }
+
                                         } else {
                                             dbg!(stream.stream);
                                         }
@@ -234,31 +481,6 @@ pub struct Trade {
     #[serde(with = "string_to_f32", rename = "q")]
     pub qty: f32,
 }
-#[derive(Debug, Deserialize, Clone)]
-pub struct Depth {
-    #[serde(rename = "T")]
-    pub time: i64,
-    #[serde(rename = "b")]
-    pub bids: Vec<Order>,
-    #[serde(rename = "a")]
-    pub asks: Vec<Order>,
-}
-#[derive(Debug, Clone)]
-pub struct Order {
-    pub price: f32,
-    pub qty: f32,
-}
-impl<'de> Deserialize<'de> for Order {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        let arr: Vec<&str> = Vec::<&str>::deserialize(deserializer)?;
-        let price: f32 = arr[0].parse::<f32>().map_err(serde::de::Error::custom)?;
-        let qty: f32 = arr[1].parse::<f32>().map_err(serde::de::Error::custom)?;
-        Ok(Order { price, qty })
-    }
-}
 
 #[derive(Deserialize, Debug, Clone, Copy)]
 pub struct Kline {
@@ -305,6 +527,7 @@ impl From<FetchedKlines> for Kline {
         }
     }
 }
+
 pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
     let symbol_str = match ticker {
         Ticker::BTCUSDT => "btcusdt",
@@ -328,4 +551,21 @@ pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kl
     let klines: Vec<Kline> = fetched_klines.unwrap().into_iter().map(Kline::from).collect();
 
     Ok(klines)
+}
+
+pub async fn fetch_depth(ticker: Ticker) -> Result<FetchedDepth, reqwest::Error> {
+    let symbol_str = match ticker {
+        Ticker::BTCUSDT => "btcusdt",
+        Ticker::ETHUSDT => "ethusdt",
+        Ticker::SOLUSDT => "solusdt",
+        Ticker::LTCUSDT => "ltcusdt",
+    };
+
+    let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={symbol_str}&limit=500");
+
+    let response = reqwest::get(&url).await?;
+    let text = response.text().await?;
+    let depth: FetchedDepth = serde_json::from_str(&text).unwrap();
+
+    Ok(depth)
 }

+ 93 - 135
src/main.rs

@@ -10,13 +10,12 @@ use charts::footprint::{self, Footprint};
 use std::vec;
 use chrono::{NaiveDateTime, DateTime, Utc};
 use iced::{
-    alignment, executor, font, widget::{
+    alignment, font, widget::{
         button, center, checkbox, mouse_area, opaque, pick_list, stack, text_input, tooltip, Column, Container, Row, Slider, Space, Text
-    }, Alignment, Color, Command, Element, Font, Length, Renderer, Settings, Size, Subscription, Theme
+    }, Alignment, Color, Task, Element, Font, Length, Renderer, Settings, Size, Subscription, Theme
 };
-use iced::advanced::Application;
 
-use iced::widget::pane_grid::{self, PaneGrid};
+use iced::widget::pane_grid::{self, PaneGrid, Configuration};
 use iced::widget::{
     container, row, scrollable, text, responsive
 };
@@ -154,21 +153,24 @@ impl PaneSpec {
     }
 }
 
-fn main() {
-    State::run(Settings {
-        antialiasing: true,
-        window: {
-            iced::window::Settings {
-                min_size: Some(Size {
-                    width: 800.0,
-                    height: 600.0,
-                }),
-                ..iced::window::Settings::default()
-            }
-        },
-        ..Settings::default()
-    })
-    .unwrap();
+fn main() -> iced::Result {
+    iced::application(
+        "Iced Trade",
+        State::update,
+        State::view,
+    )
+    .subscription(State::subscription)
+    .theme(|_| Theme::KanagawaDragon)
+    .antialiasing(true)
+    .window_size(iced::Size::new(1600.0, 900.0))
+    .centered()   
+    .font(ICON_BYTES)
+    .run()
+}
+impl Default for State {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -200,9 +202,6 @@ pub enum Message {
     Close(pane_grid::Pane),
     ToggleLayoutLock,
 
-    // Font
-    FontLoaded(Result<(), font::Error>),
-
     // Modal
     OpenModal(pane_grid::Pane),
     CloseModal,
@@ -250,16 +249,8 @@ struct State {
     tick_size: Option<f32>,
 }
 
-impl Application for State {
-    type Renderer = Renderer;
-    type Message = self::Message;
-    type Executor = executor::Default;
-    type Flags = ();
-    type Theme = Theme;
-
-    fn new(_flags: Self::Flags) -> (Self, Command<Self::Message>) {
-        use pane_grid::Configuration;
-
+impl State {
+    fn new() -> Self {
         let pane_config: Configuration<PaneSpec> = Configuration::Split {
             axis: pane_grid::Axis::Vertical,
             ratio: 0.8,
@@ -313,86 +304,60 @@ impl Application for State {
         };
         let panes: pane_grid::State<PaneSpec> = pane_grid::State::with_configuration(pane_config);
         let first_pane: pane_grid::Pane = *panes.panes.iter().next().unwrap().0;
-
-        (
-            Self { 
-                show_layout_modal: false,
-
-                size_filter_timesales: 0.0,
-                size_filter_heatmap: 0.0,
-                sync_heatmap: false,
-                kline_stream: true,
-
-                candlestick_chart: None,
-                time_and_sales: None,
-                custom_line: None,
-                heatmap_chart: None,
-                footprint_chart: None,
-
-                listen_key: None,
-                selected_ticker: None,
-                selected_exchange: Some("Binance Futures"),
-                ws_state: WsState::Disconnected,
-                user_ws_state: UserWsState::Disconnected,
-                ws_running: false,
-                panes,
-                focus: None,
-                first_pane,
-                pane_lock: false,
-                tick_size: Some(1.0), 
-            },
-            Command::batch(vec![
-                font::load(ICON_BYTES).map(Message::FontLoaded),
-
-                if !SECRET_KEY.is_empty() && !SECRET_KEY.is_empty() {
-                    Command::perform(user_data::get_listen_key(API_KEY, SECRET_KEY), |res| {
-                        match res {
-                            Ok(listen_key) => {
-                                Message::UserKeySucceed(listen_key)
-                            },
-                            Err(err) => {
-                                dbg!(err);
-                                Message::UserKeyError
-                            }
-                        }
-                    })
-                } else {
-                    eprintln!("API keys not set");
-                    Command::none()
-                },
-            ]),
-        )
-    }
-
-    fn title(&self) -> String {
-        "Iced Trade".to_owned()
+        
+        Self { 
+            show_layout_modal: false,
+
+            size_filter_timesales: 0.0,
+            size_filter_heatmap: 0.0,
+            sync_heatmap: false,
+            kline_stream: true,
+
+            candlestick_chart: None,
+            time_and_sales: None,
+            custom_line: None,
+            heatmap_chart: None,
+            footprint_chart: None,
+
+            listen_key: None,
+            selected_ticker: None,
+            selected_exchange: Some("Binance Futures"),
+            ws_state: WsState::Disconnected,
+            user_ws_state: UserWsState::Disconnected,
+            ws_running: false,
+            panes,
+            focus: None,
+            first_pane,
+            pane_lock: false,
+            tick_size: Some(1.0), 
+        }
     }
 
-    fn update(&mut self, message: Self::Message) -> Command<Self::Message> {
+    fn update(&mut self, message: Message) -> Task<Message> {
         match message {
             Message::CustomLine(message) => {
                 if let Some(custom_line) = &mut self.custom_line {
                     custom_line.update(&message);
                 }
-                Command::none()
+                Task::none()
             },
             Message::Candlestick(message) => {
                 if let Some(candlesticks) = &mut self.candlestick_chart {
                     candlesticks.update(&message);
                 }
-                Command::none()
+                Task::none()
             },
             Message::Heatmap(message) => {
                 if let Some(heatmap) = &mut self.heatmap_chart {
                     heatmap.update(&message);
                 }
-                Command::none()
+                Task::none()
             },
             Message::Footprint(message) => {
                 if let Some(footprint) = &mut self.footprint_chart {
                     footprint.update(&message);
                 }
-                Command::none()
+                Task::none()
             },
 
             Message::TickerSelected(ticker) => {
@@ -403,21 +368,21 @@ impl Application for State {
                     pane_state.stream.0 = Some(ticker);
                 }
 
-                Command::none()
+                Task::none()
             },
             Message::TimeframeSelected(timeframe, pane) => {
                 if !self.ws_running {
-                    return Command::none();
+                    return Task::none();
                 }
 
                 let Some(selected_ticker) = &self.selected_ticker else {
                     eprintln!("No ticker selected");
-                    return Command::none();
+                    return Task::none();
                 };
 
                 self.kline_stream = false;
                 
-                let mut commands = vec![];
+                let mut Tasks = vec![];
                 let mut dropped_streams = vec![];
 
                 if let Some(pane) = self.panes.panes.get_mut(&pane) {
@@ -425,7 +390,7 @@ impl Application for State {
 
                     pane.stream.1 = Some(timeframe);
                     
-                    let fetch_klines = Command::perform(
+                    let fetch_klines = Task::perform(
                     market_data::fetch_klines(*selected_ticker, timeframe)
                         .map_err(|err| format!("{err}")), 
                     move |klines| {
@@ -434,29 +399,29 @@ impl Application for State {
 
                     dropped_streams.push(pane.id);
                     
-                    commands.push(fetch_klines);                                  
+                    Tasks.push(fetch_klines);                                  
                 };
         
                 // sleep to drop existent stream and create new one
-                let remove_active_stream = Command::perform(
+                let remove_active_stream = Task::perform(
                     async {
                         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                     },
                     move |()| Message::CutTheKlineStream
                 );
-                commands.push(remove_active_stream);
+                Tasks.push(remove_active_stream);
 
-                Command::batch(commands)
+                Task::batch(Tasks)
             },
             Message::ExchangeSelected(exchange) => {
                 self.selected_exchange = Some(exchange);
-                Command::none()
+                Task::none()
             },
             Message::WsToggle() => {
                 self.ws_running = !self.ws_running;
 
                 if self.ws_running {  
-                    let mut commands = vec![];
+                    let mut Tasks = vec![];
 
                     let first_pane = self.first_pane;
         
@@ -475,16 +440,16 @@ impl Application for State {
 
                         let pane_id = pane_state.id;
 
-                        let fetch_klines = Command::perform(
+                        let fetch_klines = Task::perform(
                             market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
                                 .map_err(|err| format!("{err}")), 
                             move |klines: Result<Vec<market_data::Kline>, String>| {
                                 Message::FetchEvent(klines, pane_id, selected_timeframe)
                             }
                         );
-                        commands.push(fetch_klines);
+                        Tasks.push(fetch_klines);
                     }
-                    Command::batch(commands)
+                    Task::batch(Tasks)
 
                 } else {
                     self.ws_state = WsState::Disconnected;
@@ -495,7 +460,7 @@ impl Application for State {
                     self.custom_line = None;
                     self.footprint_chart = None;
 
-                    Command::none()
+                    Task::none()
                 }
             },       
             Message::FetchEvent(klines, target_pane, timeframe) => {
@@ -542,7 +507,7 @@ impl Application for State {
                         self.candlestick_chart = Some(CustomLine::new(vec![], Timeframe::M1)); 
                     },
                 }
-                Command::none()
+                Task::none()
             },
             Message::MarketWsEvent(event) => {
                 match event {
@@ -593,15 +558,15 @@ impl Application for State {
                         }
                     }
                 };
-                Command::none()
+                Task::none()
             },
             Message::UserKeySucceed(listen_key) => {
                 self.listen_key = Some(listen_key);
-                Command::none()
+                Task::none()
             },
             Message::UserKeyError => {
                 eprintln!("Check API keys");
-                Command::none()
+                Task::none()
             },
 
             // Pane grid
@@ -618,44 +583,44 @@ impl Application for State {
                     self.focus = focus_pane;
                 } 
 
-                Command::none()
+                Task::none()
             },
             Message::Clicked(pane) => {
                 self.focus = Some(pane);
-                Command::none()
+                Task::none()
             },
             Message::Resized(pane_grid::ResizeEvent { split, ratio }) => {
                 self.panes.resize(split, ratio);
-                Command::none()
+                Task::none()
             },
             Message::Dragged(pane_grid::DragEvent::Dropped {
                 pane,
                 target,
             }) => {
                 self.panes.drop(pane, target);
-                Command::none()
+                Task::none()
             },
             Message::Dragged(_) => {
-                Command::none()
+                Task::none()
             },
             Message::Maximize(pane) => {
                 self.panes.maximize(pane);
-                Command::none()
+                Task::none()
             },
             Message::Restore => {
                 self.panes.restore();
-                Command::none()
+                Task::none()
             },
             Message::Close(pane) => {                
                 if let Some((_, sibling)) = self.panes.close(pane) {
                     self.focus = Some(sibling);
                 }
-                Command::none()
+                Task::none()
             },
             Message::ToggleLayoutLock => {
                 self.focus = None;
                 self.pane_lock = !self.pane_lock;
-                Command::none()
+                Task::none()
             },
 
             Message::Debug(_msg) => {
@@ -663,24 +628,20 @@ impl Application for State {
                 dbg!(layout);
                 let state_config = &self.panes.panes;
                 dbg!(state_config);
-                Command::none()
-            },
-            Message::FontLoaded(_) => {
-                dbg!("Font loaded");
-                Command::none()
+                Task::none()
             },
 
             Message::OpenModal(pane) => {
                 if let Some(pane) = self.panes.get_mut(pane) {
                     pane.show_modal = true;
                 };
-                Command::none()
+                Task::none()
             },
             Message::CloseModal => {
                 for pane in self.panes.panes.values_mut() {
                     pane.show_modal = false;
                 }
-                Command::none()
+                Task::none()
             },
 
             Message::SliderChanged(pane_id, value) => {
@@ -701,7 +662,7 @@ impl Application for State {
                     time_and_sales.set_size_filter(self.size_filter_timesales);
                 };
 
-                Command::none()
+                Task::none()
             },
             Message::SyncWithHeatmap(sync) => {
                 self.sync_heatmap = sync;
@@ -713,11 +674,11 @@ impl Application for State {
                     }
                 }
             
-                Command::none()
+                Task::none()
             },
             Message::CutTheKlineStream => {
                 self.kline_stream = true;
-                Command::none()
+                Task::none()
             },
 
             Message::ShowLayoutModal => {
@@ -726,7 +687,7 @@ impl Application for State {
             },
             Message::HideLayoutModal => {
                 self.show_layout_modal = false;
-                Command::none()
+                Task::none()
             },
 
             Message::TicksizeSelected(ticksize) => {
@@ -740,12 +701,12 @@ impl Application for State {
                     }
                 }
 
-                Command::none()
+                Task::none()
             },
         }
     }
 
-    fn view(&self) -> Element<'_, Self::Message> {
+    fn view(&self) -> Element<'_, Message> {
         let focus = self.focus;
         let total_panes = self.panes.len();
 
@@ -790,6 +751,7 @@ impl Application for State {
                 };
 
                 let title_bar = pane_grid::TitleBar::new(title)
+                    .always_show_controls()
                     .controls(view_controls(
                         id,
                         pane.id,
@@ -963,10 +925,6 @@ impl Application for State {
         
         Subscription::batch(subscriptions)
     }    
-
-    fn theme(&self) -> Self::Theme {
-        Theme::KanagawaDragon
-    }
 }
 
 fn modal<'a, Message>(

Beberapa file tidak ditampilkan karena terlalu banyak file yang berubah dalam diff ini