瀏覽代碼

improve data integrity checks for charts

Berke 10 月之前
父節點
當前提交
8b1d04dda6
共有 3 個文件被更改,包括 164 次插入26 次删除
  1. 68 9
      src/charts/candlestick.rs
  2. 70 11
      src/charts/footprint.rs
  3. 26 6
      src/data_providers/binance.rs

+ 68 - 9
src/charts/candlestick.rs

@@ -99,6 +99,7 @@ pub struct CandlestickChart {
     indicators: HashMap<CandlestickIndicator, IndicatorData>,
     request_handler: RequestHandler,
     fetching_oi: bool,
+    integrity: bool,
 }
 
 impl CandlestickChart {
@@ -166,6 +167,7 @@ impl CandlestickChart {
             },
             request_handler: RequestHandler::new(),
             fetching_oi: false,
+            integrity: false,
         }
     }
 
@@ -179,6 +181,8 @@ impl CandlestickChart {
     }
 
     pub fn update_latest_kline(&mut self, kline: &Kline) -> Task<Message> {
+        let mut task = None;
+        
         self.data_points.insert(kline.time as i64, *kline);
 
         if let Some(IndicatorData::Volume(_, data)) = 
@@ -199,15 +203,15 @@ impl CandlestickChart {
         };
         
         if !chart.already_fetching {
-            return self.get_missing_data_task();
+            task = self.get_missing_data_task();
         }
 
         self.render_start();
-        Task::none()
+        task.unwrap_or(Task::none())
     }
 
-    fn get_missing_data_task(&mut self) -> Task<Message> {
-        let mut task = Task::none();
+    fn get_missing_data_task(&mut self) -> Option<Task<Message>> {
+        let mut task = None;
 
         let (visible_earliest, visible_latest) = self.get_visible_timerange();
         let (kline_earliest, kline_latest) = self.get_kline_timerange();
@@ -217,11 +221,11 @@ impl CandlestickChart {
         if visible_earliest < kline_earliest {
             let latest = kline_earliest;
 
-            if let Some(task) = request_fetch(
+            if let Some(fetch_task) = request_fetch(
                 &mut self.request_handler, FetchRange::Kline(earliest, latest)
             ) {
                 self.get_common_data_mut().already_fetching = true;
-                return task;
+                return Some(fetch_task);
             }
         }
 
@@ -237,7 +241,7 @@ impl CandlestickChart {
                             &mut self.request_handler, FetchRange::OpenInterest(earliest, latest)
                         ) {
                             self.fetching_oi = true;
-                            task = fetch_task;
+                            task = Some(fetch_task);
                         }
                     } else if oi_latest < kline_latest {
                         let latest = visible_latest;
@@ -246,17 +250,72 @@ impl CandlestickChart {
                             &mut self.request_handler, FetchRange::OpenInterest(oi_latest, latest)
                         ) {
                             self.fetching_oi = true;
-                            task = fetch_task;
+                            task = Some(fetch_task);
                         }
                     }
                 }
             }
         };
 
-        self.render_start();
+        if task.is_none() {
+            if let Some(missing_keys) = self.check_data_integrity(kline_earliest, kline_latest) {
+                let (latest, earliest) = (
+                    missing_keys.iter().max().unwrap_or(&visible_latest) + self.chart.timeframe as i64,
+                    missing_keys.iter().min().unwrap_or(&visible_earliest) - self.chart.timeframe as i64,
+                );
+
+                self.request_handler = RequestHandler::new();
+
+                if let Some(fetch_task) = request_fetch(
+                    &mut self.request_handler, FetchRange::Kline(earliest, latest)
+                ) {
+                    self.get_common_data_mut().already_fetching = true;
+                    task = Some(fetch_task);
+                }
+            }
+        }
+
         task
     }
 
