predictor.rs 19 KB


  1. use std::cmp::max;
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  // Mutable state of the predictor: latest market data, per-reference-exchange
  // regression coefficients, the derived fair price and the quote prices.
  // Every per-exchange vector is created in `new` with `params.ref_exchange.len()` entries.
  #[derive(Debug, Clone)]
  pub struct Predictor {
      pub depth_vec: Vec<Depth>, // Depth queue: last depth stored per reference-exchange index
      pub record_vec: VecDeque<Record>, // Candle (record) queue
      pub spread_vec: VecDeque<Decimal>, // Spread queue: recent |fair - mid| / mid values
      pub trade_price_long_vec: FixedTimeRangeDeque<Decimal>, // Trade prices, created with TRADE_LONG_RANGE_MICROS
      pub trade_233_vec: FixedTimeRangeDeque<Trade>, // Trades received with index 233
      pub trade_0_vec: FixedTimeRangeDeque<Trade>, // Trades received with index 0
      pub prices: Vec<Vec<VecDeque<Decimal>>>, // [[[market-making exchange], [reference exchange 0]], ...]
      pub ks: Vec<Decimal>, // Fitted slope per reference exchange
      pub bs: Vec<Decimal>, // Fitted intercept per reference exchange
      pub mid_price: Decimal, // Mid price
      pub fair_price: Decimal, // Mean of the non-zero entries of `fair_price_std_vec`
      pub ask_price: Decimal, // Best ask price
      pub bid_price: Decimal, // Best bid price
      pub last_price: Decimal, // Last traded price
      pub optimal_ask_price: Decimal, // Price at which to quote the sell order
      pub optimal_bid_price: Decimal, // Price at which to quote the buy order
      pub inventory: Decimal, // Inventory, i.e. q
      pub pos_amount: Decimal, // Raw position amount
      pub pos_avg_price: Decimal, // Raw position average price
      pub balance: Decimal, // Initial balance (set through `on_balance`)
      pub prev_balance: Decimal,
      pub ask_delta: Decimal, // δa
      pub bid_delta: Decimal, // δb
      pub mid_price_vec: Vec<Decimal>, // Latest mid price per reference exchange
      pub fair_price_std_vec: Vec<Decimal>, // List of fair prices, after standardization (ref mid * k + b)
      pub price_avg_times_vec: Vec<Decimal>, // Average price ratio between the fair (reference) exchange and the market-making exchange
      pub price_avg_times_long_vec: Vec<Decimal>, // Average price ratio between the fair (reference) exchange and the market-making exchange
      pub is_ready: bool, // Whether warm-up has completed
      pub last_update_time: Decimal, // Last update time (depth)
      pub last_index: Decimal, // Index of the last depth update
      pub prev_insert_time: Decimal, // Wall-clock ms when the last throttled debug row was sent
      pub prev_save_time: Decimal, // Initialised to the wall-clock ms at construction
      pub init_time: Decimal, // Wall-clock ms at construction
      pub fitting_delay: Decimal, // Milliseconds spent in the last regression fit
      pub prev_fitting_time: Decimal, // Wall-clock ms when the last regression fit started
      pub params: Params,
      pub debug_sender: UnboundedSender<Vec<Decimal>> // Sends debug rows to the task spawned in `new`
  }
  56. impl Predictor {
  // Time window sizes (microseconds)
  // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  // Range handed to `trade_price_long_vec` in `new` (10 * 60_000_000 microseconds).
  const TRADE_LONG_RANGE_MICROS: i64 = 10 * 60_000_000;
  // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  // Range handed to `trade_233_vec` and `trade_0_vec` in `new` (2 * 60_000_000 microseconds).
  const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
  // const ONE_MILLION: Decimal = dec!(1_000_000);
  // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  // Sentinel: a debug column equal to this value is stored as `None` by the
  // recording task spawned in `new`, and `update_delta` uses it for the quote
  // side that should not be placed.
  const DONT_VIEW: Decimal = dec!(14142135623730951);
  66. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  67. // 创建数据通道
  68. // 创建一个无界通道
  69. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  70. let account_port = params.port.clone();
  71. tokio::spawn(async move {
  72. let len = 17usize;
  73. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  74. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  75. while let Some(value) = rx.next().await {
  76. // 数据填充到对应位置
  77. for i in 0..len {
  78. if value[i] == Self::DONT_VIEW {
  79. debugs[i].push_back(None);
  80. } else {
  81. debugs[i].push_back(Some(value[i]));
  82. }
  83. }
  84. // 长度限制
  85. if debugs[0].len() > 500_000 {
  86. for i in 0..len {
  87. debugs[i].pop_front(); // 从前面移除元素
  88. }
  89. }
  90. let now = Decimal::from(Utc::now().timestamp_millis());
  91. if now - prev_save_time < dec!(30000) {
  92. continue;
  93. }
  94. let debugs_clone = debugs.clone();
  95. let temp_html_str = tokio::task::spawn_blocking(move || {
  96. utils::build_html_file(&debugs_clone)
  97. }).await.unwrap();
  98. let path = format!("./db/{}.html", account_port);
  99. utils::write_to_file(&temp_html_str, path).await;
  100. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  101. }
  102. });
  103. let predictor = Self {
  104. // 接针版本
  105. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  106. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  107. mid_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  108. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  109. price_avg_times_long_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  110. prices: vec![vec![VecDeque::new(); 2]; params.ref_exchange.len()],
  111. ks: vec![Decimal::ZERO; params.ref_exchange.len()],
  112. bs: vec![Decimal::ZERO; params.ref_exchange.len()],
  113. record_vec: VecDeque::new(),
  114. trade_price_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
  115. trade_233_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  116. trade_0_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  117. spread_vec: VecDeque::new(),
  118. mid_price: Default::default(),
  119. fair_price: Default::default(),
  120. ask_price: Default::default(),
  121. bid_price: Default::default(),
  122. last_price: Default::default(),
  123. optimal_ask_price: Default::default(),
  124. optimal_bid_price: Default::default(),
  125. ask_delta: Default::default(),
  126. bid_delta: Default::default(),
  127. is_ready: false,
  128. inventory: Default::default(),
  129. pos_avg_price: Default::default(),
  130. pos_amount: Default::default(),
  131. balance: Default::default(),
  132. prev_balance: Default::default(),
  133. last_update_time: Default::default(),
  134. last_index: Default::default(),
  135. prev_insert_time: Default::default(),
  136. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  137. init_time: Decimal::from(Utc::now().timestamp_millis()),
  138. fitting_delay: Default::default(),
  139. prev_fitting_time: Default::default(),
  140. params,
  141. debug_sender: tx,
  142. };
  143. predictor
  144. }
  145. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  146. self.last_update_time = depth.time;
  147. self.last_index = Decimal::from(index);
  148. if index == 233 {
  149. self.ask_price = depth.asks[0].price;
  150. self.bid_price = depth.bids[0].price;
  151. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  152. // 拟合k与b
  153. for (mid_index, mp) in self.mid_price_vec.iter().enumerate() {
  154. if mp.is_zero() {
  155. continue
  156. }
  157. self.prices[mid_index][0].push_back(self.mid_price);
  158. self.prices[mid_index][1].push_back(mp.clone());
  159. // 长度限制
  160. if self.prices[mid_index][0].len() > 10000 {
  161. self.prices[mid_index][0].pop_front();
  162. self.prices[mid_index][1].pop_front();
  163. }
  164. // 拟合,60s拟合一次
  165. let before_fitting = Utc::now().timestamp_millis();
  166. if Decimal::from(before_fitting) - self.prev_fitting_time > dec!(60_000) || self.prices[mid_index][0].len() < 1000 {
  167. if let Some((k, b)) = self.linear_least_squares(mid_index).await {
  168. self.ks[mid_index] = k;
  169. self.bs[mid_index] = b;
  170. self.fitting_delay = Decimal::from(Utc::now().timestamp_millis() - before_fitting);
  171. self.prev_fitting_time = Decimal::from(before_fitting)
  172. } else {
  173. return;
  174. }
  175. }
  176. }
  177. } else {
  178. self.update_fair_price(depth, index).await;
  179. self.depth_vec[index] = depth.clone();
  180. }
  181. if self.mid_price.is_zero() {
  182. return;
  183. }
  184. self.processor(depth.time, false).await;
  185. }
  186. pub async fn on_trade(&mut self, trade: &Trade, index: usize) {
  187. // index == 233代表做市所
  188. // index == 0代表参考所
  189. self.last_price = trade.price;
  190. self.trade_price_long_vec.push_back(trade.price);
  191. if index == 233 {
  192. self.trade_233_vec.push_back(trade.clone());
  193. } else if index == 0 {
  194. self.trade_0_vec.push_back(trade.clone());
  195. }
  196. // self.processor().await;
  197. }
  // Ticker updates are currently ignored (empty body).
  pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  // Candle (record) updates are currently ignored (empty body).
  pub async fn on_record(&mut self, _record: &Record) {}
  200. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  201. if self.mid_price.is_zero() {
  202. return;
  203. }
  204. let prev_pos_amount = self.pos_amount;
  205. self.pos_amount = pos_amount.clone();
  206. self.pos_avg_price = pos_avg_price.clone();
  207. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  208. // 小于1但不为0的情况,需要平完
  209. if self.inventory.is_zero() && !pos_amount.is_zero() {
  210. self.inventory = if pos_amount > &Decimal::ZERO {
  211. Decimal::ONE
  212. } else {
  213. Decimal::NEGATIVE_ONE
  214. };
  215. }
  216. if prev_pos_amount != self.pos_amount {
  217. self.processor(update_time, true).await;
  218. }
  219. }
  // Stores the latest account balance. `check_ready` treats a zero balance as "not ready".
  pub async fn on_balance(&mut self, balance: Decimal) {
      self.balance = balance;
  }
  223. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  224. let last_fair_price = price_vec.deque.iter().last().unwrap();
  225. let min_price = price_vec.deque.iter().min().unwrap();
  226. let max_price = price_vec.deque.iter().max().unwrap();
  227. let up_rate = (last_fair_price - min_price) / min_price;
  228. let down_rate = (max_price - last_fair_price) / max_price;
  229. if up_rate > down_rate {
  230. up_rate
  231. } else {
  232. -down_rate
  233. }
  234. }
  235. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  236. if self.mid_price.is_zero() {
  237. return;
  238. }
  239. let a1 = &depth.asks[0];
  240. let b1 = &depth.bids[0];
  241. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  242. // let total = a1.value + b1.value;
  243. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  244. // self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  245. let mut mp = (a1.price + b1.price) / Decimal::TWO;
  246. mp.rescale(self.mid_price.scale());
  247. self.mid_price_vec[index] = mp;
  248. // 生成fp
  249. self.fair_price_std_vec[index] = mp * self.ks[index] + self.bs[index];
  250. self.fair_price_std_vec[index].rescale(self.mid_price.scale());
  251. // 生成最终用于挂单的公平价格
  252. let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
  253. let fair_price_count = self.fair_price_std_vec.iter()
  254. .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
  255. .count();
  256. if fair_price_count != 0 {
  257. self.fair_price = fair_price_sum / Decimal::from(fair_price_count);
  258. let mut spread_abs = ((self.fair_price - self.mid_price) / self.mid_price).abs();
  259. spread_abs.rescale(5);
  260. self.spread_vec.push_back(spread_abs);
  261. if self.spread_vec.len() > 1000 {
  262. self.spread_vec.pop_front();
  263. }
  264. let opt_abs_value = self.spread_vec.iter().max().unwrap().clone();
  265. self.params.open = max(max(self.params.min_open, dec!(0.0006)), opt_abs_value);
  266. }
  267. }
  268. pub async fn update_delta(&mut self) {
  269. if self.mid_price.is_zero() {
  270. return;
  271. }
  272. let is_close_long = self.inventory > Decimal::ZERO;
  273. let is_close_short = self.inventory < Decimal::ZERO;
  274. if is_close_long {
  275. self.ask_delta = dec!(0);
  276. self.bid_delta = dec!(-2);
  277. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.close;
  278. self.optimal_bid_price = Self::DONT_VIEW;
  279. } else if is_close_short {
  280. self.bid_delta = dec!(0);
  281. self.ask_delta = dec!(-2);
  282. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.close;
  283. self.optimal_ask_price = Self::DONT_VIEW;
  284. } else {
  285. if self.fair_price > self.mid_price {
  286. self.bid_delta = dec!(0);
  287. self.ask_delta = dec!(-2);
  288. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open;
  289. self.optimal_ask_price = Self::DONT_VIEW;
  290. } else if self.fair_price < self.mid_price {
  291. self.ask_delta = dec!(0);
  292. self.bid_delta = dec!(-2);
  293. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open;
  294. self.optimal_bid_price = Self::DONT_VIEW;
  295. } else {
  296. self.bid_delta = dec!(0);
  297. self.ask_delta = dec!(0);
  298. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open;
  299. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open;
  300. }
  301. }
  302. self.optimal_ask_price.rescale(self.mid_price.scale());
  303. self.optimal_bid_price.rescale(self.mid_price.scale());
  304. }
  305. pub fn check_ready(&mut self) {
  306. if self.is_ready {
  307. return;
  308. }
  309. if self.mid_price.is_zero() {
  310. return;
  311. }
  312. for fair_price in &self.fair_price_std_vec {
  313. if fair_price.is_zero() {
  314. return;
  315. }
  316. }
  317. if self.ask_price.is_zero() {
  318. return;
  319. }
  320. if self.bid_price.is_zero() {
  321. return;
  322. }
  323. if self.balance.is_zero() {
  324. return;
  325. }
  326. self.is_ready = true;
  327. info!("========================================行情数据预热完毕==================================")
  328. }
  329. // 最小二乘法拟合函数,支持VecDeque
  330. pub async fn linear_least_squares(&self, index: usize) -> Option<(Decimal, Decimal)> {
  331. let x = &self.prices[index][1];
  332. let y = &self.prices[index][0];
  333. // 检查数组长度是否相同
  334. if x.len() != y.len() {
  335. return None;
  336. }
  337. let n = x.len();
  338. if n == 0 {
  339. return None;
  340. }
  341. let mut sum_x = Decimal::zero();
  342. let mut sum_y = Decimal::zero();
  343. let mut sum_xx = Decimal::zero();
  344. let mut sum_xy = Decimal::zero();
  345. // 遍历VecDeque中的元素
  346. for (xi, yi) in x.iter().zip(y.iter()) {
  347. sum_x += xi;
  348. sum_y += yi;
  349. let xi_sq = xi * xi;
  350. sum_xx += xi_sq;
  351. sum_xy += xi * yi;
  352. }
  353. // 计算分子和分母
  354. let numerator = sum_xy - (sum_x * sum_y) / Decimal::from(n);
  355. let denominator = sum_xx - (sum_x * sum_x) / Decimal::from(n);
  356. // 如果分母为0,返回None
  357. if denominator == Decimal::zero() {
  358. return None;
  359. }
  360. let k = numerator / denominator;
  361. let mean_x = sum_x / Decimal::from(n);
  362. let mean_y = sum_y / Decimal::from(n);
  363. let b = mean_y - k * mean_x;
  364. Some((k, b))
  365. }
  366. // #[instrument(skip(self), level="TRACE")]
  367. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  368. self.check_ready();
  369. if !self.is_ready {
  370. return;
  371. }
  372. self.update_delta().await;
  373. // let cci_arc = self.cci_arc.clone();
  374. let now = data_time;
  375. let mid_price = self.mid_price;
  376. let ask_price = if self.params.ref_exchange.len() > 0 {
  377. self.fair_price_std_vec[0]
  378. } else {
  379. Self::DONT_VIEW
  380. };
  381. let bid_price = if self.params.ref_exchange.len() > 1 {
  382. self.fair_price_std_vec[1]
  383. } else {
  384. Self::DONT_VIEW
  385. };
  386. let optimal_ask_price = self.optimal_ask_price;
  387. let optimal_bid_price = self.optimal_bid_price;
  388. let last_price = Self::DONT_VIEW;
  389. let fair_price = self.fair_price;
  390. let total_amount_0: Decimal = self.trade_233_vec.deque.iter().map(|trade| trade.value).sum();
  391. let total_amount_1: Decimal = self.trade_0_vec.deque.iter().map(|trade| trade.value).sum();
  392. let spread = Self::DONT_VIEW;
  393. let spread_max = self.bs[0];
  394. let spread_min = self.ks[0];
  395. // let spread = self.price_times_avg;
  396. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  397. // let spread_min = self.fair_price / self.mid_price;
  398. let inventory = self.inventory;
  399. let sigma_square = if total_amount_0 + total_amount_1 == Decimal::ZERO {
  400. Decimal::ZERO
  401. } else {
  402. total_amount_0 / (total_amount_0 + total_amount_1)
  403. };
  404. let gamma = self.balance;
  405. let kappa = self.fitting_delay;
  406. let flow_ratio = Decimal::ZERO;
  407. let need_append = now - self.prev_insert_time > dec!(500);
  408. if !need_append && !is_hard_update {
  409. return;
  410. }
  411. if !is_hard_update {
  412. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  413. }
  414. let pos_avg_price = self.pos_avg_price;
  415. self.debug_sender.unbounded_send(vec![
  416. now,
  417. mid_price,
  418. ask_price,
  419. bid_price,
  420. last_price,
  421. spread,
  422. spread_max,
  423. spread_min,
  424. optimal_ask_price,
  425. optimal_bid_price,
  426. inventory,
  427. sigma_square,
  428. gamma,
  429. kappa,
  430. flow_ratio,
  431. fair_price,
  432. pos_avg_price
  433. ]).unwrap();
  434. }
  // Stub: ignores `_ref_ticker_map` and always returns an empty vector.
  // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
      vec![]
  }
  439. }