| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- use std::cmp::{max, min};
- use std::collections::{BTreeMap, VecDeque};
- use std::sync::Arc;
- use chrono::{Utc};
- use futures_channel::mpsc::UnboundedSender;
- use futures_util::StreamExt;
- use rust_decimal::prelude::*;
- use rust_decimal_macros::dec;
- use tokio::sync::{Mutex};
- use tracing::{info};
- use global::cci::CentralControlInfo;
- use global::fixed_time_range_deque::FixedTimeRangeDeque;
- use global::params::Params;
- use standard::{Depth, Record, Ticker, Trade};
- use crate::utils;
- #[derive(Debug)]
- pub struct Predictor {
- pub depth_vec: Vec<Depth>, // 深度队列
- pub volume_vec: Vec<Decimal>, // 交易量队列
- pub trade_long_vec: FixedTimeRangeDeque<Trade>, // 交易队列
- pub trade_short_vec: FixedTimeRangeDeque<Trade>, // 交易队列
- pub spread_vec: Vec<Decimal>, // 价差队列
- pub record_vec: VecDeque<Record>, // 蜡烛队列
- pub mid_price: Decimal, // 中间价
- pub ask_price: Decimal, // 卖一价
- pub bid_price: Decimal, // 买一价
- pub last_price: Decimal, // 最后成交价
- pub spread: Decimal, // 当前价差
- pub spread_sma: Decimal, // 价差的sma,默认是sma5000
- pub spread_sma_2000: Decimal, // 价差的sma,2000级别
- pub spread_sma_1000: Decimal, // 价差的sma,1000级别
- pub optimal_ask_price: Decimal, // 卖出挂单价
- pub optimal_bid_price: Decimal, // 买入挂单价
- pub inventory: Decimal, // 库存,也就是q
- pub pos_amount: Decimal, // 原始持仓量
- pub pos_avg_price: Decimal, // 原始持仓价格
- pub level: Decimal, // martin
- pub error_rate: Decimal, // 犯错概率(预估)
- pub ask_delta: Decimal, // δa
- pub bid_delta: Decimal, // δb
- pub mid_price_time_vec: FixedTimeRangeDeque<Decimal>, // 中间价格队列,100ms以内的所有中间价格
- pub fair_price_time_vec: FixedTimeRangeDeque<Decimal>, // 公平价格队列,50ms以内的所有公平价格
- pub spread_sma_1000_time_vec: FixedTimeRangeDeque<Decimal>, // spread队列,100ms以内的所有spread_sma_1000
- pub fair_price_vec: Vec<Decimal>, // 公平价格列表,0表示做市所,1表示参考所
- pub fair_price: Decimal, // 预定价格
- pub fair_rate_focus: Decimal, // 变化幅度焦点
- pub fair_price_when_ordering: Decimal, // 下单时的预定价格
- pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值
- pub is_regressed: bool, // 做市所的价格是否已经回归
- pub is_ready: bool, // 是否已准备好
- pub close_price: Decimal, // 计划平仓价格
- pub prev_trade_time: i64, // 上次交易时间,也就是t
- pub t_diff: Decimal, // (T-t)
- pub last_update_time: Decimal, // 最后更新时间(depth)
- pub last_index: Decimal, // 最后更新的index
- pub prev_insert_time: Decimal,
- pub prev_save_time: Decimal,
- pub params: Params,
- pub debug_sender: UnboundedSender<Vec<Decimal>>,
- pub long_trade_len_dec: Decimal,
- pub short_trade_len_dec: Decimal,
- }
- impl Predictor {
- // 时间窗口大小(微秒)
- // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
- const TIME_DIFF_RANGE_MICROS: i64 = 10 * 60_000_000;
- const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
- // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
- const TRADE_SHORT_RANGE_MICROS: i64 = 30_000_000;
- // const ONE_MILLION: Decimal = dec!(1_000_000);
- // const TWENTY_THOUSAND: Decimal = dec!(20_000);
- const UN_VIEW: Decimal = dec!(14142135623730951);
- pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
- // 创建数据通道
- // 创建一个无界通道
- let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
- tokio::spawn(async move {
- let len = 16usize;
- let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
- let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
- while let Some(value) = rx.next().await {
- // 数据填充到对应位置
- for i in 0..len {
- if value[i] == Self::UN_VIEW {
- debugs[i].push_back(None);
- } else {
- debugs[i].push_back(Some(value[i]));
- }
- }
- // 长度限制
- if debugs[0].len() > 500_000 {
- for i in 0..len {
- debugs[i].pop_front(); // 从前面移除元素
- }
- }
- let now = Decimal::from(Utc::now().timestamp_millis());
- if now - prev_save_time < dec!(60000) {
- continue;
- }
- let debugs_clone = debugs.clone();
- let temp_html_str = tokio::task::spawn_blocking(move || {
- utils::build_html_file(&debugs_clone)
- }).await.unwrap();
- utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
- prev_save_time = Decimal::from(Utc::now().timestamp_millis());
- }
- });
- let predictor = Self {
- // 接针版本
- depth_vec: vec![Depth::new(); 10],
- fair_price_vec: vec![Decimal::ZERO; 10],
- volume_vec: vec![Decimal::ZERO; 10],
- // 老的队列
- spread_vec: vec![],
- trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
- trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
- record_vec: VecDeque::new(),
- mid_price: Default::default(),
- ask_price: Default::default(),
- bid_price: Default::default(),
- last_price: Default::default(),
- spread: Default::default(),
- spread_sma: Default::default(),
- spread_sma_2000: Default::default(),
- spread_sma_1000: Default::default(),
- optimal_ask_price: Default::default(),
- optimal_bid_price: Default::default(),
- inventory: Default::default(),
- ask_delta: Default::default(),
- bid_delta: Default::default(),
- fair_price_time_vec: FixedTimeRangeDeque::new(50_000),
- mid_price_time_vec: FixedTimeRangeDeque::new(100_000),
- spread_sma_1000_time_vec: FixedTimeRangeDeque::new(500_000),
- fair_price: Default::default(),
- fair_rate_focus: Default::default(),
- fair_price_when_ordering: Default::default(),
- price_times_avg: Default::default(),
- is_regressed: false,
- is_ready: false,
- prev_trade_time: Utc::now().timestamp_micros(),
- close_price: Default::default(),
- t_diff: Default::default(),
- level: Default::default(),
- pos_amount: Default::default(),
- error_rate: Default::default(),
- last_update_time: Default::default(),
- last_index: Default::default(),
- pos_avg_price: Default::default(),
- prev_insert_time: Default::default(),
- prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
- params,
- debug_sender: tx,
- long_trade_len_dec: Default::default(),
- short_trade_len_dec: Default::default(),
- };
- predictor
- }
- pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
- self.last_update_time = depth.time;
- self.last_index = Decimal::from(index);
- if index == 0 {
- self.ask_price = depth.asks[0].price;
- self.bid_price = depth.bids[0].price;
- self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
- self.mid_price_time_vec.push_back(self.mid_price);
- }
- self.update_fair_price(depth, index).await;
- self.update_spread();
- self.depth_vec[index] = depth.clone();
- if self.mid_price.is_zero() {
- return;
- }
- self.processor().await;
- }
- pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
- // self.last_update_time = trade.time;
- self.trade_long_vec.push_back(trade.clone());
- self.trade_short_vec.push_back(trade.clone());
- self.long_trade_len_dec = Decimal::from_usize(self.trade_long_vec.len()).unwrap();
- self.short_trade_len_dec = Decimal::from_usize(self.trade_short_vec.len()).unwrap();
- self.error_rate = self.short_trade_len_dec / self.long_trade_len_dec;
- self.error_rate.rescale(4);
- self.last_price = trade.price;
- // self.processor().await;
- }
- pub async fn update_level(&mut self) {
- self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
- self.level = min(self.level, dec!(6));
- }
- pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
- pub async fn on_record(&mut self, _record: &Record) {}
- pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) {
- if self.mid_price.is_zero() {
- return;
- }
- let prev_inventory = self.inventory;
- self.pos_amount = pos_amount.clone();
- self.pos_avg_price = pos_avg_price.clone();
- self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
- // 小于1但不为0的情况,需要平完
- if self.inventory.is_zero() && !pos_amount.is_zero() {
- self.inventory = if pos_amount > &Decimal::ZERO {
- Decimal::ONE
- } else {
- Decimal::NEGATIVE_ONE
- };
- }
- if prev_inventory != self.inventory && prev_inventory.is_zero() {
- self.prev_trade_time = Utc::now().timestamp_micros();
- self.close_price = self.fair_price_when_ordering;
- }
- if prev_inventory != self.inventory && self.inventory.is_zero() {
- self.is_regressed = false;
- }
- self.update_level().await;
- self.processor().await;
- }
- pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
- if self.mid_price.is_zero() {
- return;
- }
- let a1 = &depth.asks[0];
- let b1 = &depth.bids[0];
- // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
- // let total = a1.value + b1.value;
- // let fair_price = a1.price * b1.value / total + b1.price * a1.value / total;
- let fair_price = (a1.price + b1.price) / Decimal::TWO;
- self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() {
- fair_price
- } else {
- self.fair_price_vec[index] * dec!(0.9) + fair_price * dec!(0.1)
- };
- self.fair_price_vec[index].rescale(self.mid_price.scale());
- self.volume_vec[index] = a1.size + b1.size;
- // 合成公平价格
- if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() {
- self.price_times_avg = if self.price_times_avg.is_zero() {
- self.fair_price_vec[1] / self.fair_price_vec[0]
- } else {
- self.price_times_avg * dec!(0.9999) + dec!(0.0001) * self.fair_price_vec[1] / self.fair_price_vec[0]
- };
- // 进行价格归一化处理,公平所的价格有可能是带前缀的
- let fair_price_part0 = self.fair_price_vec[0] * dec!(0.2);
- let fair_price_part1 = (self.fair_price_vec[1] / self.price_times_avg) * dec!(0.8);
- self.fair_price = fair_price_part0 + fair_price_part1;
- self.fair_price_time_vec.push_back(self.fair_price);
- // 重置焦点,条件1
- if !self.fair_rate_focus.is_zero() && self.mid_price_time_vec.deque.len() >= 2 && self.fair_price_time_vec.deque.len() >= 2 {
- let mid_price_prev = self.mid_price_time_vec.deque.get(self.mid_price_time_vec.deque.len() - 2).unwrap();
- let fair_price_prev = self.fair_price_time_vec.deque.get(self.fair_price_time_vec.deque.len() - 2).unwrap();
- // 向上涨,并且mid下穿fair,视为观测阶段结束
- if self.fair_rate_focus > Decimal::ZERO && mid_price_prev > fair_price_prev && self.mid_price < self.fair_price {
- self.fair_rate_focus = Decimal::ZERO;
- }
- // 向下跌,并且mid上穿fair,视为观测阶段结束
- if self.fair_rate_focus < Decimal::ZERO && mid_price_prev < fair_price_prev && self.mid_price > self.fair_price {
- self.fair_rate_focus = Decimal::ZERO;
- }
- }
- // 重置焦点,条件2
- if !self.fair_rate_focus.is_zero() && !self.inventory.is_zero() {
- self.fair_rate_focus = Decimal::ZERO;
- }
- // 更新程序关注的变化幅度焦点
- if self.fair_rate_focus.is_zero() {
- let last_fair_price = self.fair_price_time_vec.deque.iter().last().unwrap();
- let first_fair_price = self.fair_price_time_vec.deque[0];
- let mut rate = (last_fair_price - first_fair_price) / first_fair_price;
- rate.rescale(8);
- // 只有有强度的rate才有资格被称为针
- if rate.abs() > self.params.open {
- // 向上涨,并且fair下穿mid,视为观测阶段开始
- if rate > Decimal::ZERO && self.mid_price > self.fair_price {
- self.fair_rate_focus = rate;
- }
- // 向下跌,并且fair上穿mid,视为观测阶段开始
- if rate < Decimal::ZERO && self.mid_price < self.fair_price {
- self.fair_rate_focus = rate;
- }
- }
- }
- }
- // 判断价格是否回归
- if !self.is_regressed && self.inventory > Decimal::ZERO && self.spread_sma_1000 < max(self.spread_sma, self.spread_sma_2000) {
- self.is_regressed = true
- } else if !self.is_regressed && self.inventory < Decimal::ZERO && self.spread_sma_1000 > min(self.spread_sma, self.spread_sma_2000) {
- self.is_regressed = true
- }
- }
- pub fn update_spread(&mut self) {
- if self.mid_price.is_zero() || self.fair_price.is_zero() {
- return;
- }
- self.spread = (self.fair_price - self.mid_price) / self.mid_price;
- // self.spread.rescale(8);
- self.spread_vec.push(self.spread);
- self.spread_sma = if self.spread_sma.is_zero() {
- self.spread
- } else {
- self.spread_sma * dec!(0.9998) + self.spread * dec!(0.0002)
- };
- // self.spread_sma.rescale(8);
- self.spread_sma_2000 = if self.spread_sma_2000.is_zero() {
- self.spread
- } else {
- self.spread_sma_2000 * dec!(0.9995) + self.spread * dec!(0.0005)
- };
- // self.spread_sma_2000.rescale(8);
- self.spread_sma_1000 = if self.spread_sma_1000.is_zero() {
- self.spread
- } else {
- self.spread_sma_1000 * dec!(0.999) + self.spread * dec!(0.001)
- };
- self.spread_sma_1000_time_vec.push_back(self.spread_sma_1000);
- // self.spread_sma_1000.rescale(8);
- while self.spread_vec.len() > 1_000 {
- self.spread_vec.remove(0);
- }
- }
- pub fn update_delta(&mut self) {
- // -2表示不想成交
- // -1表示市价成交(委托对手盘的价格,但不一定能市价成交),这里再想想吧,经常委托出去没成交,明显比别人慢了
- // 0是买一/卖一成交
- if self.fair_price.is_zero() {
- return;
- }
- // 可能是趋势
- // let is_open_long = self.spread_sma_1000 - self.spread_sma > self.params.open && self.fair_price > self.mid_price;
- // let is_open_short = self.spread_sma_1000 - self.spread_sma < self.params.open * Decimal::NEGATIVE_ONE && self.fair_price < self.mid_price;
- // 可能是接针
- let is_open_long = self.fair_rate_focus < Decimal::ZERO && self.fair_price > self.mid_price;
- let is_open_short = self.fair_rate_focus > Decimal::ZERO && self.fair_price < self.mid_price;
- let is_close_long = self.inventory > Decimal::ZERO;
- let is_close_short = self.inventory < Decimal::ZERO;
- self.bid_delta = dec!(-2);
- self.ask_delta = dec!(-2);
- if is_close_long {
- if self.mid_price > self.pos_avg_price {
- // let profit_rate = (self.mid_price - self.pos_avg_price) / self.pos_avg_price;
- // let fill_rate = profit_rate / (self.params.open * dec!(50));
- // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
- //
- // self.ask_delta = self.mid_price * close_rate * self.t_diff;
- self.ask_delta = self.mid_price * self.params.close;
- } else if self.mid_price > self.fair_price {
- self.ask_delta = self.mid_price * self.params.close;
- }
- // self.ask_delta = self.mid_price * self.params.close;
- } else if is_close_short {
- if self.mid_price < self.pos_avg_price {
- // let profit_rate = (self.pos_avg_price - self.mid_price) / self.pos_avg_price;
- // let fill_rate = profit_rate / (self.params.open * dec!(50));
- // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
- //
- // self.bid_delta = self.mid_price * close_rate * self.t_diff;
- self.bid_delta = self.mid_price * self.params.close;
- } else if self.mid_price < self.fair_price {
- self.bid_delta = self.mid_price * self.params.close;
- }
- // self.bid_delta = self.mid_price * self.params.close;
- } else if is_open_long {
- // let is_open_long_market = self.spread_sma_1000 - self.spread_sma > self.params.open_market;
- // self.bid_delta = if is_open_long_market {
- // dec!(-1)
- // } else {
- // dec!(0)
- // };
- self.bid_delta = dec!(0)
- } else if is_open_short {
- // let is_open_short_market = self.spread_sma_1000 - self.spread_sma < self.params.open_market * Decimal::NEGATIVE_ONE;
- // self.ask_delta = if is_open_short_market {
- // dec!(-1)
- // } else {
- // dec!(0)
- // }
- self.ask_delta = dec!(0)
- }
- }
- pub fn update_optimal_ask_and_bid(&mut self) {
- self.optimal_ask_price = if self.ask_delta == dec!(-1) {
- self.bid_price
- } else if self.ask_delta == dec!(-2) {
- Self::UN_VIEW
- } else {
- max(self.ask_price + self.ask_delta, self.bid_price)
- };
- self.optimal_bid_price = if self.bid_delta == dec!(-1) {
- self.ask_price
- } else if self.bid_delta == dec!(-2) {
- Self::UN_VIEW
- } else {
- min(self.bid_price - self.bid_delta, self.ask_price)
- };
- self.optimal_ask_price.rescale(self.mid_price.scale());
- self.optimal_bid_price.rescale(self.mid_price.scale());
- }
- pub fn update_t_diff(&mut self) {
- if self.prev_trade_time > 0 {
- let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
- self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO);
- } else {
- self.t_diff = Decimal::ONE;
- }
- }
- pub fn check_ready(&mut self) {
- if self.is_ready {
- return;
- }
- if self.mid_price == Decimal::ZERO {
- return;
- }
- if self.fair_price == Decimal::ZERO {
- return;
- }
- if self.ask_price == Decimal::ZERO {
- return;
- }
- if self.bid_price == Decimal::ZERO {
- return;
- }
- if self.trade_long_vec.len() < 100 {
- return;
- }
- self.is_ready = true;
- info!("========================================行情数据预热完毕==================================")
- }
- // #[instrument(skip(self), level="TRACE")]
- async fn processor(&mut self) {
- self.update_t_diff();
- self.update_delta();
- self.update_optimal_ask_and_bid();
- self.check_ready();
- if !self.is_ready {
- return;
- }
- // let mut smm = Decimal::ZERO;
- // if !self.depth_vec[1].time.is_zero() {
- // let sma = self.depth_vec[1].asks[0].price;
- // let smb = self.depth_vec[1].bids[0].price;
- // smm = (sma + smb) / Decimal::TWO;
- // }
- // let cci_arc = self.cci_arc.clone();
- let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
- let mid_price = self.mid_price;
- let ask_price = self.ask_price;
- let bid_price = self.bid_price;
- let last_price = self.last_price;
- let spread = self.spread_sma;
- let spread_max = self.spread_sma_2000;
- let spread_min = self.spread_sma_1000;
- // let spread = self.price_times_avg;
- // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
- // let spread_min = self.fair_price / self.mid_price;
- let optimal_ask_price = self.optimal_ask_price;
- let optimal_bid_price = self.optimal_bid_price;
- let inventory = self.inventory;
- let sigma_square = if self.is_regressed { Decimal::ONE } else { Decimal::ZERO };
- let gamma = now - self.last_update_time;
- let kappa = self.fair_rate_focus;
- let flow_ratio = Decimal::ZERO;
- let ref_price = self.fair_price;
- let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
- if !need_append {
- return;
- }
- self.debug_sender.unbounded_send(vec![
- now,
- mid_price,
- ask_price,
- bid_price,
- last_price,
- spread,
- spread_max,
- spread_min,
- optimal_ask_price,
- optimal_bid_price,
- inventory,
- sigma_square,
- gamma,
- kappa,
- flow_ratio,
- ref_price
- ]).unwrap();
- self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
- }
- // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
- pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
- vec![]
- }
- }
|