|
|
@@ -73,7 +73,7 @@ impl Predictor {
|
|
|
|
|
|
let account_port = params.port.clone();
|
|
|
tokio::spawn(async move {
|
|
|
- let len = 16usize;
|
|
|
+ let len = 17usize;
|
|
|
let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
|
|
|
let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
|
|
|
|
|
|
@@ -100,7 +100,7 @@ impl Predictor {
|
|
|
for i in 0..len {
|
|
|
if value[i] == Self::DONT_VIEW {
|
|
|
// 可能会遇见按时序插入的
|
|
|
- if debugs[i].len() > 0 && insert_index < debugs.len() {
|
|
|
+ if debugs[i].len() > 0 && insert_index < debugs[i].len() {
|
|
|
// 在合适的位置插入新元素
|
|
|
debugs[i].insert(insert_index, None);
|
|
|
} else {
|
|
|
@@ -108,7 +108,7 @@ impl Predictor {
|
|
|
}
|
|
|
} else {
|
|
|
// 可能会遇见按时序插入的
|
|
|
- if debugs[i].len() > 0 && insert_index < debugs.len() {
|
|
|
+ if debugs[i].len() > 0 && insert_index < debugs[i].len() {
|
|
|
// 在合适的位置插入新元素
|
|
|
debugs[i].insert(insert_index, Some(value[i]));
|
|
|
} else {
|
|
|
@@ -201,7 +201,8 @@ impl Predictor {
|
|
|
if self.mid_price.is_zero() {
|
|
|
return;
|
|
|
}
|
|
|
- self.processor(depth.time).await;
|
|
|
+
|
|
|
+ self.processor(depth.time, false).await;
|
|
|
}
|
|
|
|
|
|
pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
|
|
|
@@ -214,7 +215,7 @@ impl Predictor {
|
|
|
|
|
|
pub async fn on_record(&mut self, _record: &Record) {}
|
|
|
|
|
|
- pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) {
|
|
|
+ pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
|
|
|
if self.mid_price.is_zero() {
|
|
|
return;
|
|
|
}
|
|
|
@@ -231,7 +232,7 @@ impl Predictor {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- self.processor(Decimal::from(Utc::now().timestamp_millis())).await;
|
|
|
+ self.processor(update_time, true).await;
|
|
|
}
|
|
|
|
|
|
pub async fn on_balance(&mut self, balance: Decimal) {
|
|
|
@@ -376,7 +377,7 @@ impl Predictor {
|
|
|
}
|
|
|
|
|
|
// #[instrument(skip(self), level="TRACE")]
|
|
|
- async fn processor(&mut self, data_time: Decimal) {
|
|
|
+ async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
|
|
|
self.check_ready();
|
|
|
if !self.is_ready {
|
|
|
return;
|
|
|
@@ -384,6 +385,10 @@ impl Predictor {
|
|
|
|
|
|
self.update_delta();
|
|
|
|
|
|
+ if !self.inventory.is_zero() {
|
|
|
+ info!(?data_time, is_hard_update)
|
|
|
+ }
|
|
|
+
|
|
|
// let cci_arc = self.cci_arc.clone();
|
|
|
let now = data_time;
|
|
|
let mid_price = self.mid_price;
|
|
|
@@ -411,9 +416,13 @@ impl Predictor {
|
|
|
let flow_ratio = Decimal::ZERO;
|
|
|
|
|
|
let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
|
|
|
- if !need_append {
|
|
|
+ if !need_append && !is_hard_update {
|
|
|
return;
|
|
|
}
|
|
|
+ if !is_hard_update {
|
|
|
+ self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
|
|
|
+ }
|
|
|
+ let pos_avg_price = self.pos_avg_price;
|
|
|
|
|
|
self.debug_sender.unbounded_send(vec![
|
|
|
now,
|
|
|
@@ -431,10 +440,9 @@ impl Predictor {
|
|
|
gamma,
|
|
|
kappa,
|
|
|
flow_ratio,
|
|
|
- fair_price
|
|
|
+ fair_price,
|
|
|
+ pos_avg_price
|
|
|
]).unwrap();
|
|
|
-
|
|
|
- self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
|
|
|
}
|
|
|
|
|
|
// #[instrument(skip(self, ref_ticker_map), level="TRACE")]
|