use std::cmp::{max, min}; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use chrono::{Utc}; use futures_channel::mpsc::UnboundedSender; use futures_util::StreamExt; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use tokio::sync::{Mutex}; use tracing::{info}; use global::cci::CentralControlInfo; use global::fixed_time_range_deque::FixedTimeRangeDeque; use global::params::Params; use standard::{Depth, Record, Ticker, Trade}; use crate::utils; #[derive(Debug, Clone)] pub struct Predictor { pub depth_vec: Vec, // 深度队列 pub volume_vec: Vec, // 交易量队列 pub trade_long_vec: FixedTimeRangeDeque, // 交易队列 pub trade_short_vec: FixedTimeRangeDeque, // 交易队列 pub trade_fixed_vec: Vec, // 交易队列(观察持仓后的资金流) pub spread_vec: Vec, // 价差队列 pub record_vec: VecDeque, // 蜡烛队列 pub mid_price: Decimal, // 中间价 pub ask_price: Decimal, // 卖一价 pub bid_price: Decimal, // 买一价 pub last_price: Decimal, // 最后成交价 pub trades_volume_short: Decimal, // 过去10秒的成交量总和 pub trades_volume_short_ema: Decimal, // 过去10秒的成交量总和的ema pub spread: Decimal, // 当前价差 pub spread_ema_1000: Decimal, // 价差的ema,1000级别 pub optimal_ask_price: Decimal, // 卖出挂单价 pub optimal_bid_price: Decimal, // 买入挂单价 pub profit_point: Decimal, // 利润点数 pub profit_point_ema: Decimal, // 利润点数的ema pub profit_point_vec: Vec, // 利润队列 pub inventory: Decimal, // 库存,也就是q pub pos_amount: Decimal, // 原始持仓量 pub pos_avg_price: Decimal, // 原始持仓价格 pub level: Decimal, // martin pub money_flow: Decimal, // 资金流 pub ask_delta: Decimal, // δa pub bid_delta: Decimal, // δb pub mid_price_time_vec: FixedTimeRangeDeque, // 中间价格队列, pub fair_price_time_vec: FixedTimeRangeDeque, // 公平价格队列, pub fair_price_long_time_vec: FixedTimeRangeDeque, // pub fair_price_vec: Vec, // 公平价格列表,0表示做市所,1表示参考所 pub fair_price: Decimal, // 公平价格 pub fair_price_ema_short: Decimal, // 公平价格_ema pub fair_price_ema_long: Decimal, // 公平价格_ema pub fair_rate_focus_open: Decimal, // 变化幅度焦点 pub mid_price_focus_open: Decimal, // 观测焦点时的价格 pub fair_rate_focus_close: Decimal, // 变化幅度焦点 pub fair_price_focus_close: Decimal, // 观测焦点时的价格 pub fair_price_when_ordering: Decimal, // 下单时的公平价格 pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值 pub is_ready: bool, // 是否已准备好 pub prev_trade_time: i64, // 上次交易时间,也就是t pub t_diff: Decimal, // (T-t) pub last_update_time: Decimal, // 最后更新时间(depth) pub last_index: Decimal, // 最后更新的index pub prev_insert_time: Decimal, pub prev_save_time: Decimal, pub init_time: Decimal, pub params: Params, pub debug_sender: UnboundedSender> } impl Predictor { // 时间窗口大小(微秒) // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000; const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000; const TRADE_LONG_RANGE_MICROS: i64 = 60_000_000; // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000; const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000; // const ONE_MILLION: Decimal = dec!(1_000_000); // const TWENTY_THOUSAND: Decimal = dec!(20_000); const DONT_VIEW: Decimal = dec!(14142135623730951); pub fn new(_cci_arc: Arc>, params: Params) -> Self { // 创建数据通道 // 创建一个无界通道 let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); let account_port = params.port.clone(); tokio::spawn(async move { let len = 16usize; let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis()); let mut debugs: Vec>> = vec![VecDeque::new(); len]; while let Some(value) = rx.next().await { // 数据填充到对应位置 for i in 0..len { if value[i] == Self::DONT_VIEW { debugs[i].push_back(None); } else { debugs[i].push_back(Some(value[i])); } } // 长度限制 if debugs[0].len() > 500_000 { for i in 0..len { debugs[i].pop_front(); // 从前面移除元素 } } let now = Decimal::from(Utc::now().timestamp_millis()); if now - prev_save_time < dec!(60000) { continue; } let debugs_clone = debugs.clone(); let temp_html_str = tokio::task::spawn_blocking(move || { utils::build_html_file(&debugs_clone) }).await.unwrap(); let path = format!("./db/{}.html", account_port); utils::write_to_file(&temp_html_str, path).await; prev_save_time = Decimal::from(Utc::now().timestamp_millis()); } }); let predictor = Self { // 接针版本 depth_vec: vec![Depth::new(); 10], fair_price_vec: vec![Decimal::ZERO; 10], volume_vec: vec![Decimal::ZERO; 10], // 老的队列 spread_vec: vec![], trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS), trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS), trade_fixed_vec: vec![], profit_point_vec: vec![], record_vec: VecDeque::new(), mid_price: Default::default(), ask_price: Default::default(), bid_price: Default::default(), last_price: Default::default(), trades_volume_short: Default::default(), trades_volume_short_ema: Default::default(), spread: Default::default(), spread_ema_1000: Default::default(), optimal_ask_price: Default::default(), optimal_bid_price: Default::default(), inventory: Default::default(), ask_delta: Default::default(), bid_delta: Default::default(), fair_price_time_vec: FixedTimeRangeDeque::new((params.second_observation_time.to_f64().unwrap() * 1_000_000f64).to_i64().unwrap()), fair_price_long_time_vec: FixedTimeRangeDeque::new(5 * 60_000_000), mid_price_time_vec: FixedTimeRangeDeque::new(100_000), fair_price: Default::default(), fair_price_ema_short: Default::default(), fair_price_ema_long: Default::default(), fair_rate_focus_open: Default::default(), mid_price_focus_open: Default::default(), fair_rate_focus_close: Default::default(), fair_price_focus_close: Default::default(), fair_price_when_ordering: Default::default(), price_times_avg: Default::default(), is_ready: false, prev_trade_time: Utc::now().timestamp_micros(), t_diff: Default::default(), level: Default::default(), pos_amount: Default::default(), money_flow: Default::default(), profit_point: Default::default(), profit_point_ema: Default::default(), last_update_time: Default::default(), last_index: Default::default(), pos_avg_price: Default::default(), prev_insert_time: Default::default(), prev_save_time: Decimal::from(Utc::now().timestamp_millis()), init_time: Decimal::from(Utc::now().timestamp_millis()), params, debug_sender: tx, }; predictor } pub async fn on_depth(&mut self, depth: &Depth, index: usize) { self.last_update_time = depth.time; self.last_index = Decimal::from(index); if index == 0 { self.ask_price = depth.asks[0].price; self.bid_price = depth.bids[0].price; self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO; self.mid_price_time_vec.push_back(self.mid_price); if !self.inventory.is_zero() { let mut profit_now = if self.inventory > Decimal::ZERO { (self.mid_price - self.pos_avg_price) / self.pos_avg_price } else { (self.pos_avg_price - self.mid_price) / self.pos_avg_price }; profit_now -= dec!(0.0006); profit_now.rescale(8); self.profit_point_vec.push(profit_now); // let total: Decimal = self.profit_fixed_vec.iter().sum(); self.profit_point = profit_now; self.profit_point_ema = self.profit_point_ema * dec!(0.99) + self.profit_point * dec!(0.01); } } self.update_fair_price(depth, index).await; self.update_spread(); self.depth_vec[index] = depth.clone(); if self.mid_price.is_zero() { return; } self.processor().await; } pub async fn on_trade(&mut self, trade: &Trade, _index: usize) { self.trade_long_vec.push_back(trade.clone()); self.trade_short_vec.push_back(trade.clone()); if !self.inventory.is_zero() { self.trade_fixed_vec.push(trade.clone()); if self.trade_fixed_vec.len() > 100 { let (bought_sum, sold_sum): (Decimal, Decimal) = self.trade_fixed_vec.iter() .fold((Decimal::ZERO, Decimal::ZERO), |(buy_sum, sell_sum), item| { if item.size > Decimal::ZERO { (buy_sum + item.value.abs(), sell_sum) } else if item.size < Decimal::ZERO { (buy_sum, sell_sum + item.value.abs()) } else { (buy_sum, sell_sum) } }); self.money_flow = (bought_sum - sold_sum) / (bought_sum + sold_sum); self.money_flow.rescale(4); } } self.last_price = trade.price; // self.processor().await; } pub async fn update_level(&mut self) { self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO; self.level = min(self.level, dec!(6)); } pub async fn on_ticker(&mut self, _ticker: &Ticker) {} pub async fn on_record(&mut self, _record: &Record) {} pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) { if self.mid_price.is_zero() { return; } let prev_inventory = self.inventory; self.pos_amount = pos_amount.clone(); self.pos_avg_price = pos_avg_price.clone(); self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc(); // 小于1但不为0的情况,需要平完 if self.inventory.is_zero() && !pos_amount.is_zero() { self.inventory = if pos_amount > &Decimal::ZERO { Decimal::ONE } else { Decimal::NEGATIVE_ONE }; } if prev_inventory != self.inventory && prev_inventory.is_zero() { self.prev_trade_time = Utc::now().timestamp_micros(); } // 重置fair数据,用于重新计算幅度 if prev_inventory != self.inventory { self.fair_price_time_vec.deque.clear(); } // 重置资金流计算 if prev_inventory != self.inventory && self.inventory.is_zero() { self.trade_fixed_vec.clear(); self.profit_point_vec.clear(); self.profit_point = Decimal::ZERO; self.profit_point_ema = Decimal::ZERO; self.money_flow = Decimal::ZERO; } self.update_level().await; self.processor().await; } pub fn get_real_rate(price_vec: &FixedTimeRangeDeque) -> Decimal { let last_fair_price = price_vec.deque.iter().last().unwrap(); let min_price = price_vec.deque.iter().min().unwrap(); let max_price = price_vec.deque.iter().max().unwrap(); let up_rate = (last_fair_price - min_price) / min_price; let down_rate = (max_price - last_fair_price) / max_price; if up_rate > down_rate { up_rate } else { -down_rate } } pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) { if self.mid_price.is_zero() { return; } let a1 = &depth.asks[0]; let b1 = &depth.bids[0]; // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price let total = a1.value + b1.value; let fair_price = a1.price * b1.value / total + b1.price * a1.value / total; // let fair_price = (a1.price + b1.price) / Decimal::TWO; self.fair_price_vec[index] = fair_price; self.fair_price_vec[index].rescale(self.mid_price.scale()); self.volume_vec[index] = a1.size + b1.size; // 合成公平价格 if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() { self.fair_price = self.fair_price_vec[1]; } } pub fn update_spread(&mut self) { if self.mid_price.is_zero() || self.fair_price.is_zero() { return; } self.spread = (self.fair_price - self.mid_price) / self.mid_price; // self.spread.rescale(8); self.spread_vec.push(self.spread); self.spread_ema_1000 = if self.spread_ema_1000.is_zero() { self.spread } else { self.spread_ema_1000 * dec!(0.999) + self.spread * dec!(0.001) }; // self.spread_sma_1000.rescale(8); // self.spread_sma_1000_time_vec.push_back(self.spread_ema_1000); while self.spread_vec.len() > 1_000 { self.spread_vec.remove(0); } } pub fn update_delta(&mut self) { // -2表示不想成交 // -1表示市价成交(委托对手盘的价格,但不一定能市价成交),这里再想想吧,经常委托出去没成交,明显比别人慢了 // 0是买一/卖一成交 if self.fair_price.is_zero() { return; } // 可能是接针 let is_open_long = self.inventory.is_zero(); let is_open_short = self.inventory.is_zero(); let is_close_long = self.inventory > Decimal::ZERO; let is_close_short = self.inventory < Decimal::ZERO; self.bid_delta = dec!(-2); self.ask_delta = dec!(-2); if is_close_long { self.ask_delta = self.mid_price * self.params.close; } else if is_close_short { self.bid_delta = self.mid_price * self.params.close; } else if is_open_long { self.bid_delta = self.params.open * self.mid_price; } else if is_open_short { self.ask_delta = self.params.open * self.mid_price; } } pub fn update_optimal_ask_and_bid(&mut self) { self.optimal_ask_price = if self.ask_delta == dec!(-1) { self.bid_price } else if self.ask_delta == dec!(-2) { Self::DONT_VIEW } else { max(self.ask_price + self.ask_delta, self.bid_price) }; self.optimal_bid_price = if self.bid_delta == dec!(-1) { self.ask_price } else if self.bid_delta == dec!(-2) { Self::DONT_VIEW } else { min(self.bid_price - self.bid_delta, self.ask_price) }; self.optimal_ask_price.rescale(self.mid_price.scale()); self.optimal_bid_price.rescale(self.mid_price.scale()); } pub fn update_t_diff(&mut self) { if self.prev_trade_time > 0 { let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap(); self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO); } else { self.t_diff = Decimal::ONE; } } pub fn check_ready(&mut self) { if self.is_ready { return; } if self.mid_price == Decimal::ZERO { return; } if self.fair_price == Decimal::ZERO { return; } if self.ask_price == Decimal::ZERO { return; } if self.bid_price == Decimal::ZERO { return; } if self.trade_long_vec.len() < 100 { return; } self.is_ready = true; info!("========================================行情数据预热完毕==================================") } // #[instrument(skip(self), level="TRACE")] async fn processor(&mut self) { self.check_ready(); if !self.is_ready { return; } self.trades_volume_short = self.trade_short_vec.deque.iter().map(|item| item.value).sum(); self.trades_volume_short_ema = if self.trades_volume_short_ema.is_zero() { self.trades_volume_short } else { self.trades_volume_short_ema * dec!(0.9995) + self.trades_volume_short * dec!(0.0005) }; self.update_t_diff(); self.update_delta(); self.update_optimal_ask_and_bid(); // let mut smm = Decimal::ZERO; // if !self.depth_vec[1].time.is_zero() { // let sma = self.depth_vec[1].asks[0].price; // let smb = self.depth_vec[1].bids[0].price; // smm = (sma + smb) / Decimal::TWO; // } // let cci_arc = self.cci_arc.clone(); let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap(); let mid_price = self.mid_price; let ask_price = self.ask_price; let bid_price = self.bid_price; let last_price = self.last_price; let fair_price = self.fair_price; let spread = self.profit_point; let spread_max = self.profit_point_ema; let spread_min = Self::DONT_VIEW; // let spread = self.price_times_avg; // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0]; // let spread_min = self.fair_price / self.mid_price; let optimal_ask_price = self.optimal_ask_price; let optimal_bid_price = self.optimal_bid_price; let inventory = self.inventory; let sigma_square = if self.fair_price_time_vec.len() > 1 { Self::get_real_rate(&self.fair_price_time_vec) } else { Decimal::ZERO }; // let sigma_square = self.error_rate; let gamma = self.fair_rate_focus_open; let kappa = self.fair_rate_focus_close; let flow_ratio = Decimal::ZERO; let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED; if !need_append { return; } self.debug_sender.unbounded_send(vec![ now, mid_price, ask_price, bid_price, last_price, spread, spread_max, spread_min, optimal_ask_price, optimal_bid_price, inventory, sigma_square, gamma, kappa, flow_ratio, fair_price ]).unwrap(); self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis()) } // #[instrument(skip(self, ref_ticker_map), level="TRACE")] pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap) -> Vec> { vec![] } }