predictor.rs 24 KB


  1. use std::cmp::{max, min};
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  16. #[derive(Debug)]
  17. pub struct Predictor {
  18. pub depth_vec: Vec<Depth>, // 深度队列
  19. pub volume_vec: Vec<Decimal>, // 交易量队列
  20. pub trade_long_vec: FixedTimeRangeDeque<Trade>, // 交易队列
  21. pub trade_short_vec: FixedTimeRangeDeque<Trade>, // 交易队列
  22. pub spread_vec: Vec<Decimal>, // 价差队列
  23. pub record_vec: VecDeque<Record>, // 蜡烛队列
  24. pub mid_price: Decimal, // 中间价
  25. pub ask_price: Decimal, // 卖一价
  26. pub bid_price: Decimal, // 买一价
  27. pub last_price: Decimal, // 最后成交价
  28. pub spread: Decimal, // 当前价差
  29. pub spread_sma: Decimal, // 价差的sma,默认是sma5000
  30. pub spread_sma_2000: Decimal, // 价差的sma,2000级别
  31. pub spread_sma_1000: Decimal, // 价差的sma,1000级别
  32. pub optimal_ask_price: Decimal, // 卖出挂单价
  33. pub optimal_bid_price: Decimal, // 买入挂单价
  34. pub inventory: Decimal, // 库存,也就是q
  35. pub pos_amount: Decimal, // 原始持仓量
  36. pub pos_avg_price: Decimal, // 原始持仓价格
  37. pub level: Decimal, // martin
  38. pub error_rate: Decimal, // 犯错概率(预估)
  39. pub ask_delta: Decimal, // δa
  40. pub bid_delta: Decimal, // δb
  41. pub mid_price_time_vec: FixedTimeRangeDeque<Decimal>, // 中间价格队列,100ms以内的所有中间价格
  42. pub fair_price_time_vec: FixedTimeRangeDeque<Decimal>, // 公平价格队列,50ms以内的所有公平价格
  43. pub spread_sma_1000_time_vec: FixedTimeRangeDeque<Decimal>, // spread队列,100ms以内的所有spread_sma_1000
  44. pub fair_price_vec: Vec<Decimal>, // 公平价格列表,0表示做市所,1表示参考所
  45. pub fair_price: Decimal, // 预定价格
  46. pub fair_rate_focus: Decimal, // 变化幅度焦点
  47. pub fair_price_when_ordering: Decimal, // 下单时的预定价格
  48. pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值
  49. pub is_regressed: bool, // 做市所的价格是否已经回归
  50. pub is_ready: bool, // 是否已准备好
  51. pub close_price: Decimal, // 计划平仓价格
  52. pub prev_trade_time: i64, // 上次交易时间,也就是t
  53. pub t_diff: Decimal, // (T-t)
  54. pub last_update_time: Decimal, // 最后更新时间(depth)
  55. pub last_index: Decimal, // 最后更新的index
  56. pub prev_insert_time: Decimal,
  57. pub prev_save_time: Decimal,
  58. pub params: Params,
  59. pub debug_sender: UnboundedSender<Vec<Decimal>>,
  60. pub long_trade_len_dec: Decimal,
  61. pub short_trade_len_dec: Decimal,
  62. }
  63. impl Predictor {
  64. // 时间窗口大小(微秒)
  65. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  66. const TIME_DIFF_RANGE_MICROS: i64 = 10 * 60_000_000;
  67. const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
  68. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  69. const TRADE_SHORT_RANGE_MICROS: i64 = 30_000_000;
  70. // const ONE_MILLION: Decimal = dec!(1_000_000);
  71. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  72. const UN_VIEW: Decimal = dec!(14142135623730951);
  73. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  74. // 创建数据通道
  75. // 创建一个无界通道
  76. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  77. tokio::spawn(async move {
  78. let len = 16usize;
  79. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  80. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  81. while let Some(value) = rx.next().await {
  82. // 数据填充到对应位置
  83. for i in 0..len {
  84. if value[i] == Self::UN_VIEW {
  85. debugs[i].push_back(None);
  86. } else {
  87. debugs[i].push_back(Some(value[i]));
  88. }
  89. }
  90. // 长度限制
  91. if debugs[0].len() > 500_000 {
  92. for i in 0..len {
  93. debugs[i].pop_front(); // 从前面移除元素
  94. }
  95. }
  96. let now = Decimal::from(Utc::now().timestamp_millis());
  97. if now - prev_save_time < dec!(60000) {
  98. continue;
  99. }
  100. let debugs_clone = debugs.clone();
  101. let temp_html_str = tokio::task::spawn_blocking(move || {
  102. utils::build_html_file(&debugs_clone)
  103. }).await.unwrap();
  104. utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
  105. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  106. }
  107. });
  108. let predictor = Self {
  109. // 接针版本
  110. depth_vec: vec![Depth::new(); 10],
  111. fair_price_vec: vec![Decimal::ZERO; 10],
  112. volume_vec: vec![Decimal::ZERO; 10],
  113. // 老的队列
  114. spread_vec: vec![],
  115. trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
  116. trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  117. record_vec: VecDeque::new(),
  118. mid_price: Default::default(),
  119. ask_price: Default::default(),
  120. bid_price: Default::default(),
  121. last_price: Default::default(),
  122. spread: Default::default(),
  123. spread_sma: Default::default(),
  124. spread_sma_2000: Default::default(),
  125. spread_sma_1000: Default::default(),
  126. optimal_ask_price: Default::default(),
  127. optimal_bid_price: Default::default(),
  128. inventory: Default::default(),
  129. ask_delta: Default::default(),
  130. bid_delta: Default::default(),
  131. fair_price_time_vec: FixedTimeRangeDeque::new(50_000),
  132. mid_price_time_vec: FixedTimeRangeDeque::new(100_000),
  133. spread_sma_1000_time_vec: FixedTimeRangeDeque::new(500_000),
  134. fair_price: Default::default(),
  135. fair_rate_focus: Default::default(),
  136. fair_price_when_ordering: Default::default(),
  137. price_times_avg: Default::default(),
  138. is_regressed: false,
  139. is_ready: false,
  140. prev_trade_time: Utc::now().timestamp_micros(),
  141. close_price: Default::default(),
  142. t_diff: Default::default(),
  143. level: Default::default(),
  144. pos_amount: Default::default(),
  145. error_rate: Default::default(),
  146. last_update_time: Default::default(),
  147. last_index: Default::default(),
  148. pos_avg_price: Default::default(),
  149. prev_insert_time: Default::default(),
  150. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  151. params,
  152. debug_sender: tx,
  153. long_trade_len_dec: Default::default(),
  154. short_trade_len_dec: Default::default(),
  155. };
  156. predictor
  157. }
  158. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  159. self.last_update_time = depth.time;
  160. self.last_index = Decimal::from(index);
  161. if index == 0 {
  162. self.ask_price = depth.asks[0].price;
  163. self.bid_price = depth.bids[0].price;
  164. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  165. self.mid_price_time_vec.push_back(self.mid_price);
  166. }
  167. self.update_fair_price(depth, index).await;
  168. self.update_spread();
  169. self.depth_vec[index] = depth.clone();
  170. if self.mid_price.is_zero() {
  171. return;
  172. }
  173. self.processor().await;
  174. }
  175. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  176. // self.last_update_time = trade.time;
  177. self.trade_long_vec.push_back(trade.clone());
  178. self.trade_short_vec.push_back(trade.clone());
  179. self.long_trade_len_dec = Decimal::from_usize(self.trade_long_vec.len()).unwrap();
  180. self.short_trade_len_dec = Decimal::from_usize(self.trade_short_vec.len()).unwrap();
  181. self.error_rate = self.short_trade_len_dec / self.long_trade_len_dec;
  182. self.error_rate.rescale(4);
  183. self.last_price = trade.price;
  184. // self.processor().await;
  185. }
  186. pub async fn update_level(&mut self) {
  187. self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
  188. self.level = min(self.level, dec!(6));
  189. }
  190. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  191. pub async fn on_record(&mut self, _record: &Record) {}
  192. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) {
  193. if self.mid_price.is_zero() {
  194. return;
  195. }
  196. let prev_inventory = self.inventory;
  197. self.pos_amount = pos_amount.clone();
  198. self.pos_avg_price = pos_avg_price.clone();
  199. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  200. // 小于1但不为0的情况,需要平完
  201. if self.inventory.is_zero() && !pos_amount.is_zero() {
  202. self.inventory = if pos_amount > &Decimal::ZERO {
  203. Decimal::ONE
  204. } else {
  205. Decimal::NEGATIVE_ONE
  206. };
  207. }
  208. if prev_inventory != self.inventory && prev_inventory.is_zero() {
  209. self.prev_trade_time = Utc::now().timestamp_micros();
  210. self.close_price = self.fair_price_when_ordering;
  211. }
  212. if prev_inventory != self.inventory && self.inventory.is_zero() {
  213. self.is_regressed = false;
  214. }
  215. self.update_level().await;
  216. self.processor().await;
  217. }
  218. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  219. if self.mid_price.is_zero() {
  220. return;
  221. }
  222. let a1 = &depth.asks[0];
  223. let b1 = &depth.bids[0];
  224. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  225. // let total = a1.value + b1.value;
  226. // let fair_price = a1.price * b1.value / total + b1.price * a1.value / total;
  227. let fair_price = (a1.price + b1.price) / Decimal::TWO;
  228. self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() {
  229. fair_price
  230. } else {
  231. self.fair_price_vec[index] * dec!(0.9) + fair_price * dec!(0.1)
  232. };
  233. self.fair_price_vec[index].rescale(self.mid_price.scale());
  234. self.volume_vec[index] = a1.size + b1.size;
  235. // 合成公平价格
  236. if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() {
  237. self.price_times_avg = if self.price_times_avg.is_zero() {
  238. self.fair_price_vec[1] / self.fair_price_vec[0]
  239. } else {
  240. self.price_times_avg * dec!(0.9999) + dec!(0.0001) * self.fair_price_vec[1] / self.fair_price_vec[0]
  241. };
  242. // 进行价格归一化处理,公平所的价格有可能是带前缀的
  243. let fair_price_part0 = self.fair_price_vec[0] * dec!(0.2);
  244. let fair_price_part1 = (self.fair_price_vec[1] / self.price_times_avg) * dec!(0.8);
  245. self.fair_price = fair_price_part0 + fair_price_part1;
  246. self.fair_price_time_vec.push_back(self.fair_price);
  247. // 重置焦点,条件1
  248. if !self.fair_rate_focus.is_zero() && self.mid_price_time_vec.deque.len() >= 2 && self.fair_price_time_vec.deque.len() >= 2 {
  249. let mid_price_prev = self.mid_price_time_vec.deque.get(self.mid_price_time_vec.deque.len() - 2).unwrap();
  250. let fair_price_prev = self.fair_price_time_vec.deque.get(self.fair_price_time_vec.deque.len() - 2).unwrap();
  251. // 向上涨,并且mid下穿fair,视为观测阶段结束
  252. if self.fair_rate_focus > Decimal::ZERO && mid_price_prev > fair_price_prev && self.mid_price < self.fair_price {
  253. self.fair_rate_focus = Decimal::ZERO;
  254. }
  255. // 向下跌,并且mid上穿fair,视为观测阶段结束
  256. if self.fair_rate_focus < Decimal::ZERO && mid_price_prev < fair_price_prev && self.mid_price > self.fair_price {
  257. self.fair_rate_focus = Decimal::ZERO;
  258. }
  259. }
  260. // 重置焦点,条件2
  261. if !self.fair_rate_focus.is_zero() && !self.inventory.is_zero() {
  262. self.fair_rate_focus = Decimal::ZERO;
  263. }
  264. // 更新程序关注的变化幅度焦点
  265. if self.fair_rate_focus.is_zero() {
  266. let last_fair_price = self.fair_price_time_vec.deque.iter().last().unwrap();
  267. let first_fair_price = self.fair_price_time_vec.deque[0];
  268. let mut rate = (last_fair_price - first_fair_price) / first_fair_price;
  269. rate.rescale(8);
  270. // 只有有强度的rate才有资格被称为针
  271. if rate.abs() > self.params.open {
  272. // 向上涨,并且fair下穿mid,视为观测阶段开始
  273. if rate > Decimal::ZERO && self.mid_price > self.fair_price {
  274. self.fair_rate_focus = rate;
  275. }
  276. // 向下跌,并且fair上穿mid,视为观测阶段开始
  277. if rate < Decimal::ZERO && self.mid_price < self.fair_price {
  278. self.fair_rate_focus = rate;
  279. }
  280. }
  281. }
  282. }
  283. // 判断价格是否回归
  284. if !self.is_regressed && self.inventory > Decimal::ZERO && self.spread_sma_1000 < max(self.spread_sma, self.spread_sma_2000) {
  285. self.is_regressed = true
  286. } else if !self.is_regressed && self.inventory < Decimal::ZERO && self.spread_sma_1000 > min(self.spread_sma, self.spread_sma_2000) {
  287. self.is_regressed = true
  288. }
  289. }
  290. pub fn update_spread(&mut self) {
  291. if self.mid_price.is_zero() || self.fair_price.is_zero() {
  292. return;
  293. }
  294. self.spread = (self.fair_price - self.mid_price) / self.mid_price;
  295. // self.spread.rescale(8);
  296. self.spread_vec.push(self.spread);
  297. self.spread_sma = if self.spread_sma.is_zero() {
  298. self.spread
  299. } else {
  300. self.spread_sma * dec!(0.9998) + self.spread * dec!(0.0002)
  301. };
  302. // self.spread_sma.rescale(8);
  303. self.spread_sma_2000 = if self.spread_sma_2000.is_zero() {
  304. self.spread
  305. } else {
  306. self.spread_sma_2000 * dec!(0.9995) + self.spread * dec!(0.0005)
  307. };
  308. // self.spread_sma_2000.rescale(8);
  309. self.spread_sma_1000 = if self.spread_sma_1000.is_zero() {
  310. self.spread
  311. } else {
  312. self.spread_sma_1000 * dec!(0.999) + self.spread * dec!(0.001)
  313. };
  314. self.spread_sma_1000_time_vec.push_back(self.spread_sma_1000);
  315. // self.spread_sma_1000.rescale(8);
  316. while self.spread_vec.len() > 1_000 {
  317. self.spread_vec.remove(0);
  318. }
  319. }
  320. pub fn update_delta(&mut self) {
  321. // -2表示不想成交
  322. // -1表示市价成交(委托对手盘的价格,但不一定能市价成交),这里再想想吧,经常委托出去没成交,明显比别人慢了
  323. // 0是买一/卖一成交
  324. if self.fair_price.is_zero() {
  325. return;
  326. }
  327. // 可能是趋势
  328. // let is_open_long = self.spread_sma_1000 - self.spread_sma > self.params.open && self.fair_price > self.mid_price;
  329. // let is_open_short = self.spread_sma_1000 - self.spread_sma < self.params.open * Decimal::NEGATIVE_ONE && self.fair_price < self.mid_price;
  330. // 可能是接针
  331. let is_open_long = self.fair_rate_focus < Decimal::ZERO && self.fair_price > self.mid_price;
  332. let is_open_short = self.fair_rate_focus > Decimal::ZERO && self.fair_price < self.mid_price;
  333. let is_close_long = self.inventory > Decimal::ZERO;
  334. let is_close_short = self.inventory < Decimal::ZERO;
  335. self.bid_delta = dec!(-2);
  336. self.ask_delta = dec!(-2);
  337. if is_close_long {
  338. if self.mid_price > self.pos_avg_price {
  339. // let profit_rate = (self.mid_price - self.pos_avg_price) / self.pos_avg_price;
  340. // let fill_rate = profit_rate / (self.params.open * dec!(50));
  341. // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
  342. //
  343. // self.ask_delta = self.mid_price * close_rate * self.t_diff;
  344. self.ask_delta = self.mid_price * self.params.close;
  345. } else if self.mid_price > self.fair_price {
  346. self.ask_delta = self.mid_price * self.params.close;
  347. }
  348. // self.ask_delta = self.mid_price * self.params.close;
  349. } else if is_close_short {
  350. if self.mid_price < self.pos_avg_price {
  351. // let profit_rate = (self.pos_avg_price - self.mid_price) / self.pos_avg_price;
  352. // let fill_rate = profit_rate / (self.params.open * dec!(50));
  353. // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
  354. //
  355. // self.bid_delta = self.mid_price * close_rate * self.t_diff;
  356. self.bid_delta = self.mid_price * self.params.close;
  357. } else if self.mid_price < self.fair_price {
  358. self.bid_delta = self.mid_price * self.params.close;
  359. }
  360. // self.bid_delta = self.mid_price * self.params.close;
  361. } else if is_open_long {
  362. // let is_open_long_market = self.spread_sma_1000 - self.spread_sma > self.params.open_market;
  363. // self.bid_delta = if is_open_long_market {
  364. // dec!(-1)
  365. // } else {
  366. // dec!(0)
  367. // };
  368. self.bid_delta = dec!(0)
  369. } else if is_open_short {
  370. // let is_open_short_market = self.spread_sma_1000 - self.spread_sma < self.params.open_market * Decimal::NEGATIVE_ONE;
  371. // self.ask_delta = if is_open_short_market {
  372. // dec!(-1)
  373. // } else {
  374. // dec!(0)
  375. // }
  376. self.ask_delta = dec!(0)
  377. }
  378. }
  379. pub fn update_optimal_ask_and_bid(&mut self) {
  380. self.optimal_ask_price = if self.ask_delta == dec!(-1) {
  381. self.bid_price
  382. } else if self.ask_delta == dec!(-2) {
  383. Self::UN_VIEW
  384. } else {
  385. max(self.ask_price + self.ask_delta, self.bid_price)
  386. };
  387. self.optimal_bid_price = if self.bid_delta == dec!(-1) {
  388. self.ask_price
  389. } else if self.bid_delta == dec!(-2) {
  390. Self::UN_VIEW
  391. } else {
  392. min(self.bid_price - self.bid_delta, self.ask_price)
  393. };
  394. self.optimal_ask_price.rescale(self.mid_price.scale());
  395. self.optimal_bid_price.rescale(self.mid_price.scale());
  396. }
  397. pub fn update_t_diff(&mut self) {
  398. if self.prev_trade_time > 0 {
  399. let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
  400. self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO);
  401. } else {
  402. self.t_diff = Decimal::ONE;
  403. }
  404. }
  405. pub fn check_ready(&mut self) {
  406. if self.is_ready {
  407. return;
  408. }
  409. if self.mid_price == Decimal::ZERO {
  410. return;
  411. }
  412. if self.fair_price == Decimal::ZERO {
  413. return;
  414. }
  415. if self.ask_price == Decimal::ZERO {
  416. return;
  417. }
  418. if self.bid_price == Decimal::ZERO {
  419. return;
  420. }
  421. if self.trade_long_vec.len() < 100 {
  422. return;
  423. }
  424. self.is_ready = true;
  425. info!("========================================行情数据预热完毕==================================")
  426. }
  427. // #[instrument(skip(self), level="TRACE")]
  428. async fn processor(&mut self) {
  429. self.update_t_diff();
  430. self.update_delta();
  431. self.update_optimal_ask_and_bid();
  432. self.check_ready();
  433. if !self.is_ready {
  434. return;
  435. }
  436. // let mut smm = Decimal::ZERO;
  437. // if !self.depth_vec[1].time.is_zero() {
  438. // let sma = self.depth_vec[1].asks[0].price;
  439. // let smb = self.depth_vec[1].bids[0].price;
  440. // smm = (sma + smb) / Decimal::TWO;
  441. // }
  442. // let cci_arc = self.cci_arc.clone();
  443. let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
  444. let mid_price = self.mid_price;
  445. let ask_price = self.ask_price;
  446. let bid_price = self.bid_price;
  447. let last_price = self.last_price;
  448. let spread = self.spread_sma;
  449. let spread_max = self.spread_sma_2000;
  450. let spread_min = self.spread_sma_1000;
  451. // let spread = self.price_times_avg;
  452. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  453. // let spread_min = self.fair_price / self.mid_price;
  454. let optimal_ask_price = self.optimal_ask_price;
  455. let optimal_bid_price = self.optimal_bid_price;
  456. let inventory = self.inventory;
  457. let sigma_square = if self.is_regressed { Decimal::ONE } else { Decimal::ZERO };
  458. let gamma = now - self.last_update_time;
  459. let kappa = self.fair_rate_focus;
  460. let flow_ratio = Decimal::ZERO;
  461. let ref_price = self.fair_price;
  462. let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
  463. if !need_append {
  464. return;
  465. }
  466. self.debug_sender.unbounded_send(vec![
  467. now,
  468. mid_price,
  469. ask_price,
  470. bid_price,
  471. last_price,
  472. spread,
  473. spread_max,
  474. spread_min,
  475. optimal_ask_price,
  476. optimal_bid_price,
  477. inventory,
  478. sigma_square,
  479. gamma,
  480. kappa,
  481. flow_ratio,
  482. ref_price
  483. ]).unwrap();
  484. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  485. }
  486. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  487. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  488. vec![]
  489. }
  490. }