use std::cmp::{max, min}; use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use chrono::{Utc}; use futures_channel::mpsc::UnboundedSender; use futures_util::StreamExt; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use tokio::sync::{Mutex}; use tracing::{info}; use global::cci::CentralControlInfo; use global::fixed_time_range_deque::FixedTimeRangeDeque; use global::params::Params; use standard::{Depth, Record, Ticker, Trade}; use crate::utils; #[derive(Debug, Clone)] pub struct Predictor { pub depth_vec: Vec, // 深度队列 pub record_vec: VecDeque, // 蜡烛队列 // 做市所的计算 pub close_price_vec: FixedTimeRangeDeque, pub r_short: Decimal, pub r_long: Decimal, pub speed: Decimal, pub trend: Decimal, pub prices: Vec>>, // [[[做市所], [参考所0]], ...] pub ks: Vec, pub bs: Vec, pub mid_price: Decimal, // 中间价 pub ask_price: Decimal, // 中间价 pub bid_price: Decimal, // 中间价 pub fair_price: Decimal, pub last_price: Decimal, // 最后成交价 pub optimal_ask_price: Decimal, // 卖出挂单价 pub optimal_bid_price: Decimal, // 买入挂单价 pub inventory: Decimal, // 库存,也就是q pub pos_amount: Decimal, // 原始持仓量 pub pos_avg_price: Decimal, // 原始持仓价格 pub balance: Decimal, // 初始余额 pub prev_balance: Decimal, pub profit: Decimal, pub profit_high: Decimal, pub prev_open_time: Decimal, pub trade_condition: Decimal, // 交易信号 pub trade_condition_time: Decimal, // 满足时的瞬时时间,用于控制开仓行为的持续时间 pub ask_delta: Decimal, // δa pub bid_delta: Decimal, // δb pub mid_price_vec: Vec, // 每个交易所的中间价 pub fair_price_std_vec: Vec, // 公平价格列表,标准化之后的 pub price_avg_times_vec: Vec, // 公平所与做市所的价格倍率的平均值 pub price_avg_times_long_vec: Vec, // 公平所与做市所的价格倍率的平均值 pub is_ready: bool, // 是否已准备好 pub last_update_time: Decimal, // 最后更新时间(depth) pub last_index: Decimal, // 最后更新的index pub prev_insert_time: Decimal, pub prev_save_time: Decimal, pub init_time: Decimal, pub fitting_delay: Decimal, pub prev_fitting_time_vec: Vec, pub params: Params, pub debug_sender: UnboundedSender> } impl Predictor { // 时间窗口大小(微秒) // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000; // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000; // const TRADE_LONG_RANGE_MICROS: i64 = 10 * 60_000_000; // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000; // const TRADE_SHORT_RANGE_MICROS: i64 = 10_000_000; // const ONE_MILLION: Decimal = dec!(1_000_000); // const TWENTY_THOUSAND: Decimal = dec!(20_000); const DONT_VIEW: Decimal = dec!(14142135623730951); pub fn new(_cci_arc: Arc>, params: Params) -> Self { // 创建数据通道 // 创建一个无界通道 let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); let account_port = params.port.clone(); tokio::spawn(async move { let len = 17usize; let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis()); let mut debugs: Vec>> = vec![VecDeque::new(); len]; while let Some(value) = rx.next().await { // 数据填充到对应位置 // // 第一步:获取插入位置, 这里有bug,持仓推送并不连续,有时候会导致图表显示错误…… // let target_ts = value[0]; // 时间戳在values[0] // let insert_pos = debugs[0] // .binary_search_by(|ts| { // ts.as_ref() // 解包Option // .expect("Timestamp cannot be None") // .cmp(&target_ts) // }) // .unwrap_or_else(|e| e); // 第二步:执行插入操作 for i in 0..debugs.len() { let value = value.get(i).cloned().unwrap_or(Self::DONT_VIEW); // 其他队列按规则插入 let elem = if value == Self::DONT_VIEW { None } else { Some(value) }; debugs[i].push_back(elem) } // 长度限制 if debugs[0].len() > 500_000 { for i in 0..len { debugs[i].pop_front(); // 从前面移除元素 } } let now = Decimal::from(Utc::now().timestamp_millis()); if now - prev_save_time < dec!(30000) { continue; } let debugs_clone = debugs.clone(); let temp_html_str = tokio::task::spawn_blocking(move || { utils::build_html_file(&debugs_clone) }).await.unwrap(); let path = format!("./db/{}.html", account_port); utils::write_to_file(&temp_html_str, path).await; prev_save_time = Decimal::from(Utc::now().timestamp_millis()); } }); let predictor = Self { // 接针版本 depth_vec: vec![Depth::new(); params.ref_exchange.len()], fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()], mid_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()], price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()], price_avg_times_long_vec: vec![Decimal::ZERO; params.ref_exchange.len()], prices: vec![vec![FixedTimeRangeDeque::new(600_000_000); 2]; params.ref_exchange.len()], ks: vec![Decimal::ZERO; params.ref_exchange.len()], bs: vec![Decimal::ZERO; params.ref_exchange.len()], prev_fitting_time_vec: vec![Decimal::ZERO; params.ref_exchange.len()], close_price_vec: FixedTimeRangeDeque::new(600_000_000), r_short: Default::default(), r_long: Default::default(), speed: Default::default(), trend: Default::default(), mid_price: Default::default(), ask_price: Default::default(), bid_price: Default::default(), fair_price: Default::default(), last_price: Default::default(), optimal_ask_price: Self::DONT_VIEW, optimal_bid_price: Self::DONT_VIEW, ask_delta: dec!(-2), bid_delta: dec!(-2), is_ready: false, inventory: Default::default(), pos_avg_price: Default::default(), pos_amount: Default::default(), balance: Default::default(), prev_balance: Default::default(), profit: Default::default(), profit_high: Default::default(), trade_condition: Default::default(), trade_condition_time: Default::default(), last_update_time: Default::default(), last_index: Default::default(), prev_insert_time: Default::default(), prev_save_time: Decimal::from(Utc::now().timestamp_millis()), init_time: Decimal::from(Utc::now().timestamp_millis()), fitting_delay: Default::default(), params, debug_sender: tx, prev_open_time: Default::default(), record_vec: Default::default(), }; predictor } pub async fn on_depth(&mut self, depth: &Depth, index: usize) { self.last_update_time = depth.time; self.last_index = Decimal::from(index); if index == 233 { self.ask_price = depth.asks[0].price; self.bid_price = depth.bids[0].price; self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO; // 计算利润(预估) if !self.inventory.is_zero() { self.profit = if self.inventory > Decimal::ZERO { (self.mid_price - self.pos_avg_price) / self.pos_avg_price } else { (self.pos_avg_price - self.mid_price) / self.pos_avg_price }; self.profit.rescale(6); if self.profit_high < self.profit { self.profit_high = self.profit } } // 秒级k线处理,先只用处理收盘价就行 let r = Record { time: Decimal::from(Utc::now().timestamp_millis()), open: self.mid_price, high: self.mid_price, low: self.mid_price, close: self.mid_price, volume: Default::default(), symbol: "".to_string(), }; let is_need_push = self.close_price_vec.len() == 0 || self.close_price_vec.deque.iter().last().unwrap().time - r.time > Decimal::ONE_THOUSAND ; if is_need_push { self.close_price_vec.push_back(r); let len = self.close_price_vec.len(); // 求最后10秒的均值 let mean_10s; if len >= 10 { let mut i = len - 1; let mut sum: Decimal = Decimal::ZERO; loop { if i == len - 11 { break } sum += self.close_price_vec.get(i).unwrap().close; i = i - 1; } mean_10s = sum / Decimal::from(10); self.r_short = (self.mid_price - mean_10s) / mean_10s; self.r_short.rescale(8); } else { self.r_short = self.mid_price; mean_10s = self.mid_price; } // 求最后300秒的均值,如果秒级k不到300秒,就用5分钟k的收盘价凑合用用 let mean_300s; if len >= 300 { let mut i = len - 1; let mut sum: Decimal = Decimal::ZERO; loop { if i == len - 301 { break } sum += self.close_price_vec.get(i).unwrap().close; i = i - 1; } mean_300s = sum / Decimal::from(300); self.r_long = (self.mid_price - mean_300s) / mean_300s; self.r_long.rescale(8); } else if self.record_vec.len() == 5 && !self.mid_price.is_zero() { let mut i = self.record_vec.len() - 1; let mut sum: Decimal = Decimal::ZERO; loop { if i == 0 { break } sum += self.record_vec[i].close; i = i - 1; } mean_300s = sum / Decimal::from(5); self.r_long = (self.mid_price - mean_300s) / mean_300s; self.r_long.rescale(8); } else { self.r_long = self.mid_price; mean_300s = self.mid_price; } self.speed = if self.r_short > dec!(-0.0001) || self.r_long > dec!(-0.0001) { Decimal::ONE } else { self.r_short / self.r_long }; self.trend = mean_10s / mean_300s; } // 拟合k与b for (mid_index, mp) in self.mid_price_vec.iter().enumerate() { if mp.is_zero() { continue } self.prices[mid_index][0].push_back(self.mid_price); self.prices[mid_index][1].push_back(mp.clone()); // 拟合,60s拟合一次 let before_fitting = Utc::now().timestamp_millis(); if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) || self.prices[mid_index][0].len() < 1000 { // if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) { // info!("{}, {},", mid_index, self.prices[mid_index][0].len()); // } if let Some((k, b)) = self.linear_least_squares(mid_index).await { self.ks[mid_index] = k; self.bs[mid_index] = b; self.fitting_delay = Decimal::from(Utc::now().timestamp_millis() - before_fitting); self.prev_fitting_time_vec[mid_index] = Decimal::from(before_fitting) } else { return; } } } } else { self.depth_vec[index] = depth.clone(); let latest_price = (depth.asks[0].price + depth.bids[0].price) / Decimal::TWO; self.update_fair_price(&latest_price, index).await; } if self.mid_price.is_zero() { return; } self.processor(depth.time, false).await; } pub async fn on_trade(&mut self, trade: &Trade, _index: usize) { // index == 233代表做市所 // index == 0,1,2,3...代表参考所 self.last_price = trade.price; } pub async fn on_ticker(&mut self, _ticker: &Ticker) {} pub async fn on_record(&mut self, record: &Record) { // 添加新蜡烛 if self.record_vec.len() == 0 { self.record_vec.push_back(record.clone()); } else { let last_record = self.record_vec.back_mut().unwrap(); if last_record.time == record.time { *last_record = record.clone(); } else if last_record.time < record.time { self.record_vec.push_back(record.clone()); } } if self.record_vec.len() > 5 { self.record_vec.pop_front(); } } pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) { if self.mid_price.is_zero() { return; } let prev_pos_amount = self.pos_amount; self.pos_amount = pos_amount.clone(); self.pos_avg_price = pos_avg_price.clone(); self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc(); // 小于1但不为0的情况,需要平完 if self.inventory.is_zero() && !pos_amount.is_zero() { self.inventory = if pos_amount > &Decimal::ZERO { Decimal::ONE } else { Decimal::NEGATIVE_ONE }; } if prev_pos_amount != self.pos_amount { // 重置连续信号 self.trade_condition = Decimal::ZERO; self.trade_condition_time = Decimal::ZERO; // 开仓 if prev_pos_amount.is_zero() { self.prev_open_time = Decimal::from(Utc::now().timestamp_millis()) } // 平仓 if self.pos_amount.is_zero() { self.profit = Decimal::ZERO; self.profit_high = Decimal::ZERO; } self.processor(update_time, true).await; } } pub async fn on_balance(&mut self, balance: Decimal) { self.balance = balance; } pub fn get_real_rate(price_vec: &FixedTimeRangeDeque) -> Decimal { let last_fair_price = price_vec.deque.iter().last().unwrap(); let min_price = price_vec.deque.iter().min().unwrap(); let max_price = price_vec.deque.iter().max().unwrap(); let up_rate = (last_fair_price - min_price) / min_price; let down_rate = (max_price - last_fair_price) / max_price; if up_rate > down_rate { up_rate } else { -down_rate } } pub async fn update_fair_price(&mut self, latest_price: &Decimal, index: usize) { if self.mid_price.is_zero() { return; } // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price // let total = a1.value + b1.value; // let fair_price = (a1.price + b1.price) / Decimal::TWO; // self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total; let mut mp = latest_price.clone(); mp.rescale(self.mid_price.scale()); self.mid_price_vec[index] = mp; // 生成fp self.fair_price_std_vec[index] = mp * self.ks[index] + self.bs[index]; self.fair_price_std_vec[index].rescale(self.mid_price.scale()); // 生成最终用于挂单的公平价格 let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum(); let fair_price_count = self.fair_price_std_vec.iter() .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0 .count(); if fair_price_count != 0 { self.fair_price = if self.fair_price.is_zero() { fair_price_sum / Decimal::from(fair_price_count) } else { dec!(0.9) * self.fair_price + dec!(0.1) * fair_price_sum / Decimal::from(fair_price_count) }; // let mut spread_abs = ((self.fair_price - self.mid_price) / self.mid_price).abs(); // spread_abs.rescale(5); // // self.spread_vec.push_back(spread_abs); // if self.spread_vec.len() > 3000 { // self.spread_vec.pop_front(); // } // // let opt_abs_value = self.spread_vec.iter().max().unwrap().clone(); // // self.params.open = max(max(self.params.min_open, dec!(0.0006)), opt_abs_value); } } pub async fn update_delta(&mut self) -> bool { if self.mid_price.is_zero() { return false; } let prev_bid_delta = self.bid_delta; let prev_ask_delta = self.ask_delta; let now = Decimal::from(Utc::now().timestamp_millis()); let is_close_long = self.inventory > Decimal::ZERO && ( // 硬止损 (self.profit < dec!(-0.01)) // 利润较大时,追踪止盈 || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75)) ); let is_close_short = self.inventory < Decimal::ZERO && ( // 硬止损 (self.profit < dec!(-0.01)) // 利润较大时,追踪止盈 || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75)) ); let is_open_long = self.inventory.is_zero() && self.fair_price > self.mid_price * (Decimal::ONE + self.params.open) && self.r_short < dec!(-0.001) && self.trend < dec!(0.999) && self.speed < dec!(0.15) ; let is_open_short = self.inventory.is_zero() && false ; // 使信号有一定持续性 if is_close_long { self.trade_condition = dec!(1); } if is_close_short { self.trade_condition = dec!(2); } if is_open_long { self.trade_condition = dec!(3); self.trade_condition_time = now; } if is_open_short { self.trade_condition = dec!(4); self.trade_condition_time = now; } // 开仓信号要过期,只保留2秒 if (self.trade_condition == dec!(3) || self.trade_condition == dec!(4)) && now - self.trade_condition_time > dec!(2_000) { self.trade_condition = Decimal::ZERO; } // 开单信号处理 self.bid_delta = dec!(-2); self.ask_delta = dec!(-2); self.optimal_ask_price = Self::DONT_VIEW; self.optimal_bid_price = Self::DONT_VIEW; if self.trade_condition == dec!(1) && self.inventory > Decimal::ZERO { self.ask_delta = dec!(0); self.bid_delta = dec!(-2); self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995); self.optimal_bid_price = Self::DONT_VIEW; } else if self.trade_condition == dec!(2) && self.inventory < Decimal::ZERO { self.bid_delta = dec!(0); self.ask_delta = dec!(-2); self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005); self.optimal_ask_price = Self::DONT_VIEW; } else if self.trade_condition == dec!(3) { self.bid_delta = dec!(0); self.ask_delta = dec!(-2); self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005); self.optimal_ask_price = Self::DONT_VIEW; } else if self.trade_condition == dec!(4) { self.ask_delta = dec!(0); self.bid_delta = dec!(-2); self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995); self.optimal_bid_price = Self::DONT_VIEW; } // 价格处理 self.optimal_ask_price.rescale(self.mid_price.scale()); self.optimal_bid_price.rescale(self.mid_price.scale()); // 返回方向是否改变过,有改变可以立即在图表上显示 prev_ask_delta != self.ask_delta || prev_bid_delta != self.bid_delta } pub fn check_ready(&mut self) { if self.is_ready { return; } if self.mid_price.is_zero() { return; } for fair_price in &self.fair_price_std_vec { if fair_price.is_zero() { return; } } if self.optimal_ask_price.is_zero() { return; } if self.optimal_bid_price.is_zero() { return; } if self.optimal_bid_price.is_zero() { return; } if self.balance.is_zero() { return; } self.is_ready = true; info!("========================================行情数据预热完毕==================================") } // 最小二乘法拟合函数,支持VecDeque pub async fn linear_least_squares(&self, index: usize) -> Option<(Decimal, Decimal)> { let x = &self.prices[index][1]; let y = &self.prices[index][0]; // 检查数组长度是否相同 if x.len() != y.len() { return None; } let n = x.len(); if n == 0 { return None; } let mut sum_x = Decimal::zero(); let mut sum_y = Decimal::zero(); let mut sum_xx = Decimal::zero(); let mut sum_xy = Decimal::zero(); // 遍历VecDeque中的元素 for (xi, yi) in x.deque.iter().zip(y.deque.iter()) { sum_x += xi; sum_y += yi; let xi_sq = xi * xi; sum_xx += xi_sq; sum_xy += xi * yi; } // 计算分子和分母 let numerator = sum_xy - (sum_x * sum_y) / Decimal::from(n); let denominator = sum_xx - (sum_x * sum_x) / Decimal::from(n); // 如果分母为0,返回None if denominator == Decimal::zero() { return None; } let k = numerator / denominator; let mean_x = sum_x / Decimal::from(n); let mean_y = sum_y / Decimal::from(n); let b = mean_y - k * mean_x; Some((k, b)) } // #[instrument(skip(self), level="TRACE")] async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) { self.check_ready(); if !self.is_ready { return; } let is_delta_changed = self.update_delta().await; // let cci_arc = self.cci_arc.clone(); let now = data_time; let mid_price = self.mid_price; let ask_price = if self.params.ref_exchange.len() > 0 { // self.fair_price_std_vec[0] Self::DONT_VIEW } else { Self::DONT_VIEW }; let bid_price = if self.params.ref_exchange.len() > 1 { // self.fair_price_std_vec[1] Self::DONT_VIEW } else { Self::DONT_VIEW }; let optimal_ask_price = self.optimal_ask_price; let optimal_bid_price = self.optimal_bid_price; let last_price = Self::DONT_VIEW; let fair_price = self.fair_price; let spread = Self::DONT_VIEW; let spread_max = self.r_short; let spread_min = self.r_long; // let spread = self.price_times_avg; // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0]; // let spread_min = self.fair_price / self.mid_price; let inventory = self.inventory; // let sigma_square = Decimal::from(Utc::now().timestamp_millis()) - data_time; let sigma_square = self.speed; let gamma = self.trend; let kappa = self.balance; let flow_ratio = Decimal::ZERO; let is_time_over_update = now - self.prev_insert_time > dec!(500); if !is_time_over_update && !is_hard_update && !is_delta_changed { return; } if is_time_over_update { self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis()) } let pos_avg_price = self.pos_avg_price; self.debug_sender.unbounded_send(vec![ now, mid_price, ask_price, bid_price, last_price, spread, spread_max, spread_min, optimal_ask_price, optimal_bid_price, inventory, sigma_square, gamma, kappa, flow_ratio, fair_price, pos_avg_price ]).unwrap(); } // #[instrument(skip(self, ref_ticker_map), level="TRACE")] pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap) -> Vec> { vec![] } }