+    fn check_data_integrity(&mut self, earliest: i64, latest: i64) -> Option<Vec<i64>> {
+        if self.integrity || self.fetching_oi {
+            return None;
+        }
+        if self.get_common_data().already_fetching {
+            return None;
+        }
+    
+        let interval = self.get_common_data().timeframe as i64;
+        
+        let mut time = earliest;
+        let mut missing_count = 0;
+        while time < latest {
+            if !self.data_points.contains_key(&time) {
+                missing_count += 1;
+                break; 
+            }
+            time += interval;
+        }
+    
+        if missing_count > 0 {
+            let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
+            let mut time = earliest;
+            while time < latest {
+                if !self.data_points.contains_key(&time) {
+                    missing_keys.push(time);
+                }
+                time += interval;
+            }
+            
+            log::warn!("Integrity check failed: missing {} klines", missing_keys.len());
+            Some(missing_keys)
+        } else {
+            self.integrity = true;
+            None
+        }
+    }
+
     pub fn insert_new_klines(&mut self, req_id: uuid::Uuid, klines_raw: &Vec<Kline>) {
         let mut volume_data = BTreeMap::new();
 

+ 70 - 11
src/charts/footprint.rs

@@ -104,6 +104,7 @@ pub struct FootprintChart {
     fetching_oi: bool,
     fetching_trades: bool,
     request_handler: RequestHandler,
+    kline_integrity: bool,
 }
 
 impl FootprintChart {
@@ -180,6 +181,7 @@ impl FootprintChart {
                 ..Default::default()
             },
             data_points,
+            kline_integrity: false,
             raw_trades,
             indicators: {
                 let mut indicators = HashMap::new();
@@ -207,6 +209,8 @@ impl FootprintChart {
     }
 
     pub fn update_latest_kline(&mut self, kline: &Kline) -> Task<Message> {
+        let mut task = None;
+
         if let Some((_, kline_value)) = self.data_points.get_mut(&(kline.time as i64)) {
             kline_value.open = kline.open;
             kline_value.high = kline.high;
@@ -238,15 +242,15 @@ impl FootprintChart {
         };
 
         if !chart.already_fetching {
-            return self.get_missing_data_task();
+            task = self.get_missing_data_task();
         }
 
         self.render_start();
-        Task::none()
+        task.unwrap_or(Task::none())
     }
 
-    fn get_missing_data_task(&mut self) -> Task<Message> {
-        let mut task = Task::none();
+    fn get_missing_data_task(&mut self) -> Option<Task<Message>> {
+        let mut task = None;
 
         let (visible_earliest, visible_latest) = self.get_visible_timerange();
         let (kline_earliest, kline_latest) = self.get_kline_timerange();
@@ -256,11 +260,11 @@ impl FootprintChart {
         if visible_earliest < kline_earliest {
             let latest = kline_earliest;
 
-            if let Some(task) = request_fetch(
+            if let Some(fetch_task) = request_fetch(
                 &mut self.request_handler, FetchRange::Kline(earliest, latest)
             ) {
                 self.get_common_data_mut().already_fetching = true;
-                return task;
+                return Some(fetch_task);
             }
         }
 
@@ -274,11 +278,11 @@ impl FootprintChart {
                     .min();
             
                 if let Some(earliest) = trade_earliest {
-                    if let Some(task) = request_fetch(
+                    if let Some(fetch_task) = request_fetch(
                         &mut self.request_handler, FetchRange::Trades(visible_earliest, earliest)
                     ) {
                         self.fetching_trades = true;
-                        return task;
+                        return Some(fetch_task);
                     }
                 }
             }
@@ -296,7 +300,7 @@ impl FootprintChart {
                             &mut self.request_handler, FetchRange::OpenInterest(earliest, latest)
                         ) {
                             self.fetching_oi = true;
-                            task = fetch_task;
+                            task = Some(fetch_task);
                         }
                     } else if oi_latest < kline_latest {
                         let latest = visible_latest;
@@ -305,17 +309,72 @@ impl FootprintChart {
                             &mut self.request_handler, FetchRange::OpenInterest(oi_latest, latest)
                         ) {
                             self.fetching_oi = true;
-                            task = fetch_task;
+                            task = Some(fetch_task);
                         }
                     }
                 }
             }
         };
 
-        self.render_start();
+        if task.is_none() {
+            if let Some(missing_keys) = self.check_data_integrity(kline_earliest, kline_latest) {
+                let (latest, earliest) = (
+                    missing_keys.iter().max().unwrap_or(&visible_latest) + self.chart.timeframe as i64,
+                    missing_keys.iter().min().unwrap_or(&visible_earliest) - self.chart.timeframe as i64,
+                );
+
+                self.request_handler = RequestHandler::new();
+
+                if let Some(fetch_task) = request_fetch(
+                    &mut self.request_handler, FetchRange::Kline(earliest, latest)
+                ) {
+                    self.get_common_data_mut().already_fetching = true;
+                    task = Some(fetch_task);
+                }
+            }
+        }
+
         task
     }
 
+    fn check_data_integrity(&mut self, earliest: i64, latest: i64) -> Option<Vec<i64>> {
+        if self.kline_integrity || self.fetching_oi {
+            return None;
+        }
+        if self.get_common_data().already_fetching {
+            return None;
+        }
+    
+        let interval = self.get_common_data().timeframe as i64;
+        
+        let mut time = earliest;
+        let mut missing_count = 0;
+        while time < latest {
+            if !self.data_points.contains_key(&time) {
+                missing_count += 1;
+                break; 
+            }
+            time += interval;
+        }
+    
+        if missing_count > 0 {
+            let mut missing_keys = Vec::with_capacity(((latest - earliest) / interval) as usize);
+            let mut time = earliest;
+            while time < latest {
+                if !self.data_points.contains_key(&time) {
+                    missing_keys.push(time);
+                }
+                time += interval;
+            }
+            
+            log::warn!("Integrity check failed: missing {} klines", missing_keys.len());
+            Some(missing_keys)
+        } else {
+            self.kline_integrity = true;
+            None
+        }
+    }
+
     pub fn reset_request_handler(&mut self) {
         self.request_handler = RequestHandler::new();
         self.fetching_trades = false;

+ 26 - 6
src/data_providers/binance.rs

@@ -698,9 +698,19 @@ pub async fn fetch_klines(
         let interval_ms = timeframe.to_milliseconds() as i64;
         let num_intervals = ((end - start) / interval_ms).min(1000);
 
-        url.push_str(&format!(
-            "&startTime={start}&endTime={end}&limit={num_intervals}"
-        ));
+        if num_intervals < 3 {
+            let new_start = start - (interval_ms * 5);
+            let new_end = end + (interval_ms * 5);
+            let num_intervals = ((new_end - new_start) / interval_ms).min(1000);
+
+            url.push_str(&format!(
+                "&startTime={new_start}&endTime={new_end}&limit={num_intervals}"
+            ));
+        } else {
+            url.push_str(&format!(
+                "&startTime={start}&endTime={end}&limit={num_intervals}"
+            ));
+        }     
     } else {
         url.push_str(&format!("&limit={}", 200));
     }
@@ -1087,9 +1097,19 @@ pub async fn fetch_historical_oi(
         let interval_ms = period.to_milliseconds() as i64;
         let num_intervals = ((end - start) / interval_ms).min(500);
 
-        url.push_str(&format!(
-            "&startTime={start}&endTime={end}&limit={num_intervals}"
-        ));
+        if num_intervals < 3 {
+            let new_start = start - (interval_ms * 5);
+            let new_end = end + (interval_ms * 5);
+            let num_intervals = ((new_end - new_start) / interval_ms).min(1000);
+            
+            url.push_str(&format!(
+                "&startTime={new_start}&endTime={new_end}&limit={num_intervals}"
+            ));
+        } else {
+            url.push_str(&format!(
+                "&startTime={start}&endTime={end}&limit={num_intervals}"
+            ));
+        }     
     } else {
         url.push_str(&format!("&limit={}", 200));
     }