|
|
@@ -78,12 +78,42 @@ impl Predictor {
|
|
|
let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
|
|
|
|
|
|
while let Some(value) = rx.next().await {
|
|
|
+ // 获取插入下标
|
|
|
+ // 反向遍历 VecDeque
|
|
|
+ let mut insert_index = 0usize;
|
|
|
+ if debugs[0].len() > 0 {
|
|
|
+ let mut j = debugs[0].len() - 1;
|
|
|
+ loop {
|
|
|
+ if debugs[0][j].unwrap() <= value[0] {
|
|
|
+ insert_index = j + 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ j = j - 1;
|
|
|
+ if j == 0 {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// 数据填充到对应位置
|
|
|
for i in 0..len {
|
|
|
if value[i] == Self::DONT_VIEW {
|
|
|
- debugs[i].push_back(None);
|
|
|
+ // 可能会遇见按时序插入的
|
|
|
+ if debugs[i].len() > 0 && insert_index < debugs.len() {
|
|
|
+ // 在合适的位置插入新元素
|
|
|
+ debugs[i].insert(insert_index, None);
|
|
|
+ } else {
|
|
|
+ debugs[i].push_back(None);
|
|
|
+ }
|
|
|
} else {
|
|
|
- debugs[i].push_back(Some(value[i]));
|
|
|
+ // 可能会遇见按时序插入的
|
|
|
+ if debugs[i].len() > 0 && insert_index < debugs.len() {
|
|
|
+ // 在合适的位置插入新元素
|
|
|
+ debugs[i].insert(insert_index, Some(value[i]));
|
|
|
+ } else {
|
|
|
+ debugs[i].push_back(Some(value[i]));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -95,7 +125,7 @@ impl Predictor {
|
|
|
}
|
|
|
|
|
|
let now = Decimal::from(Utc::now().timestamp_millis());
|
|
|
- if now - prev_save_time < dec!(60000) {
|
|
|
+ if now - prev_save_time < dec!(30000) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
@@ -171,7 +201,7 @@ impl Predictor {
|
|
|
if self.mid_price.is_zero() {
|
|
|
return;
|
|
|
}
|
|
|
- self.processor().await;
|
|
|
+ self.processor(depth.time).await;
|
|
|
}
|
|
|
|
|
|
pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
|
|
|
@@ -201,7 +231,7 @@ impl Predictor {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- self.processor().await;
|
|
|
+ self.processor(Decimal::from(Utc::now().timestamp_millis())).await;
|
|
|
}
|
|
|
|
|
|
pub async fn on_balance(&mut self, balance: Decimal) {
|
|
|
@@ -346,7 +376,7 @@ impl Predictor {
|
|
|
}
|
|
|
|
|
|
// #[instrument(skip(self), level="TRACE")]
|
|
|
- async fn processor(&mut self) {
|
|
|
+ async fn processor(&mut self, data_time: Decimal) {
|
|
|
self.check_ready();
|
|
|
if !self.is_ready {
|
|
|
return;
|
|
|
@@ -355,7 +385,7 @@ impl Predictor {
|
|
|
self.update_delta();
|
|
|
|
|
|
// let cci_arc = self.cci_arc.clone();
|
|
|
- let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
|
|
|
+ let now = data_time;
|
|
|
let mid_price = self.mid_price;
|
|
|
let ask_price = self.fair_price_std_vec[0];
|
|
|
let bid_price = self.fair_price_std_vec[1];
|