use std::cmp::{max, min}; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use chrono::{Utc}; use futures_channel::mpsc::UnboundedSender; use futures_util::StreamExt; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use tokio::sync::{Mutex}; use tracing::{info}; use global::cci::CentralControlInfo; use global::fixed_time_range_deque::FixedTimeRangeDeque; use global::params::Params; use standard::{Depth, Record, Ticker, Trade}; use crate::utils; #[derive(Debug)] pub struct Predictor { pub depth_vec: Vec, // 深度队列 pub volume_vec: Vec, // 交易量队列 pub trade_long_vec: FixedTimeRangeDeque, // 交易队列 pub trade_short_vec: FixedTimeRangeDeque, // 交易队列 pub spread_vec: Vec, // 价差队列 pub record_vec: VecDeque, // 蜡烛队列 pub mid_price: Decimal, // 中间价 pub ask_price: Decimal, // 卖一价 pub bid_price: Decimal, // 买一价 pub last_price: Decimal, // 最后成交价 pub spread: Decimal, // 当前价差 pub spread_sma: Decimal, // 价差的sma,默认是sma5000 pub spread_sma_2000: Decimal, // 价差的sma,2000级别 pub spread_sma_1000: Decimal, // 价差的sma,1000级别 pub optimal_ask_price: Decimal, // 卖出挂单价 pub optimal_bid_price: Decimal, // 买入挂单价 pub inventory: Decimal, // 库存,也就是q pub pos_amount: Decimal, // 原始持仓量 pub pos_avg_price: Decimal, // 原始持仓价格 pub level: Decimal, // martin pub error_rate: Decimal, // 犯错概率(预估) pub ask_delta: Decimal, // δa pub bid_delta: Decimal, // δb pub mid_price_time_vec: FixedTimeRangeDeque, // 中间价格队列,100ms以内的所有中间价格 pub fair_price_time_vec: FixedTimeRangeDeque, // 公平价格队列,50ms以内的所有公平价格 pub spread_sma_1000_time_vec: FixedTimeRangeDeque, // spread队列,100ms以内的所有spread_sma_1000 pub fair_price_vec: Vec, // 公平价格列表,0表示做市所,1表示参考所 pub fair_price: Decimal, // 预定价格 pub fair_rate_focus: Decimal, // 变化幅度焦点 pub fair_price_when_ordering: Decimal, // 下单时的预定价格 pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值 pub is_regressed: bool, // 做市所的价格是否已经回归 pub is_ready: bool, // 是否已准备好 pub close_price: Decimal, // 计划平仓价格 pub prev_trade_time: i64, // 上次交易时间,也就是t pub t_diff: Decimal, // (T-t) pub last_update_time: Decimal, // 最后更新时间(depth) pub last_index: Decimal, // 最后更新的index pub prev_insert_time: Decimal, pub prev_save_time: Decimal, pub params: Params, pub debug_sender: UnboundedSender>, pub long_trade_len_dec: Decimal, pub short_trade_len_dec: Decimal, } impl Predictor { // 时间窗口大小(微秒) // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000; const TIME_DIFF_RANGE_MICROS: i64 = 10 * 60_000_000; const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000; // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000; const TRADE_SHORT_RANGE_MICROS: i64 = 30_000_000; // const ONE_MILLION: Decimal = dec!(1_000_000); // const TWENTY_THOUSAND: Decimal = dec!(20_000); const UN_VIEW: Decimal = dec!(14142135623730951); pub fn new(_cci_arc: Arc>, params: Params) -> Self { // 创建数据通道 // 创建一个无界通道 let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); tokio::spawn(async move { let len = 16usize; let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis()); let mut debugs: Vec>> = vec![VecDeque::new(); len]; while let Some(value) = rx.next().await { // 数据填充到对应位置 for i in 0..len { if value[i] == Self::UN_VIEW { debugs[i].push_back(None); } else { debugs[i].push_back(Some(value[i])); } } // 长度限制 if debugs[0].len() > 500_000 { for i in 0..len { debugs[i].pop_front(); // 从前面移除元素 } } let now = Decimal::from(Utc::now().timestamp_millis()); if now - prev_save_time < dec!(60000) { continue; } let debugs_clone = debugs.clone(); let temp_html_str = tokio::task::spawn_blocking(move || { utils::build_html_file(&debugs_clone) }).await.unwrap(); utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await; prev_save_time = Decimal::from(Utc::now().timestamp_millis()); } }); let predictor = Self { // 接针版本 depth_vec: vec![Depth::new(); 10], fair_price_vec: vec![Decimal::ZERO; 10], volume_vec: vec![Decimal::ZERO; 10], // 老的队列 spread_vec: vec![], trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS), trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS), record_vec: VecDeque::new(), mid_price: Default::default(), ask_price: Default::default(), bid_price: Default::default(), last_price: Default::default(), spread: Default::default(), spread_sma: Default::default(), spread_sma_2000: Default::default(), spread_sma_1000: Default::default(), optimal_ask_price: Default::default(), optimal_bid_price: Default::default(), inventory: Default::default(), ask_delta: Default::default(), bid_delta: Default::default(), fair_price_time_vec: FixedTimeRangeDeque::new(50_000), mid_price_time_vec: FixedTimeRangeDeque::new(100_000), spread_sma_1000_time_vec: FixedTimeRangeDeque::new(500_000), fair_price: Default::default(), fair_rate_focus: Default::default(), fair_price_when_ordering: Default::default(), price_times_avg: Default::default(), is_regressed: false, is_ready: false, prev_trade_time: Utc::now().timestamp_micros(), close_price: Default::default(), t_diff: Default::default(), level: Default::default(), pos_amount: Default::default(), error_rate: Default::default(), last_update_time: Default::default(), last_index: Default::default(), pos_avg_price: Default::default(), prev_insert_time: Default::default(), prev_save_time: Decimal::from(Utc::now().timestamp_millis()), params, debug_sender: tx, long_trade_len_dec: Default::default(), short_trade_len_dec: Default::default(), }; predictor } pub async fn on_depth(&mut self, depth: &Depth, index: usize) { self.last_update_time = depth.time; self.last_index = Decimal::from(index); if index == 0 { self.ask_price = depth.asks[0].price; self.bid_price = depth.bids[0].price; self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO; self.mid_price_time_vec.push_back(self.mid_price); } self.update_fair_price(depth, index).await; self.update_spread(); self.depth_vec[index] = depth.clone(); if self.mid_price.is_zero() { return; } self.processor().await; } pub async fn on_trade(&mut self, trade: &Trade, _index: usize) { // self.last_update_time = trade.time; self.trade_long_vec.push_back(trade.clone()); self.trade_short_vec.push_back(trade.clone()); self.long_trade_len_dec = Decimal::from_usize(self.trade_long_vec.len()).unwrap(); self.short_trade_len_dec = Decimal::from_usize(self.trade_short_vec.len()).unwrap(); self.error_rate = self.short_trade_len_dec / self.long_trade_len_dec; self.error_rate.rescale(4); self.last_price = trade.price; // self.processor().await; } pub async fn update_level(&mut self) { self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO; self.level = min(self.level, dec!(6)); } pub async fn on_ticker(&mut self, _ticker: &Ticker) {} pub async fn on_record(&mut self, _record: &Record) {} pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) { if self.mid_price.is_zero() { return; } let prev_inventory = self.inventory; self.pos_amount = pos_amount.clone(); self.pos_avg_price = pos_avg_price.clone(); self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc(); // 小于1但不为0的情况,需要平完 if self.inventory.is_zero() && !pos_amount.is_zero() { self.inventory = if pos_amount > &Decimal::ZERO { Decimal::ONE } else { Decimal::NEGATIVE_ONE }; } if prev_inventory != self.inventory && prev_inventory.is_zero() { self.prev_trade_time = Utc::now().timestamp_micros(); self.close_price = self.fair_price_when_ordering; } if prev_inventory != self.inventory && self.inventory.is_zero() { self.is_regressed = false; } self.update_level().await; self.processor().await; } pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) { if self.mid_price.is_zero() { return; } let a1 = &depth.asks[0]; let b1 = &depth.bids[0]; // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price // let total = a1.value + b1.value; // let fair_price = a1.price * b1.value / total + b1.price * a1.value / total; let fair_price = (a1.price + b1.price) / Decimal::TWO; self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() { fair_price } else { self.fair_price_vec[index] * dec!(0.9) + fair_price * dec!(0.1) }; self.fair_price_vec[index].rescale(self.mid_price.scale()); self.volume_vec[index] = a1.size + b1.size; // 合成公平价格 if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() { self.price_times_avg = if self.price_times_avg.is_zero() { self.fair_price_vec[1] / self.fair_price_vec[0] } else { self.price_times_avg * dec!(0.9999) + dec!(0.0001) * self.fair_price_vec[1] / self.fair_price_vec[0] }; // 进行价格归一化处理,公平所的价格有可能是带前缀的 let fair_price_part0 = self.fair_price_vec[0] * dec!(0.2); let fair_price_part1 = (self.fair_price_vec[1] / self.price_times_avg) * dec!(0.8); self.fair_price = fair_price_part0 + fair_price_part1; self.fair_price_time_vec.push_back(self.fair_price); // 重置焦点,条件1 if !self.fair_rate_focus.is_zero() && self.mid_price_time_vec.deque.len() >= 2 && self.fair_price_time_vec.deque.len() >= 2 { let mid_price_prev = self.mid_price_time_vec.deque.get(self.mid_price_time_vec.deque.len() - 2).unwrap(); let fair_price_prev = self.fair_price_time_vec.deque.get(self.fair_price_time_vec.deque.len() - 2).unwrap(); // 向上涨,并且mid下穿fair,视为观测阶段结束 if self.fair_rate_focus > Decimal::ZERO && mid_price_prev > fair_price_prev && self.mid_price < self.fair_price { self.fair_rate_focus = Decimal::ZERO; } // 向下跌,并且mid上穿fair,视为观测阶段结束 if self.fair_rate_focus < Decimal::ZERO && mid_price_prev < fair_price_prev && self.mid_price > self.fair_price { self.fair_rate_focus = Decimal::ZERO; } } // 重置焦点,条件2 if !self.fair_rate_focus.is_zero() && !self.inventory.is_zero() { self.fair_rate_focus = Decimal::ZERO; } // 更新程序关注的变化幅度焦点 if self.fair_rate_focus.is_zero() { let last_fair_price = self.fair_price_time_vec.deque.iter().last().unwrap(); let first_fair_price = self.fair_price_time_vec.deque[0]; let mut rate = (last_fair_price - first_fair_price) / first_fair_price; rate.rescale(8); // 只有有强度的rate才有资格被称为针 if rate.abs() > self.params.open { // 向上涨,并且fair下穿mid,视为观测阶段开始 if rate > Decimal::ZERO && self.mid_price > self.fair_price { self.fair_rate_focus = rate; } // 向下跌,并且fair上穿mid,视为观测阶段开始 if rate < Decimal::ZERO && self.mid_price < self.fair_price { self.fair_rate_focus = rate; } } } } // 判断价格是否回归 if !self.is_regressed && self.inventory > Decimal::ZERO && self.spread_sma_1000 < max(self.spread_sma, self.spread_sma_2000) { self.is_regressed = true } else if !self.is_regressed && self.inventory < Decimal::ZERO && self.spread_sma_1000 > min(self.spread_sma, self.spread_sma_2000) { self.is_regressed = true } } pub fn update_spread(&mut self) { if self.mid_price.is_zero() || self.fair_price.is_zero() { return; } self.spread = (self.fair_price - self.mid_price) / self.mid_price; // self.spread.rescale(8); self.spread_vec.push(self.spread); self.spread_sma = if self.spread_sma.is_zero() { self.spread } else { self.spread_sma * dec!(0.9998) + self.spread * dec!(0.0002) }; // self.spread_sma.rescale(8); self.spread_sma_2000 = if self.spread_sma_2000.is_zero() { self.spread } else { self.spread_sma_2000 * dec!(0.9995) + self.spread * dec!(0.0005) }; // self.spread_sma_2000.rescale(8); self.spread_sma_1000 = if self.spread_sma_1000.is_zero() { self.spread } else { self.spread_sma_1000 * dec!(0.999) + self.spread * dec!(0.001) }; self.spread_sma_1000_time_vec.push_back(self.spread_sma_1000); // self.spread_sma_1000.rescale(8); while self.spread_vec.len() > 1_000 { self.spread_vec.remove(0); } } pub fn update_delta(&mut self) { // -2表示不想成交 // -1表示市价成交(委托对手盘的价格,但不一定能市价成交),这里再想想吧,经常委托出去没成交,明显比别人慢了 // 0是买一/卖一成交 if self.fair_price.is_zero() { return; } // 可能是趋势 // let is_open_long = self.spread_sma_1000 - self.spread_sma > self.params.open && self.fair_price > self.mid_price; // let is_open_short = self.spread_sma_1000 - self.spread_sma < self.params.open * Decimal::NEGATIVE_ONE && self.fair_price < self.mid_price; // 可能是接针 let is_open_long = self.fair_rate_focus < Decimal::ZERO && self.fair_price > self.mid_price; let is_open_short = self.fair_rate_focus > Decimal::ZERO && self.fair_price < self.mid_price; let is_close_long = self.inventory > Decimal::ZERO; let is_close_short = self.inventory < Decimal::ZERO; self.bid_delta = dec!(-2); self.ask_delta = dec!(-2); if is_close_long { if self.mid_price > self.pos_avg_price { // let profit_rate = (self.mid_price - self.pos_avg_price) / self.pos_avg_price; // let fill_rate = profit_rate / (self.params.open * dec!(50)); // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close; // // self.ask_delta = self.mid_price * close_rate * self.t_diff; self.ask_delta = self.mid_price * self.params.close; } else if self.mid_price > self.fair_price { self.ask_delta = self.mid_price * self.params.close; } // self.ask_delta = self.mid_price * self.params.close; } else if is_close_short { if self.mid_price < self.pos_avg_price { // let profit_rate = (self.pos_avg_price - self.mid_price) / self.pos_avg_price; // let fill_rate = profit_rate / (self.params.open * dec!(50)); // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close; // // self.bid_delta = self.mid_price * close_rate * self.t_diff; self.bid_delta = self.mid_price * self.params.close; } else if self.mid_price < self.fair_price { self.bid_delta = self.mid_price * self.params.close; } // self.bid_delta = self.mid_price * self.params.close; } else if is_open_long { // let is_open_long_market = self.spread_sma_1000 - self.spread_sma > self.params.open_market; // self.bid_delta = if is_open_long_market { // dec!(-1) // } else { // dec!(0) // }; self.bid_delta = dec!(0) } else if is_open_short { // let is_open_short_market = self.spread_sma_1000 - self.spread_sma < self.params.open_market * Decimal::NEGATIVE_ONE; // self.ask_delta = if is_open_short_market { // dec!(-1) // } else { // dec!(0) // } self.ask_delta = dec!(0) } } pub fn update_optimal_ask_and_bid(&mut self) { self.optimal_ask_price = if self.ask_delta == dec!(-1) { self.bid_price } else if self.ask_delta == dec!(-2) { Self::UN_VIEW } else { max(self.ask_price + self.ask_delta, self.bid_price) }; self.optimal_bid_price = if self.bid_delta == dec!(-1) { self.ask_price } else if self.bid_delta == dec!(-2) { Self::UN_VIEW } else { min(self.bid_price - self.bid_delta, self.ask_price) }; self.optimal_ask_price.rescale(self.mid_price.scale()); self.optimal_bid_price.rescale(self.mid_price.scale()); } pub fn update_t_diff(&mut self) { if self.prev_trade_time > 0 { let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap(); self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO); } else { self.t_diff = Decimal::ONE; } } pub fn check_ready(&mut self) { if self.is_ready { return; } if self.mid_price == Decimal::ZERO { return; } if self.fair_price == Decimal::ZERO { return; } if self.ask_price == Decimal::ZERO { return; } if self.bid_price == Decimal::ZERO { return; } if self.trade_long_vec.len() < 100 { return; } self.is_ready = true; info!("========================================行情数据预热完毕==================================") } // #[instrument(skip(self), level="TRACE")] async fn processor(&mut self) { self.update_t_diff(); self.update_delta(); self.update_optimal_ask_and_bid(); self.check_ready(); if !self.is_ready { return; } // let mut smm = Decimal::ZERO; // if !self.depth_vec[1].time.is_zero() { // let sma = self.depth_vec[1].asks[0].price; // let smb = self.depth_vec[1].bids[0].price; // smm = (sma + smb) / Decimal::TWO; // } // let cci_arc = self.cci_arc.clone(); let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap(); let mid_price = self.mid_price; let ask_price = self.ask_price; let bid_price = self.bid_price; let last_price = self.last_price; let spread = self.spread_sma; let spread_max = self.spread_sma_2000; let spread_min = self.spread_sma_1000; // let spread = self.price_times_avg; // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0]; // let spread_min = self.fair_price / self.mid_price; let optimal_ask_price = self.optimal_ask_price; let optimal_bid_price = self.optimal_bid_price; let inventory = self.inventory; let sigma_square = if self.is_regressed { Decimal::ONE } else { Decimal::ZERO }; let gamma = now - self.last_update_time; let kappa = self.fair_rate_focus; let flow_ratio = Decimal::ZERO; let ref_price = self.fair_price; let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED; if !need_append { return; } self.debug_sender.unbounded_send(vec![ now, mid_price, ask_price, bid_price, last_price, spread, spread_max, spread_min, optimal_ask_price, optimal_bid_price, inventory, sigma_square, gamma, kappa, flow_ratio, ref_price ]).unwrap(); self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis()) } // #[instrument(skip(self, ref_ticker_map), level="TRACE")] pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap) -> Vec> { vec![] } }