// predictor.rs
  1. use std::cmp::{max, min};
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  16. #[derive(Debug)]
  17. pub struct Predictor {
  18. pub depth_vec: Vec<Depth>, // 深度队列
  19. pub volume_vec: Vec<Decimal>, // 深度队列
  20. pub trade_long_vec: FixedTimeRangeDeque<Trade>, // 交易队列
  21. pub trade_short_vec: FixedTimeRangeDeque<Trade>, // 交易队列
  22. pub spread_vec: Vec<Decimal>, // 市场冲击队列
  23. pub record_vec: VecDeque<Record>, // 蜡烛队列
  24. pub mid_price: Decimal, // 中间价
  25. pub ask_price: Decimal, // 卖一价
  26. pub bid_price: Decimal, // 买一价
  27. pub last_price: Decimal, // 最后成交价
  28. pub spread: Decimal, // 市场冲击
  29. pub spread_max: Decimal, // 最大市场冲击
  30. pub spread_best: Decimal, // 最佳市场冲击
  31. pub optimal_ask_price: Decimal, // 卖出挂单价
  32. pub optimal_bid_price: Decimal, // 买入挂单价
  33. pub inventory: Decimal, // 库存,也就是q
  34. pub pos_amount: Decimal, // 原始持仓量
  35. pub pos_avg_price: Decimal, // 原始持仓价格
  36. pub level: Decimal, // martin
  37. pub sigma_square: Decimal, // σ^2,波动性的平方
  38. pub gamma: Decimal, // γ,库存风险厌恶参数
  39. pub kappa: Decimal, // κ 订单簿 流动性 参数
  40. pub flow_ratio: Decimal, // 资金流比例
  41. pub money_flow_index: Decimal, // MFI
  42. pub long_trade_len_dec: Decimal,
  43. pub short_trade_len_dec: Decimal,
  44. pub error_rate: Decimal, // 犯错概率(预估)
  45. pub dir: Decimal, // 行情方向
  46. pub ask_delta: Decimal, // δa
  47. pub bid_delta: Decimal, // δb
  48. pub base_delta: Decimal, // 基础挂单距离
  49. pub ratio_edge: Decimal, // 资金流修正的挂单距离
  50. pub fair_price_vec: Vec<Decimal>, // 预定价格队列
  51. pub fair_price: Decimal, // 预定价格
  52. pub fair_price_when_ordering: Decimal, // 下单时的预定价格
  53. pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值
  54. pub is_regressed: bool, // 做市所的价格是否已经回归
  55. pub cci_arc: Arc<Mutex<CentralControlInfo>>, // 中控信息
  56. pub is_ready: bool,
  57. pub prev_trade_time: i64, // 上次交易时间,也就是t
  58. pub close_price: Decimal, // 平仓价格
  59. pub t_diff: Decimal, // (T-t)
  60. pub last_update_time: Decimal,
  61. pub last_index: Decimal,
  62. pub prev_insert_time: Decimal,
  63. pub prev_save_time: Decimal,
  64. pub params: Params,
  65. pub debug_sender: UnboundedSender<Vec<Decimal>>,
  66. }
  67. impl Predictor {
  68. // 时间窗口大小(微秒)
  69. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  70. const TIME_DIFF_RANGE_MICROS: i64 = 10 * 60_000_000;
  71. const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
  72. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  73. const TRADE_SHORT_RANGE_MICROS: i64 = 30_000_000;
  74. // const ONE_MILLION: Decimal = dec!(1_000_000);
  75. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  76. const IRA: Decimal = dec!(1);
  77. pub fn new(cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  78. // 创建数据通道
  79. // 创建一个无界通道
  80. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  81. tokio::spawn(async move {
  82. let len = 16usize;
  83. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  84. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  85. while let Some(value) = rx.next().await {
  86. // 数据填充到对应位置
  87. for i in 0..len {
  88. if value[i] == dec!(14142135623730951) {
  89. debugs[i].push_back(None);
  90. } else {
  91. debugs[i].push_back(Some(value[i]));
  92. }
  93. }
  94. // 长度限制
  95. if debugs[0].len() > 500_000 {
  96. for i in 0..len {
  97. debugs[i].pop_front(); // 从前面移除元素
  98. }
  99. }
  100. let now = Decimal::from(Utc::now().timestamp_millis());
  101. if now - prev_save_time < dec!(60000) {
  102. continue;
  103. }
  104. let debugs_clone = debugs.clone();
  105. let temp_html_str = tokio::task::spawn_blocking(move || {
  106. utils::build_html_file(&debugs_clone)
  107. }).await.unwrap();
  108. utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
  109. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  110. }
  111. });
  112. let predictor = Self {
  113. // 接针版本
  114. depth_vec: vec![Depth::new(); 10],
  115. fair_price_vec: vec![Decimal::ZERO; 10],
  116. volume_vec: vec![Decimal::ZERO; 10],
  117. // 老的队列
  118. spread_vec: vec![],
  119. trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
  120. trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
  121. record_vec: VecDeque::new(),
  122. mid_price: Default::default(),
  123. ask_price: Default::default(),
  124. bid_price: Default::default(),
  125. last_price: Default::default(),
  126. spread: Default::default(),
  127. spread_max: Default::default(),
  128. spread_best: Default::default(),
  129. optimal_ask_price: Default::default(),
  130. optimal_bid_price: Default::default(),
  131. inventory: Default::default(),
  132. gamma: Default::default(),
  133. sigma_square: Default::default(),
  134. ask_delta: Default::default(),
  135. bid_delta: Default::default(),
  136. base_delta: Default::default(),
  137. ratio_edge: Default::default(),
  138. kappa: Default::default(),
  139. fair_price: Default::default(),
  140. fair_price_when_ordering: Default::default(),
  141. price_times_avg: Default::default(),
  142. is_regressed: false,
  143. cci_arc,
  144. is_ready: false,
  145. prev_trade_time: Utc::now().timestamp_micros(),
  146. close_price: Default::default(),
  147. t_diff: Default::default(),
  148. level: Default::default(),
  149. flow_ratio: Default::default(),
  150. money_flow_index: Default::default(),
  151. pos_amount: Default::default(),
  152. error_rate: Default::default(),
  153. long_trade_len_dec: Default::default(),
  154. short_trade_len_dec: Default::default(),
  155. last_update_time: Default::default(),
  156. last_index: Default::default(),
  157. pos_avg_price: Default::default(),
  158. prev_insert_time: Default::default(),
  159. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  160. dir: Default::default(),
  161. params,
  162. debug_sender: tx,
  163. };
  164. predictor
  165. }
  166. // 更新最大市场冲击
  167. pub fn update_spread_max(&mut self) {
  168. self.spread_max = if let Some(&max_value) = self.spread_vec.iter().max() {
  169. max_value
  170. } else {
  171. Decimal::NEGATIVE_ONE
  172. };
  173. }
  // Update the best market impact (currently disabled).
  175. // pub fn update_spread_best(&mut self) {
  176. // self.spread_best = self.spread_max;
  177. // let mut max_count = 0usize;
  178. //
  179. // for (spread, count) in self.spread_count_map.iter() {
  180. // // info!(?spread, ?count);
  181. //
  182. // if *count < max_count {
  183. // continue
  184. // }
  185. //
  186. // self.spread_best = *spread;
  187. // max_count = *count;
  188. // }
  189. // // info!("======================")
  190. // }
  191. pub fn update_spread(&mut self) {
  192. // if self.trade_long_vec.len() > 0 {
  193. let prev_depth_0 = &self.depth_vec[0];
  194. if prev_depth_0.time.is_zero() {
  195. return;
  196. }
  197. let prev_mid_price = (prev_depth_0.asks[0].price + prev_depth_0.bids[0].price) / Decimal::TWO;
  198. let now_spread = self.mid_price - prev_mid_price;
  199. if !now_spread.is_zero() {
  200. self.spread = now_spread;
  201. self.spread_vec.push(self.spread.abs());
  202. // 看空方向
  203. if now_spread < self.spread_max * Decimal::NEGATIVE_ONE && !self.spread_max.is_zero() {
  204. self.dir = Decimal::NEGATIVE_ONE
  205. }
  206. // 看多方向
  207. if now_spread > self.spread_max && !self.spread_max.is_zero() {
  208. self.dir = Decimal::ONE
  209. }
  210. // if last_trade_price > first_trade_price {
  211. // self.spread_long_vec.push(self.spread);
  212. // } else {
  213. // self.spread_short_vec.push(self.spread);
  214. // }
  215. while self.spread_vec.len() > 1_000 && self.inventory.is_zero() {
  216. self.spread_vec.remove(0);
  217. }
  218. self.update_spread_max();
  219. // self.update_spread_best();
  220. }
  221. // }
  222. }
  223. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  224. self.last_update_time = depth.time;
  225. self.last_index = Decimal::from(index);
  226. self.update_fair_price(depth, index).await;
  227. if index == 0 {
  228. self.ask_price = depth.asks[0].price;
  229. self.bid_price = depth.bids[0].price;
  230. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  231. self.update_spread();
  232. }
  233. self.depth_vec[index] = depth.clone();
  234. if self.mid_price.is_zero() {
  235. return;
  236. }
  237. self.processor().await;
  238. }
  239. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  240. // self.last_update_time = trade.time;
  241. self.trade_long_vec.push_back(trade.clone());
  242. self.trade_short_vec.push_back(trade.clone());
  243. self.long_trade_len_dec = Decimal::from_usize(self.trade_long_vec.len()).unwrap();
  244. self.short_trade_len_dec = Decimal::from_usize(self.trade_short_vec.len()).unwrap();
  245. self.error_rate = self.short_trade_len_dec / self.long_trade_len_dec;
  246. self.error_rate.rescale(4);
  247. self.last_price = trade.price;
  248. // self.processor().await;
  249. }
  250. pub async fn update_level(&mut self) {
  251. self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
  252. self.level = min(self.level, dec!(6));
  253. }
  254. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  255. pub async fn on_record(&mut self, record: &Record) {
  256. // 添加新蜡烛
  257. if self.record_vec.len() == 0 {
  258. self.record_vec.push_back(record.clone());
  259. } else {
  260. let last_record = self.record_vec.back_mut().unwrap();
  261. if last_record.time == record.time {
  262. *last_record = record.clone();
  263. } else if last_record.time < record.time {
  264. self.record_vec.push_back(record.clone());
  265. }
  266. }
  267. if self.record_vec.len() > 4 {
  268. self.record_vec.pop_front();
  269. }
  270. // 如果蜡烛数量足够,则更新mfi
  271. if self.record_vec.len() >= 4 {
  272. self.update_mfi();
  273. }
  274. }
  275. pub fn update_mfi(&mut self) {
  276. let mut money_flow_in = Decimal::ZERO;
  277. let mut money_flow_out = Decimal::ZERO;
  278. let _3 = dec!(3);
  279. for record in self.record_vec.iter() {
  280. let typical_price = (record.high + record.low + record.close) / _3;
  281. let money_flow = typical_price * record.volume;
  282. if record.close > record.open {
  283. money_flow_in += money_flow;
  284. } else if record.close < record.open {
  285. money_flow_out += money_flow;
  286. }
  287. }
  288. self.money_flow_index = if money_flow_out.is_zero() {
  289. Decimal::ONE_HUNDRED
  290. } else {
  291. let money_flow_ratio = money_flow_in / money_flow_out;
  292. Decimal::ONE_HUNDRED - Decimal::ONE_HUNDRED / (Decimal::ONE + money_flow_ratio)
  293. };
  294. self.update_flow_ratio();
  295. }
  296. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal) {
  297. if self.mid_price.is_zero() {
  298. return;
  299. }
  300. let prev_inventory = self.inventory;
  301. self.pos_amount = pos_amount.clone();
  302. self.pos_avg_price = pos_avg_price.clone();
  303. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  304. // 小于1但不为0的情况,需要平完
  305. if self.inventory.is_zero() && !pos_amount.is_zero() {
  306. self.inventory = if pos_amount > &Decimal::ZERO {
  307. Decimal::ONE
  308. } else {
  309. Decimal::NEGATIVE_ONE
  310. };
  311. }
  312. if prev_inventory != self.inventory && prev_inventory.is_zero() {
  313. self.prev_trade_time = Utc::now().timestamp_micros();
  314. self.close_price = self.fair_price_when_ordering;
  315. }
  316. if prev_inventory != self.inventory && self.inventory.is_zero() {
  317. self.is_regressed = false;
  318. }
  319. self.update_level().await;
  320. self.processor().await;
  321. }
  322. pub fn update_sigma_square(&mut self) {
  323. self.sigma_square = self.fair_price * self.params.open;
  324. self.sigma_square.rescale(10);
  325. }
  326. pub fn update_gamma(&mut self) {
  327. self.gamma = dec!(0.236) * Self::IRA;
  328. }
  329. pub fn update_kappa(&mut self) {
  330. if self.mid_price > Decimal::ZERO {
  331. self.kappa = dec!(888) / self.mid_price;
  332. } else {
  333. self.kappa = dec!(1);
  334. }
  335. }
  336. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  337. if self.mid_price.is_zero() {
  338. return;
  339. }
  340. let a1 = &depth.asks[0];
  341. let b1 = &depth.bids[0];
  342. let mid = (a1.price + b1.price) / Decimal::TWO;
  343. let i_upper = (a1.value - b1.value) / (a1.value + b1.value);
  344. // let s_upper = (a1.price - b1.price) / mid;
  345. // let f_t = Decimal::ZERO;
  346. // let s = (s_upper + f_t) / Decimal::TWO;
  347. //
  348. // let a_upper_t = dec!(0.6);
  349. // let c_t = dec!(0);
  350. // let theta = a_upper_t * s + c_t;
  351. // let fair_price = mid + theta * (i_upper * (i_upper.powd(Decimal::TWO) + Decimal::ONE)) / Decimal::TWO;
  352. let fair_price = mid + (a1.price - b1.price) * i_upper / Decimal::TWO;
  353. self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() {
  354. fair_price
  355. } else {
  356. self.fair_price_vec[index] * dec!(0.9) + fair_price * dec!(0.1)
  357. };
  358. self.fair_price_vec[index].rescale(self.mid_price.scale());
  359. self.volume_vec[index] = a1.size + b1.size;
  360. // 合成公平价格
  361. if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() {
  362. self.price_times_avg = if self.price_times_avg.is_zero() {
  363. self.fair_price_vec[1] / self.fair_price_vec[0]
  364. } else {
  365. self.price_times_avg * dec!(0.9999) + dec!(0.0001) * self.fair_price_vec[1] / self.fair_price_vec[0]
  366. };
  367. // 进行价格归一化处理,公平所的价格有可能是带前缀的
  368. let fair_price_part0 = self.fair_price_vec[0] * dec!(0.2);
  369. let fair_price_part1 = (self.fair_price_vec[1] / self.price_times_avg) * dec!(0.8);
  370. self.fair_price = fair_price_part0 + fair_price_part1;
  371. }
  372. // 判断价格是否回归
  373. if !self.is_regressed && self.inventory > Decimal::ZERO && self.fair_price < self.mid_price * (Decimal::ONE - self.params.open) {
  374. self.is_regressed = true
  375. } else if !self.is_regressed && self.inventory < Decimal::ZERO && self.fair_price > self.mid_price * (Decimal::ONE + self.params.open) {
  376. self.is_regressed = true
  377. }
  378. }
  379. pub fn update_delta(&mut self) {
  380. // -2表示不想成交
  381. // -1表示市价成交
  382. if self.fair_price.is_zero() {
  383. return;
  384. }
  385. let is_open_long = (self.fair_price - self.mid_price) / self.mid_price > self.params.open;
  386. let is_open_short = (self.mid_price - self.fair_price) / self.mid_price > self.params.open;
  387. let is_close_long = self.inventory > Decimal::ZERO;
  388. let is_close_short = self.inventory < Decimal::ZERO;
  389. self.bid_delta = dec!(-2);
  390. self.ask_delta = dec!(-2);
  391. if is_close_long {
  392. if self.mid_price > self.pos_avg_price {
  393. // let profit_rate = (self.mid_price - self.pos_avg_price) / self.pos_avg_price;
  394. // let fill_rate = profit_rate / (self.params.open * dec!(50));
  395. // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
  396. //
  397. // self.ask_delta = self.mid_price * close_rate * self.t_diff;
  398. self.ask_delta = self.mid_price * self.params.close;
  399. } else if self.mid_price > self.fair_price {
  400. self.ask_delta = self.mid_price * self.params.close;
  401. }
  402. } else if is_close_short {
  403. if self.mid_price < self.pos_avg_price {
  404. // let profit_rate = (self.pos_avg_price - self.mid_price) / self.pos_avg_price;
  405. // let fill_rate = profit_rate / (self.params.open * dec!(50));
  406. // let close_rate = (Decimal::ONE - fill_rate) * self.params.open + self.params.close;
  407. //
  408. // self.bid_delta = self.mid_price * close_rate * self.t_diff;
  409. self.bid_delta = self.mid_price * self.params.close;
  410. } else if self.mid_price < self.fair_price {
  411. self.bid_delta = self.mid_price * self.params.close;
  412. }
  413. } else if is_open_long {
  414. let is_open_long_market = (self.fair_price - self.mid_price) / self.mid_price > self.params.open_market;
  415. self.bid_delta = if is_open_long_market {
  416. dec!(-1)
  417. } else {
  418. dec!(0)
  419. };
  420. } else if is_open_short {
  421. let is_open_short_market = (self.mid_price - self.fair_price) / self.mid_price > self.params.open_market;
  422. self.ask_delta = if is_open_short_market {
  423. dec!(-1)
  424. } else {
  425. dec!(0)
  426. }
  427. }
  428. }
  429. pub fn update_optimal_ask_and_bid(&mut self) {
  430. self.optimal_ask_price = if self.ask_delta == dec!(-1) {
  431. self.bid_price
  432. } else if self.ask_delta == dec!(-2) {
  433. dec!(14142135623730951)
  434. } else {
  435. max(self.ask_price + self.ask_delta, self.bid_price)
  436. };
  437. self.optimal_bid_price = if self.bid_delta == dec!(-1) {
  438. self.ask_price
  439. } else if self.bid_delta == dec!(-2) {
  440. dec!(14142135623730951)
  441. } else {
  442. min(self.bid_price - self.bid_delta, self.ask_price)
  443. };
  444. self.optimal_ask_price.rescale(self.mid_price.scale());
  445. self.optimal_bid_price.rescale(self.mid_price.scale());
  446. }
  447. pub fn update_t_diff(&mut self) {
  448. if self.prev_trade_time > 0 {
  449. let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
  450. self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO);
  451. } else {
  452. self.t_diff = Decimal::ONE;
  453. }
  454. }
  455. // fn calc_flow_ratio(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
  456. // let mut flow_in_value = Decimal::ZERO;
  457. // let mut flow_out_value = Decimal::ZERO;
  458. // for trade_iter in trades.deque.iter() {
  459. // if trade_iter.size > Decimal::ZERO {
  460. // flow_in_value += trade_iter.value;
  461. // } else {
  462. // flow_out_value += trade_iter.value;
  463. // }
  464. // }
  465. //
  466. // // 使用EMA來更新資金流,確保平滑性
  467. // if flow_out_value + flow_in_value > *min_volume {
  468. // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
  469. // now
  470. // } else {
  471. // Decimal::ZERO
  472. // }
  473. // }
  474. pub fn update_flow_ratio(&mut self) {
  475. self.flow_ratio = if self.money_flow_index > dec!(80) {
  476. (self.money_flow_index - dec!(80)) / dec!(20)
  477. } else if self.money_flow_index < dec!(20) {
  478. Decimal::NEGATIVE_ONE * (dec!(20) - self.money_flow_index) / dec!(20)
  479. } else {
  480. Decimal::ZERO
  481. };
  482. }
  483. pub fn check_ready(&mut self) {
  484. if self.is_ready {
  485. return;
  486. }
  487. if self.mid_price == Decimal::ZERO {
  488. return;
  489. }
  490. if self.fair_price == Decimal::ZERO {
  491. return;
  492. }
  493. if self.ask_price == Decimal::ZERO {
  494. return;
  495. }
  496. if self.bid_price == Decimal::ZERO {
  497. return;
  498. }
  499. if self.optimal_ask_price < self.ask_price {
  500. return;
  501. }
  502. if self.optimal_bid_price > self.bid_price {
  503. return;
  504. }
  505. if self.trade_long_vec.len() < 100 {
  506. return;
  507. }
  508. self.is_ready = true;
  509. info!("========================================行情数据预热完毕==================================")
  510. }
  511. // #[instrument(skip(self), level="TRACE")]
  512. async fn processor(&mut self) {
  513. self.update_t_diff();
  514. self.update_sigma_square();
  515. self.update_gamma();
  516. self.update_kappa();
  517. self.update_delta();
  518. self.update_optimal_ask_and_bid();
  519. self.check_ready();
  520. if !self.is_ready {
  521. return;
  522. }
  523. // let mut smm = Decimal::ZERO;
  524. // if !self.depth_vec[1].time.is_zero() {
  525. // let sma = self.depth_vec[1].asks[0].price;
  526. // let smb = self.depth_vec[1].bids[0].price;
  527. // smm = (sma + smb) / Decimal::TWO;
  528. // }
  529. // let cci_arc = self.cci_arc.clone();
  530. let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
  531. let mid_price = self.mid_price;
  532. let ask_price = self.ask_price;
  533. let bid_price = self.bid_price;
  534. let last_price = self.last_price;
  535. let spread = self.mid_price;
  536. let spread_max = self.optimal_ask_price;
  537. let spread_min = self.optimal_bid_price;
  538. // let spread = self.price_times_avg;
  539. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  540. // let spread_min = self.fair_price / self.mid_price;
  541. let optimal_ask_price = self.optimal_ask_price;
  542. let optimal_bid_price = self.optimal_bid_price;
  543. let inventory = self.inventory;
  544. let sigma_square = if self.is_regressed { Decimal::ONE } else { Decimal::ZERO };
  545. let gamma = now - self.last_update_time;
  546. let kappa = self.fair_price / self.mid_price;
  547. let flow_ratio = Decimal::ZERO;
  548. let ref_price = self.fair_price;
  549. let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
  550. if !need_append {
  551. return;
  552. }
  553. self.debug_sender.unbounded_send(vec![
  554. now,
  555. mid_price,
  556. ask_price,
  557. bid_price,
  558. last_price,
  559. spread,
  560. spread_max,
  561. spread_min,
  562. optimal_ask_price,
  563. optimal_bid_price,
  564. inventory,
  565. sigma_square,
  566. gamma,
  567. kappa,
  568. flow_ratio,
  569. ref_price
  570. ]).unwrap();
  571. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  572. }
  573. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  574. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  575. vec![]
  576. }
  577. }