predictor.rs 17 KB


  1. use std::collections::{BTreeMap, VecDeque};
  2. use std::sync::Arc;
  3. use chrono::{Utc};
  4. use futures_channel::mpsc::UnboundedSender;
  5. use futures_util::StreamExt;
  6. use rust_decimal::prelude::*;
  7. use rust_decimal_macros::dec;
  8. use tokio::sync::{Mutex};
  9. use tracing::{info};
  10. use global::cci::CentralControlInfo;
  11. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  12. use global::params::Params;
  13. use standard::{Depth, Record, Ticker, Trade};
  14. use crate::utils;
  15. #[derive(Debug, Clone)]
  16. pub struct Predictor {
  17. pub depth_vec: Vec<Depth>, // 深度队列
  18. pub spread_vec: Vec<Decimal>, // 价差队列
  19. pub record_vec: VecDeque<Record>, // 蜡烛队列
  20. pub mid_price: Decimal, // 中间价
  21. pub ask_price: Decimal, // 卖一价
  22. pub bid_price: Decimal, // 买一价
  23. pub last_price: Decimal, // 最后成交价
  24. pub optimal_ask_price: Decimal, // 卖出挂单价
  25. pub optimal_bid_price: Decimal, // 买入挂单价
  26. pub inventory: Decimal, // 库存,也就是q
  27. pub pos_amount: Decimal, // 原始持仓量
  28. pub pos_avg_price: Decimal, // 原始持仓价格
  29. pub balance: Decimal, // 初始余额
  30. pub signal: Decimal, // 大于0代表此时是正向信号,小于0则相反
  31. pub ask_delta: Decimal, // δa
  32. pub bid_delta: Decimal, // δb
  33. pub fair_price_vec: Vec<Decimal>, // 公平价格列表
  34. pub fair_price_std_vec: Vec<Decimal>, // 公平价格列表,标准化之后的
  35. pub price_avg_times_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  36. pub is_ready: bool, // 是否已准备好
  37. pub last_update_time: Decimal, // 最后更新时间(depth)
  38. pub last_index: Decimal, // 最后更新的index
  39. pub prev_insert_time: Decimal,
  40. pub prev_save_time: Decimal,
  41. pub init_time: Decimal,
  42. pub params: Params,
  43. pub debug_sender: UnboundedSender<Vec<Decimal>>
  44. }
  45. impl Predictor {
  46. // 时间窗口大小(微秒)
  47. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  48. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  49. // const TRADE_LONG_RANGE_MICROS: i64 = 60_000_000;
  50. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  51. // const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
  52. // const ONE_MILLION: Decimal = dec!(1_000_000);
  53. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  54. const DONT_VIEW: Decimal = dec!(14142135623730951);
  55. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  56. // 创建数据通道
  57. // 创建一个无界通道
  58. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  59. let account_port = params.port.clone();
  60. tokio::spawn(async move {
  61. let len = 17usize;
  62. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  63. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  64. while let Some(value) = rx.next().await {
  65. // 获取插入下标
  66. // 反向遍历 VecDeque
  67. let mut insert_index = 0usize;
  68. if debugs[0].len() > 0 {
  69. let mut j = debugs[0].len() - 1;
  70. loop {
  71. if debugs[0][j].unwrap() <= value[0] {
  72. insert_index = j + 1;
  73. break;
  74. }
  75. j = j - 1;
  76. if j == 0 {
  77. break;
  78. }
  79. }
  80. }
  81. // 数据填充到对应位置
  82. for i in 0..len {
  83. if value[i] == Self::DONT_VIEW {
  84. // 可能会遇见按时序插入的
  85. if debugs[i].len() > 0 && insert_index < debugs[i].len() {
  86. // 在合适的位置插入新元素
  87. debugs[i].insert(insert_index, None);
  88. } else {
  89. debugs[i].push_back(None);
  90. }
  91. } else {
  92. // 可能会遇见按时序插入的
  93. if debugs[i].len() > 0 && insert_index < debugs[i].len() {
  94. // 在合适的位置插入新元素
  95. debugs[i].insert(insert_index, Some(value[i]));
  96. } else {
  97. debugs[i].push_back(Some(value[i]));
  98. }
  99. }
  100. }
  101. // 长度限制
  102. if debugs[0].len() > 500_000 {
  103. for i in 0..len {
  104. debugs[i].pop_front(); // 从前面移除元素
  105. }
  106. }
  107. let now = Decimal::from(Utc::now().timestamp_millis());
  108. if now - prev_save_time < dec!(30000) {
  109. continue;
  110. }
  111. let debugs_clone = debugs.clone();
  112. let temp_html_str = tokio::task::spawn_blocking(move || {
  113. utils::build_html_file(&debugs_clone)
  114. }).await.unwrap();
  115. let path = format!("./db/{}.html", account_port);
  116. utils::write_to_file(&temp_html_str, path).await;
  117. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  118. }
  119. });
  120. let predictor = Self {
  121. // 接针版本
  122. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  123. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  124. fair_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  125. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  126. spread_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  127. record_vec: VecDeque::new(),
  128. mid_price: Default::default(),
  129. ask_price: Default::default(),
  130. bid_price: Default::default(),
  131. last_price: Default::default(),
  132. optimal_ask_price: Default::default(),
  133. optimal_bid_price: Default::default(),
  134. ask_delta: Default::default(),
  135. bid_delta: Default::default(),
  136. is_ready: false,
  137. inventory: Default::default(),
  138. pos_avg_price: Default::default(),
  139. pos_amount: Default::default(),
  140. balance: Default::default(),
  141. signal: Default::default(),
  142. last_update_time: Default::default(),
  143. last_index: Default::default(),
  144. prev_insert_time: Default::default(),
  145. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  146. init_time: Decimal::from(Utc::now().timestamp_millis()),
  147. params,
  148. debug_sender: tx,
  149. };
  150. predictor
  151. }
  152. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  153. self.last_update_time = depth.time;
  154. self.last_index = Decimal::from(index);
  155. if index == 233 {
  156. self.ask_price = depth.asks[0].price;
  157. self.bid_price = depth.bids[0].price;
  158. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  159. } else {
  160. self.update_fair_price(depth, index).await;
  161. self.depth_vec[index] = depth.clone();
  162. }
  163. if self.mid_price.is_zero() {
  164. return;
  165. }
  166. self.processor(depth.time, false).await;
  167. }
  168. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  169. self.last_price = trade.price;
  170. // self.processor().await;
  171. }
  172. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  173. pub async fn on_record(&mut self, _record: &Record) {}
  174. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  175. if self.mid_price.is_zero() {
  176. return;
  177. }
  178. let prev_pos_amount = self.pos_amount;
  179. self.pos_amount = pos_amount.clone();
  180. self.pos_avg_price = pos_avg_price.clone();
  181. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  182. // 小于1但不为0的情况,需要平完
  183. if self.inventory.is_zero() && !pos_amount.is_zero() {
  184. self.inventory = if pos_amount > &Decimal::ZERO {
  185. Decimal::ONE
  186. } else {
  187. Decimal::NEGATIVE_ONE
  188. };
  189. }
  190. if prev_pos_amount != self.pos_amount {
  191. self.processor(update_time, true).await;
  192. }
  193. }
  194. pub async fn on_balance(&mut self, balance: Decimal) {
  195. self.balance = balance;
  196. }
  197. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  198. let last_fair_price = price_vec.deque.iter().last().unwrap();
  199. let min_price = price_vec.deque.iter().min().unwrap();
  200. let max_price = price_vec.deque.iter().max().unwrap();
  201. let up_rate = (last_fair_price - min_price) / min_price;
  202. let down_rate = (max_price - last_fair_price) / max_price;
  203. if up_rate > down_rate {
  204. up_rate
  205. } else {
  206. -down_rate
  207. }
  208. }
  209. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  210. if self.mid_price.is_zero() {
  211. return;
  212. }
  213. let a1 = &depth.asks[0];
  214. let b1 = &depth.bids[0];
  215. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  216. let total = a1.value + b1.value;
  217. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  218. // 生成fp
  219. self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  220. self.fair_price_vec[index].rescale(self.mid_price.scale());
  221. // 求价格倍率
  222. self.price_avg_times_vec[index] = if self.price_avg_times_vec[index].is_zero() {
  223. self.fair_price_vec[index] / self.mid_price
  224. } else {
  225. self.price_avg_times_vec[index] * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[index] / self.mid_price
  226. };
  227. // 合成公平价格
  228. self.fair_price_std_vec[index] = self.fair_price_vec[index] / self.price_avg_times_vec[index];
  229. // 开仓信号处理
  230. self.signal = Decimal::ZERO;
  231. for (i, fair_price_std) in self.fair_price_std_vec.iter().enumerate() {
  232. if fair_price_std.is_zero() {
  233. return;
  234. }
  235. self.spread_vec[i] = fair_price_std - self.mid_price;
  236. self.signal = self.signal + self.spread_vec[i];
  237. }
  238. self.signal = self.signal / self.params.min_spread;
  239. self.signal.rescale(0);
  240. }
  241. pub fn update_delta(&mut self) {
  242. if self.mid_price.is_zero() {
  243. return;
  244. }
  245. for fair_price in &self.fair_price_vec {
  246. if fair_price.is_zero() {
  247. return;
  248. }
  249. }
  250. let is_close_long = self.inventory > Decimal::ZERO;
  251. let is_close_short = self.inventory < Decimal::ZERO;
  252. if is_close_long {
  253. self.ask_delta = dec!(0);
  254. self.bid_delta = dec!(-2);
  255. self.optimal_ask_price = self.mid_price + self.mid_price * self.params.close;
  256. self.optimal_bid_price = Self::DONT_VIEW;
  257. } else if is_close_short {
  258. self.bid_delta = dec!(0);
  259. self.ask_delta = dec!(-2);
  260. self.optimal_bid_price = self.mid_price - self.mid_price * self.params.close;
  261. self.optimal_ask_price = Self::DONT_VIEW;
  262. } else {
  263. if self.signal > Decimal::ZERO {
  264. self.bid_delta = dec!(0);
  265. self.ask_delta = dec!(-2);
  266. self.optimal_bid_price = self.mid_price - self.mid_price * self.params.open;
  267. self.optimal_ask_price = Self::DONT_VIEW;
  268. } else if self.signal < Decimal::ZERO {
  269. self.ask_delta = dec!(0);
  270. self.bid_delta = dec!(-2);
  271. self.optimal_ask_price = self.mid_price + self.mid_price * self.params.open;
  272. self.optimal_bid_price = Self::DONT_VIEW;
  273. } else {
  274. self.bid_delta = dec!(0);
  275. self.ask_delta = dec!(0);
  276. self.optimal_bid_price = self.mid_price - self.mid_price * self.params.open;
  277. self.optimal_ask_price = self.mid_price + self.mid_price * self.params.open;
  278. }
  279. }
  280. self.optimal_ask_price.rescale(self.mid_price.scale());
  281. self.optimal_bid_price.rescale(self.mid_price.scale());
  282. }
  283. pub fn check_ready(&mut self) {
  284. if self.is_ready {
  285. return;
  286. }
  287. if self.mid_price.is_zero() {
  288. return;
  289. }
  290. for fair_price in &self.fair_price_vec {
  291. if fair_price.is_zero() {
  292. return;
  293. }
  294. }
  295. if self.ask_price.is_zero() {
  296. return;
  297. }
  298. if self.bid_price.is_zero() {
  299. return;
  300. }
  301. if self.balance.is_zero() {
  302. return;
  303. }
  304. self.is_ready = true;
  305. info!("========================================行情数据预热完毕==================================")
  306. }
  307. // #[instrument(skip(self), level="TRACE")]
  308. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  309. self.check_ready();
  310. if !self.is_ready {
  311. return;
  312. }
  313. self.update_delta();
  314. // let cci_arc = self.cci_arc.clone();
  315. let now = data_time;
  316. let mid_price = self.mid_price;
  317. let ask_price = self.fair_price_std_vec[0];
  318. let bid_price = self.fair_price_std_vec[1];
  319. let optimal_ask_price = self.optimal_ask_price;
  320. let optimal_bid_price = self.optimal_bid_price;
  321. let last_price = Self::DONT_VIEW;
  322. let fair_price = Self::DONT_VIEW;
  323. let spread = Self::DONT_VIEW;
  324. let spread_min = self.spread_vec[0];
  325. let spread_max = self.spread_vec[1];
  326. // let spread = self.price_times_avg;
  327. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  328. // let spread_min = self.fair_price / self.mid_price;
  329. let inventory = self.inventory;
  330. let sigma_square = self.signal;
  331. let gamma = self.balance;
  332. let kappa = Decimal::ZERO;
  333. let flow_ratio = Decimal::ZERO;
  334. let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
  335. if !need_append && !is_hard_update {
  336. return;
  337. }
  338. if !is_hard_update {
  339. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  340. }
  341. let pos_avg_price = self.pos_avg_price;
  342. if is_hard_update {
  343. self.debug_sender.unbounded_send(vec![
  344. now,
  345. if pos_avg_price.is_zero() { mid_price } else { pos_avg_price },
  346. Self::DONT_VIEW,
  347. Self::DONT_VIEW,
  348. Self::DONT_VIEW,
  349. Self::DONT_VIEW,
  350. Self::DONT_VIEW,
  351. Self::DONT_VIEW,
  352. Self::DONT_VIEW,
  353. Self::DONT_VIEW,
  354. inventory,
  355. Self::DONT_VIEW,
  356. Self::DONT_VIEW,
  357. Self::DONT_VIEW,
  358. Self::DONT_VIEW,
  359. Self::DONT_VIEW,
  360. pos_avg_price
  361. ]).unwrap();
  362. } else {
  363. self.debug_sender.unbounded_send(vec![
  364. now,
  365. mid_price,
  366. ask_price,
  367. bid_price,
  368. last_price,
  369. spread,
  370. spread_max,
  371. spread_min,
  372. optimal_ask_price,
  373. optimal_bid_price,
  374. inventory,
  375. sigma_square,
  376. gamma,
  377. kappa,
  378. flow_ratio,
  379. fair_price,
  380. pos_avg_price
  381. ]).unwrap();
  382. }
  383. }
  384. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  385. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  386. vec![]
  387. }
  388. }