predictor.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. use std::cmp::max;
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  16. #[derive(Debug, Clone)]
  17. pub struct Predictor {
  18. pub depth_vec: Vec<Depth>, // 深度队列
  19. pub record_vec: VecDeque<Record>, // 蜡烛队列
  20. pub spread_vec: VecDeque<Decimal>, // 价差队列
  21. pub trade_price_long_vec: FixedTimeRangeDeque<Decimal>,
  22. pub trade_233_vec: FixedTimeRangeDeque<Trade>,
  23. pub trade_0_vec: FixedTimeRangeDeque<Trade>,
  24. pub mid_price: Decimal, // 中间价
  25. pub fair_price: Decimal,
  26. pub ask_price: Decimal, // 卖一价
  27. pub bid_price: Decimal, // 买一价
  28. pub last_price: Decimal, // 最后成交价
  29. pub optimal_ask_price: Decimal, // 卖出挂单价
  30. pub optimal_bid_price: Decimal, // 买入挂单价
  31. pub inventory: Decimal, // 库存,也就是q
  32. pub pos_amount: Decimal, // 原始持仓量
  33. pub pos_avg_price: Decimal, // 原始持仓价格
  34. pub balance: Decimal, // 初始余额
  35. pub prev_balance: Decimal,
  36. pub signal: Decimal, // 大于0代表此时是正向信号,小于0则相反
  37. pub ask_delta: Decimal, // δa
  38. pub bid_delta: Decimal, // δb
  39. pub fair_price_vec: Vec<Decimal>, // 公平价格列表
  40. pub fair_price_std_vec: Vec<Decimal>, // 公平价格列表,标准化之后的
  41. pub price_avg_times_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  42. pub price_avg_times_long_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  43. pub is_ready: bool, // 是否已准备好
  44. pub last_update_time: Decimal, // 最后更新时间(depth)
  45. pub last_index: Decimal, // 最后更新的index
  46. pub prev_insert_time: Decimal,
  47. pub prev_save_time: Decimal,
  48. pub init_time: Decimal,
  49. pub prev_update_open_params_time: Decimal,
  50. pub params: Params,
  51. pub debug_sender: UnboundedSender<Vec<Decimal>>
  52. }
  53. impl Predictor {
  54. // 时间窗口大小(微秒)
  55. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  56. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  57. const TRADE_LONG_RANGE_MICROS: i64 = 10 * 60_000_000;
  58. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  59. const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
  60. // const ONE_MILLION: Decimal = dec!(1_000_000);
  61. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  62. const DONT_VIEW: Decimal = dec!(14142135623730951);
  63. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  64. // 创建数据通道
  65. // 创建一个无界通道
  66. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  67. let account_port = params.port.clone();
  68. tokio::spawn(async move {
  69. let len = 17usize;
  70. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  71. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  72. while let Some(value) = rx.next().await {
  73. // 数据填充到对应位置
  74. for i in 0..len {
  75. if value[i] == Self::DONT_VIEW {
  76. debugs[i].push_back(None);
  77. } else {
  78. debugs[i].push_back(Some(value[i]));
  79. }
  80. }
  81. // 长度限制
  82. if debugs[0].len() > 500_000 {
  83. for i in 0..len {
  84. debugs[i].pop_front(); // 从前面移除元素
  85. }
  86. }
  87. let now = Decimal::from(Utc::now().timestamp_millis());
  88. if now - prev_save_time < dec!(30000) {
  89. continue;
  90. }
  91. let debugs_clone = debugs.clone();
  92. let temp_html_str = tokio::task::spawn_blocking(move || {
  93. utils::build_html_file(&debugs_clone)
  94. }).await.unwrap();
  95. let path = format!("./db/{}.html", account_port);
  96. utils::write_to_file(&temp_html_str, path).await;
  97. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  98. }
  99. });
  100. let predictor = Self {
  101. // 接针版本
  102. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  103. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  104. fair_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  105. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  106. price_avg_times_long_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  107. record_vec: VecDeque::new(),
  108. trade_price_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
  109. trade_233_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  110. trade_0_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  111. spread_vec: VecDeque::new(),
  112. mid_price: Default::default(),
  113. fair_price: Default::default(),
  114. ask_price: Default::default(),
  115. bid_price: Default::default(),
  116. last_price: Default::default(),
  117. optimal_ask_price: Default::default(),
  118. optimal_bid_price: Default::default(),
  119. ask_delta: Default::default(),
  120. bid_delta: Default::default(),
  121. is_ready: false,
  122. inventory: Default::default(),
  123. pos_avg_price: Default::default(),
  124. pos_amount: Default::default(),
  125. balance: Default::default(),
  126. prev_balance: Default::default(),
  127. signal: Default::default(),
  128. last_update_time: Default::default(),
  129. last_index: Default::default(),
  130. prev_insert_time: Default::default(),
  131. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  132. init_time: Decimal::from(Utc::now().timestamp_millis()),
  133. prev_update_open_params_time: Default::default(),
  134. params,
  135. debug_sender: tx,
  136. };
  137. predictor
  138. }
  139. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  140. self.last_update_time = depth.time;
  141. self.last_index = Decimal::from(index);
  142. if index == 233 {
  143. self.ask_price = depth.asks[0].price;
  144. self.bid_price = depth.bids[0].price;
  145. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  146. } else {
  147. self.update_fair_price(depth, index).await;
  148. self.depth_vec[index] = depth.clone();
  149. }
  150. if self.mid_price.is_zero() {
  151. return;
  152. }
  153. self.processor(depth.time, false).await;
  154. }
  155. pub async fn on_trade(&mut self, trade: &Trade, index: usize) {
  156. // index == 233代表做市所
  157. // index == 0代表参考所
  158. self.last_price = trade.price;
  159. self.trade_price_long_vec.push_back(trade.price);
  160. if index == 233 {
  161. self.trade_233_vec.push_back(trade.clone());
  162. } else if index == 0 {
  163. self.trade_0_vec.push_back(trade.clone());
  164. }
  165. // self.processor().await;
  166. }
  167. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  168. pub async fn on_record(&mut self, _record: &Record) {}
  169. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  170. if self.mid_price.is_zero() {
  171. return;
  172. }
  173. let prev_pos_amount = self.pos_amount;
  174. self.pos_amount = pos_amount.clone();
  175. self.pos_avg_price = pos_avg_price.clone();
  176. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  177. // 小于1但不为0的情况,需要平完
  178. if self.inventory.is_zero() && !pos_amount.is_zero() {
  179. self.inventory = if pos_amount > &Decimal::ZERO {
  180. Decimal::ONE
  181. } else {
  182. Decimal::NEGATIVE_ONE
  183. };
  184. }
  185. if prev_pos_amount != self.pos_amount {
  186. self.processor(update_time, true).await;
  187. }
  188. }
  189. pub async fn on_balance(&mut self, balance: Decimal) {
  190. self.balance = balance;
  191. }
  192. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  193. let last_fair_price = price_vec.deque.iter().last().unwrap();
  194. let min_price = price_vec.deque.iter().min().unwrap();
  195. let max_price = price_vec.deque.iter().max().unwrap();
  196. let up_rate = (last_fair_price - min_price) / min_price;
  197. let down_rate = (max_price - last_fair_price) / max_price;
  198. if up_rate > down_rate {
  199. up_rate
  200. } else {
  201. -down_rate
  202. }
  203. }
  204. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  205. if self.mid_price.is_zero() {
  206. return;
  207. }
  208. let a1 = &depth.asks[0];
  209. let b1 = &depth.bids[0];
  210. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  211. // let total = a1.value + b1.value;
  212. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  213. // 生成fp
  214. // self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  215. self.fair_price_vec[index] = (a1.price + b1.price) / Decimal::TWO;
  216. self.fair_price_vec[index].rescale(self.mid_price.scale());
  217. // 求价格倍率
  218. self.price_avg_times_vec[index] = if !self.is_ready {
  219. self.fair_price_vec[index] / self.mid_price
  220. } else {
  221. self.price_avg_times_vec[index] * dec!(0.999) + dec!(0.002) * self.fair_price_vec[index] / self.mid_price
  222. };
  223. self.price_avg_times_long_vec[index] = if !self.is_ready {
  224. self.fair_price_vec[index] / self.mid_price
  225. } else {
  226. self.price_avg_times_long_vec[index] * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[index] / self.mid_price
  227. };
  228. // 合成公平价格
  229. self.fair_price_std_vec[index] = self.fair_price_vec[index] / self.price_avg_times_vec[index];
  230. // 开仓信号处理
  231. self.signal = Decimal::ZERO;
  232. for (i, price_avg_times_long) in self.price_avg_times_long_vec.iter().enumerate() {
  233. if price_avg_times_long.is_zero() {
  234. return;
  235. }
  236. let price_avg_times_short = self.price_avg_times_vec[i];
  237. self.signal = self.signal + price_avg_times_short - price_avg_times_long;
  238. }
  239. // self.signal = self.signal / self.params.min_spread;
  240. // self.signal.rescale(0);
  241. // 生成最终用于挂单的公平价格
  242. let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
  243. let fair_price_count = self.fair_price_std_vec.iter()
  244. .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
  245. .count();
  246. if fair_price_count != 0 {
  247. self.fair_price = fair_price_sum / Decimal::from(fair_price_count);
  248. let mut spread_abs = ((self.fair_price - self.mid_price) / self.mid_price).abs();
  249. spread_abs.rescale(5);
  250. self.spread_vec.push_back(spread_abs);
  251. if self.spread_vec.len() > 1000 {
  252. self.spread_vec.pop_front();
  253. }
  254. let opt_abs_value = self.spread_vec.iter().max().unwrap().clone();
  255. self.params.open = max(max(self.params.min_open, dec!(0.0006)), opt_abs_value);
  256. }
  257. }
  258. pub async fn update_delta(&mut self) {
  259. if self.mid_price.is_zero() {
  260. return;
  261. }
  262. for fair_price in &self.fair_price_vec {
  263. if fair_price.is_zero() {
  264. return;
  265. }
  266. }
  267. let is_close_long = self.inventory > Decimal::ZERO;
  268. let is_close_short = self.inventory < Decimal::ZERO;
  269. if is_close_long {
  270. self.ask_delta = dec!(0);
  271. self.bid_delta = dec!(-2);
  272. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.close;
  273. self.optimal_bid_price = Self::DONT_VIEW;
  274. } else if is_close_short {
  275. self.bid_delta = dec!(0);
  276. self.ask_delta = dec!(-2);
  277. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.close;
  278. self.optimal_ask_price = Self::DONT_VIEW;
  279. } else {
  280. if self.signal > Decimal::ZERO {
  281. self.bid_delta = dec!(0);
  282. self.ask_delta = dec!(-2);
  283. self.optimal_bid_price = self.fair_price - self.fair_price * (self.params.open - self.signal);
  284. self.optimal_ask_price = Self::DONT_VIEW;
  285. } else if self.signal < Decimal::ZERO {
  286. self.ask_delta = dec!(0);
  287. self.bid_delta = dec!(-2);
  288. self.optimal_ask_price = self.fair_price + self.fair_price * (self.params.open + self.signal);
  289. self.optimal_bid_price = Self::DONT_VIEW;
  290. } else {
  291. self.bid_delta = dec!(0);
  292. self.ask_delta = dec!(0);
  293. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open;
  294. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open;
  295. }
  296. }
  297. self.optimal_ask_price.rescale(self.mid_price.scale());
  298. self.optimal_bid_price.rescale(self.mid_price.scale());
  299. }
  300. pub fn check_ready(&mut self) {
  301. if self.is_ready {
  302. return;
  303. }
  304. if self.mid_price.is_zero() {
  305. return;
  306. }
  307. for fair_price in &self.fair_price_vec {
  308. if fair_price.is_zero() {
  309. return;
  310. }
  311. }
  312. if self.ask_price.is_zero() {
  313. return;
  314. }
  315. if self.bid_price.is_zero() {
  316. return;
  317. }
  318. if self.balance.is_zero() {
  319. return;
  320. }
  321. self.is_ready = true;
  322. info!("========================================行情数据预热完毕==================================")
  323. }
  324. // #[instrument(skip(self), level="TRACE")]
  325. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  326. self.check_ready();
  327. if !self.is_ready {
  328. return;
  329. }
  330. self.update_delta().await;
  331. // let cci_arc = self.cci_arc.clone();
  332. let now = data_time;
  333. let mid_price = self.mid_price;
  334. let ask_price = if self.params.ref_exchange.len() > 0 {
  335. self.fair_price_vec[0]
  336. } else {
  337. Self::DONT_VIEW
  338. };
  339. let bid_price = if self.params.ref_exchange.len() > 1 {
  340. self.fair_price_vec[1]
  341. } else {
  342. Self::DONT_VIEW
  343. };
  344. let optimal_ask_price = self.optimal_ask_price;
  345. let optimal_bid_price = self.optimal_bid_price;
  346. let last_price = Self::DONT_VIEW;
  347. let fair_price = self.fair_price;
  348. let total_amount_0: Decimal = self.trade_233_vec.deque.iter().map(|trade| trade.value).sum();
  349. let total_amount_1: Decimal = self.trade_0_vec.deque.iter().map(|trade| trade.value).sum();
  350. let spread = Self::DONT_VIEW;
  351. let spread_min = (self.fair_price - self.mid_price) / self.mid_price;
  352. let spread_max = self.params.open;
  353. // let spread = self.price_times_avg;
  354. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  355. // let spread_min = self.fair_price / self.mid_price;
  356. let inventory = self.inventory;
  357. let sigma_square = if total_amount_0 + total_amount_1 == Decimal::ZERO {
  358. Decimal::ZERO
  359. } else {
  360. total_amount_0 / (total_amount_0 + total_amount_1)
  361. };
  362. let gamma = self.balance;
  363. let kappa = Decimal::from(Utc::now().timestamp_millis()) - data_time;
  364. let flow_ratio = Decimal::ZERO;
  365. let need_append = now - self.prev_insert_time > dec!(500);
  366. if !need_append && !is_hard_update {
  367. return;
  368. }
  369. if !is_hard_update {
  370. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  371. }
  372. let pos_avg_price = self.pos_avg_price;
  373. self.debug_sender.unbounded_send(vec![
  374. now,
  375. mid_price,
  376. ask_price,
  377. bid_price,
  378. last_price,
  379. spread,
  380. spread_max,
  381. spread_min,
  382. optimal_ask_price,
  383. optimal_bid_price,
  384. inventory,
  385. sigma_square,
  386. gamma,
  387. kappa,
  388. flow_ratio,
  389. fair_price,
  390. pos_avg_price
  391. ]).unwrap();
  392. }
  393. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  394. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  395. vec![]
  396. }
  397. }