predictor.rs 27 KB


  1. use std::cmp::{max, min};
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  16. #[derive(Debug, Clone)]
  17. pub struct Predictor {
  18. pub depth_vec: Vec<Depth>, // 深度队列
  19. pub record_vec: VecDeque<Record>, // 蜡烛队列
  20. // 做市所的计算
  21. pub close_price_vec: FixedTimeRangeDeque<Record>,
  22. pub r_short: Decimal,
  23. pub r_long: Decimal,
  24. pub speed: Decimal,
  25. pub trend: Decimal,
  26. pub prices: Vec<Vec<FixedTimeRangeDeque<Decimal>>>, // [[[做市所], [参考所0]], ...]
  27. pub ks: Vec<Decimal>,
  28. pub bs: Vec<Decimal>,
  29. pub mid_price: Decimal, // 中间价
  30. pub ask_price: Decimal, // 中间价
  31. pub bid_price: Decimal, // 中间价
  32. pub fair_price: Decimal,
  33. pub last_price: Decimal, // 最后成交价
  34. pub optimal_ask_price: Decimal, // 卖出挂单价
  35. pub optimal_bid_price: Decimal, // 买入挂单价
  36. pub inventory: Decimal, // 库存,也就是q
  37. pub pos_amount: Decimal, // 原始持仓量
  38. pub pos_avg_price: Decimal, // 原始持仓价格
  39. pub balance: Decimal, // 初始余额
  40. pub prev_balance: Decimal,
  41. pub profit: Decimal,
  42. pub profit_high: Decimal,
  43. pub prev_open_time: Decimal,
  44. pub trade_condition: Decimal, // 交易信号
  45. pub trade_condition_time: Decimal, // 满足时的瞬时时间,用于控制开仓行为的持续时间
  46. pub ask_delta: Decimal, // δa
  47. pub bid_delta: Decimal, // δb
  48. pub mid_price_vec: Vec<Decimal>, // 每个交易所的中间价
  49. pub fair_price_std_vec: Vec<Decimal>, // 公平价格列表,标准化之后的
  50. pub price_avg_times_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  51. pub price_avg_times_long_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  52. pub is_ready: bool, // 是否已准备好
  53. pub last_update_time: Decimal, // 最后更新时间(depth)
  54. pub last_index: Decimal, // 最后更新的index
  55. pub prev_insert_time: Decimal,
  56. pub prev_save_time: Decimal,
  57. pub init_time: Decimal,
  58. pub fitting_delay: Decimal,
  59. pub prev_fitting_time_vec: Vec<Decimal>,
  60. pub params: Params,
  61. pub debug_sender: UnboundedSender<Vec<Decimal>>
  62. }
  63. impl Predictor {
  64. // 时间窗口大小(微秒)
  65. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  66. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  67. // const TRADE_LONG_RANGE_MICROS: i64 = 10 * 60_000_000;
  68. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  69. // const TRADE_SHORT_RANGE_MICROS: i64 = 10_000_000;
  70. // const ONE_MILLION: Decimal = dec!(1_000_000);
  71. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  72. const DONT_VIEW: Decimal = dec!(14142135623730951);
  73. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  74. // 创建数据通道
  75. // 创建一个无界通道
  76. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  77. let account_port = params.port.clone();
  78. tokio::spawn(async move {
  79. let len = 17usize;
  80. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  81. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  82. while let Some(value) = rx.next().await {
  83. // 数据填充到对应位置
  84. // // 第一步:获取插入位置, 这里有bug,持仓推送并不连续,有时候会导致图表显示错误……
  85. // let target_ts = value[0]; // 时间戳在values[0]
  86. // let insert_pos = debugs[0]
  87. // .binary_search_by(|ts| {
  88. // ts.as_ref() // 解包Option
  89. // .expect("Timestamp cannot be None")
  90. // .cmp(&target_ts)
  91. // })
  92. // .unwrap_or_else(|e| e);
  93. // 第二步:执行插入操作
  94. for i in 0..debugs.len() {
  95. let value = value.get(i).cloned().unwrap_or(Self::DONT_VIEW);
  96. // 其他队列按规则插入
  97. let elem = if value == Self::DONT_VIEW {
  98. None
  99. } else {
  100. Some(value)
  101. };
  102. debugs[i].push_back(elem)
  103. }
  104. // 长度限制
  105. if debugs[0].len() > 500_000 {
  106. for i in 0..len {
  107. debugs[i].pop_front(); // 从前面移除元素
  108. }
  109. }
  110. let now = Decimal::from(Utc::now().timestamp_millis());
  111. if now - prev_save_time < dec!(30000) {
  112. continue;
  113. }
  114. let debugs_clone = debugs.clone();
  115. let temp_html_str = tokio::task::spawn_blocking(move || {
  116. utils::build_html_file(&debugs_clone)
  117. }).await.unwrap();
  118. let path = format!("./db/{}.html", account_port);
  119. utils::write_to_file(&temp_html_str, path).await;
  120. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  121. }
  122. });
  123. let predictor = Self {
  124. // 接针版本
  125. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  126. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  127. mid_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  128. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  129. price_avg_times_long_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  130. prices: vec![vec![FixedTimeRangeDeque::new(600_000_000); 2]; params.ref_exchange.len()],
  131. ks: vec![Decimal::ZERO; params.ref_exchange.len()],
  132. bs: vec![Decimal::ZERO; params.ref_exchange.len()],
  133. prev_fitting_time_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  134. close_price_vec: FixedTimeRangeDeque::new(600_000_000),
  135. r_short: Default::default(),
  136. r_long: Default::default(),
  137. speed: Default::default(),
  138. trend: Default::default(),
  139. mid_price: Default::default(),
  140. ask_price: Default::default(),
  141. bid_price: Default::default(),
  142. fair_price: Default::default(),
  143. last_price: Default::default(),
  144. optimal_ask_price: Self::DONT_VIEW,
  145. optimal_bid_price: Self::DONT_VIEW,
  146. ask_delta: dec!(-2),
  147. bid_delta: dec!(-2),
  148. is_ready: false,
  149. inventory: Default::default(),
  150. pos_avg_price: Default::default(),
  151. pos_amount: Default::default(),
  152. balance: Default::default(),
  153. prev_balance: Default::default(),
  154. profit: Default::default(),
  155. profit_high: Default::default(),
  156. trade_condition: Default::default(),
  157. trade_condition_time: Default::default(),
  158. last_update_time: Default::default(),
  159. last_index: Default::default(),
  160. prev_insert_time: Default::default(),
  161. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  162. init_time: Decimal::from(Utc::now().timestamp_millis()),
  163. fitting_delay: Default::default(),
  164. params,
  165. debug_sender: tx,
  166. prev_open_time: Default::default(),
  167. record_vec: Default::default(),
  168. };
  169. predictor
  170. }
  171. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  172. self.last_update_time = depth.time;
  173. self.last_index = Decimal::from(index);
  174. if index == 233 {
  175. self.ask_price = depth.asks[0].price;
  176. self.bid_price = depth.bids[0].price;
  177. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  178. // 计算利润(预估)
  179. if !self.inventory.is_zero() {
  180. self.profit = if self.inventory > Decimal::ZERO {
  181. (self.mid_price - self.pos_avg_price) / self.pos_avg_price
  182. } else {
  183. (self.pos_avg_price - self.mid_price) / self.pos_avg_price
  184. };
  185. self.profit.rescale(6);
  186. if self.profit_high < self.profit {
  187. self.profit_high = self.profit
  188. }
  189. }
  190. // 秒级k线处理,先只用处理收盘价就行
  191. let r = Record {
  192. time: Decimal::from(Utc::now().timestamp_millis()),
  193. open: self.mid_price,
  194. high: self.mid_price,
  195. low: self.mid_price,
  196. close: self.mid_price,
  197. volume: Default::default(),
  198. symbol: "".to_string(),
  199. };
  200. let is_need_push = self.close_price_vec.len() == 0
  201. || self.close_price_vec.deque.iter().last().unwrap().time - r.time > Decimal::ONE_THOUSAND
  202. ;
  203. if is_need_push {
  204. self.close_price_vec.push_back(r);
  205. let len = self.close_price_vec.len();
  206. // 求最后10秒的均值
  207. let mean_10s;
  208. if len >= 10 {
  209. let mut i = len - 1;
  210. let mut sum: Decimal = Decimal::ZERO;
  211. loop {
  212. if i == len - 11 {
  213. break
  214. }
  215. sum += self.close_price_vec.get(i).unwrap().close;
  216. i = i - 1;
  217. }
  218. mean_10s = sum / Decimal::from(10);
  219. self.r_short = (self.mid_price - mean_10s) / mean_10s;
  220. self.r_short.rescale(8);
  221. } else {
  222. self.r_short = self.mid_price;
  223. mean_10s = self.mid_price;
  224. }
  225. // 求最后300秒的均值,如果秒级k不到300秒,就用5分钟k的收盘价凑合用用
  226. let mean_300s;
  227. if len >= 300 {
  228. let mut i = len - 1;
  229. let mut sum: Decimal = Decimal::ZERO;
  230. loop {
  231. if i == len - 301 {
  232. break
  233. }
  234. sum += self.close_price_vec.get(i).unwrap().close;
  235. i = i - 1;
  236. }
  237. mean_300s = sum / Decimal::from(300);
  238. self.r_long = (self.mid_price - mean_300s) / mean_300s;
  239. self.r_long.rescale(8);
  240. } else if self.record_vec.len() == 5 && !self.mid_price.is_zero() {
  241. let mut i = self.record_vec.len() - 1;
  242. let mut sum: Decimal = Decimal::ZERO;
  243. loop {
  244. if i == 0 {
  245. break
  246. }
  247. sum += self.record_vec[i].close;
  248. i = i - 1;
  249. }
  250. mean_300s = sum / Decimal::from(5);
  251. self.r_long = (self.mid_price - mean_300s) / mean_300s;
  252. self.r_long.rescale(8);
  253. } else {
  254. self.r_long = self.mid_price;
  255. mean_300s = self.mid_price;
  256. }
  257. self.speed = if self.r_short > dec!(-0.0001) || self.r_long > dec!(-0.0001) {
  258. Decimal::ONE
  259. } else {
  260. self.r_short / self.r_long
  261. };
  262. self.trend = mean_10s / mean_300s;
  263. }
  264. // 拟合k与b
  265. for (mid_index, mp) in self.mid_price_vec.iter().enumerate() {
  266. if mp.is_zero() {
  267. continue
  268. }
  269. self.prices[mid_index][0].push_back(self.mid_price);
  270. self.prices[mid_index][1].push_back(mp.clone());
  271. // 拟合,60s拟合一次
  272. let before_fitting = Utc::now().timestamp_millis();
  273. if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) || self.prices[mid_index][0].len() < 1000 {
  274. // if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) {
  275. // info!("{}, {},", mid_index, self.prices[mid_index][0].len());
  276. // }
  277. if let Some((k, b)) = self.linear_least_squares(mid_index).await {
  278. self.ks[mid_index] = k;
  279. self.bs[mid_index] = b;
  280. self.fitting_delay = Decimal::from(Utc::now().timestamp_millis() - before_fitting);
  281. self.prev_fitting_time_vec[mid_index] = Decimal::from(before_fitting)
  282. } else {
  283. return;
  284. }
  285. }
  286. }
  287. } else {
  288. self.depth_vec[index] = depth.clone();
  289. let latest_price = (depth.asks[0].price + depth.bids[0].price) / Decimal::TWO;
  290. self.update_fair_price(&latest_price, index).await;
  291. }
  292. if self.mid_price.is_zero() {
  293. return;
  294. }
  295. self.processor(depth.time, false).await;
  296. }
  297. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  298. // index == 233代表做市所
  299. // index == 0,1,2,3...代表参考所
  300. self.last_price = trade.price;
  301. }
  302. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  303. pub async fn on_record(&mut self, record: &Record) {
  304. // 添加新蜡烛
  305. if self.record_vec.len() == 0 {
  306. self.record_vec.push_back(record.clone());
  307. } else {
  308. let last_record = self.record_vec.back_mut().unwrap();
  309. if last_record.time == record.time {
  310. *last_record = record.clone();
  311. } else if last_record.time < record.time {
  312. self.record_vec.push_back(record.clone());
  313. }
  314. }
  315. if self.record_vec.len() > 5 {
  316. self.record_vec.pop_front();
  317. }
  318. }
  319. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  320. if self.mid_price.is_zero() {
  321. return;
  322. }
  323. let prev_pos_amount = self.pos_amount;
  324. self.pos_amount = pos_amount.clone();
  325. self.pos_avg_price = pos_avg_price.clone();
  326. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  327. // 小于1但不为0的情况,需要平完
  328. if self.inventory.is_zero() && !pos_amount.is_zero() {
  329. self.inventory = if pos_amount > &Decimal::ZERO {
  330. Decimal::ONE
  331. } else {
  332. Decimal::NEGATIVE_ONE
  333. };
  334. }
  335. if prev_pos_amount != self.pos_amount {
  336. // 重置连续信号
  337. self.trade_condition = Decimal::ZERO;
  338. self.trade_condition_time = Decimal::ZERO;
  339. // 开仓
  340. if prev_pos_amount.is_zero() {
  341. self.prev_open_time = Decimal::from(Utc::now().timestamp_millis())
  342. }
  343. // 平仓
  344. if self.pos_amount.is_zero() {
  345. self.profit = Decimal::ZERO;
  346. self.profit_high = Decimal::ZERO;
  347. }
  348. self.processor(update_time, true).await;
  349. }
  350. }
  351. pub async fn on_balance(&mut self, balance: Decimal) {
  352. self.balance = balance;
  353. }
  354. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  355. let last_fair_price = price_vec.deque.iter().last().unwrap();
  356. let min_price = price_vec.deque.iter().min().unwrap();
  357. let max_price = price_vec.deque.iter().max().unwrap();
  358. let up_rate = (last_fair_price - min_price) / min_price;
  359. let down_rate = (max_price - last_fair_price) / max_price;
  360. if up_rate > down_rate {
  361. up_rate
  362. } else {
  363. -down_rate
  364. }
  365. }
  366. pub async fn update_fair_price(&mut self, latest_price: &Decimal, index: usize) {
  367. if self.mid_price.is_zero() {
  368. return;
  369. }
  370. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  371. // let total = a1.value + b1.value;
  372. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  373. // self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  374. let mut mp = latest_price.clone();
  375. mp.rescale(self.mid_price.scale());
  376. self.mid_price_vec[index] = mp;
  377. // 生成fp
  378. self.fair_price_std_vec[index] = mp * self.ks[index] + self.bs[index];
  379. self.fair_price_std_vec[index].rescale(self.mid_price.scale());
  380. // 生成最终用于挂单的公平价格
  381. let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
  382. let fair_price_count = self.fair_price_std_vec.iter()
  383. .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
  384. .count();
  385. if fair_price_count != 0 {
  386. self.fair_price = if self.fair_price.is_zero() {
  387. fair_price_sum / Decimal::from(fair_price_count)
  388. } else {
  389. dec!(0.9) * self.fair_price + dec!(0.1) * fair_price_sum / Decimal::from(fair_price_count)
  390. };
  391. // let mut spread_abs = ((self.fair_price - self.mid_price) / self.mid_price).abs();
  392. // spread_abs.rescale(5);
  393. //
  394. // self.spread_vec.push_back(spread_abs);
  395. // if self.spread_vec.len() > 3000 {
  396. // self.spread_vec.pop_front();
  397. // }
  398. //
  399. // let opt_abs_value = self.spread_vec.iter().max().unwrap().clone();
  400. //
  401. // self.params.open = max(max(self.params.min_open, dec!(0.0006)), opt_abs_value);
  402. }
  403. }
  404. pub async fn update_delta(&mut self) -> bool {
  405. if self.mid_price.is_zero() {
  406. return false;
  407. }
  408. let prev_bid_delta = self.bid_delta;
  409. let prev_ask_delta = self.ask_delta;
  410. let now = Decimal::from(Utc::now().timestamp_millis());
  411. let is_close_long = self.inventory > Decimal::ZERO && (
  412. // 硬止损
  413. (self.profit < dec!(-0.01))
  414. // 利润较大时,追踪止盈
  415. || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75))
  416. );
  417. let is_close_short = self.inventory < Decimal::ZERO && (
  418. // 硬止损
  419. (self.profit < dec!(-0.01))
  420. // 利润较大时,追踪止盈
  421. || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75))
  422. );
  423. let is_open_long = self.inventory.is_zero()
  424. && self.fair_price > self.mid_price * (Decimal::ONE + self.params.open)
  425. && self.r_short < dec!(-0.001)
  426. && self.trend < dec!(0.999)
  427. && self.speed < dec!(0.15)
  428. ;
  429. let is_open_short = self.inventory.is_zero()
  430. && false
  431. ;
  432. // 使信号有一定持续性
  433. if is_close_long {
  434. self.trade_condition = dec!(1);
  435. }
  436. if is_close_short {
  437. self.trade_condition = dec!(2);
  438. }
  439. if is_open_long {
  440. self.trade_condition = dec!(3);
  441. self.trade_condition_time = now;
  442. }
  443. if is_open_short {
  444. self.trade_condition = dec!(4);
  445. self.trade_condition_time = now;
  446. }
  447. // 开仓信号要过期,只保留2秒
  448. if (self.trade_condition == dec!(3) || self.trade_condition == dec!(4))
  449. && now - self.trade_condition_time > dec!(2_000) {
  450. self.trade_condition = Decimal::ZERO;
  451. }
  452. // 开单信号处理
  453. self.bid_delta = dec!(-2);
  454. self.ask_delta = dec!(-2);
  455. self.optimal_ask_price = Self::DONT_VIEW;
  456. self.optimal_bid_price = Self::DONT_VIEW;
  457. if self.trade_condition == dec!(1) && self.inventory > Decimal::ZERO {
  458. self.ask_delta = dec!(0);
  459. self.bid_delta = dec!(-2);
  460. self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995);
  461. self.optimal_bid_price = Self::DONT_VIEW;
  462. } else if self.trade_condition == dec!(2) && self.inventory < Decimal::ZERO {
  463. self.bid_delta = dec!(0);
  464. self.ask_delta = dec!(-2);
  465. self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005);
  466. self.optimal_ask_price = Self::DONT_VIEW;
  467. } else if self.trade_condition == dec!(3) {
  468. self.bid_delta = dec!(0);
  469. self.ask_delta = dec!(-2);
  470. self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005);
  471. self.optimal_ask_price = Self::DONT_VIEW;
  472. } else if self.trade_condition == dec!(4) {
  473. self.ask_delta = dec!(0);
  474. self.bid_delta = dec!(-2);
  475. self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995);
  476. self.optimal_bid_price = Self::DONT_VIEW;
  477. }
  478. // 价格处理
  479. self.optimal_ask_price.rescale(self.mid_price.scale());
  480. self.optimal_bid_price.rescale(self.mid_price.scale());
  481. // 返回方向是否改变过,有改变可以立即在图表上显示
  482. prev_ask_delta != self.ask_delta || prev_bid_delta != self.bid_delta
  483. }
  484. pub fn check_ready(&mut self) {
  485. if self.is_ready {
  486. return;
  487. }
  488. if self.mid_price.is_zero() {
  489. return;
  490. }
  491. for fair_price in &self.fair_price_std_vec {
  492. if fair_price.is_zero() {
  493. return;
  494. }
  495. }
  496. if self.optimal_ask_price.is_zero() {
  497. return;
  498. }
  499. if self.optimal_bid_price.is_zero() {
  500. return;
  501. }
  502. if self.optimal_bid_price.is_zero() {
  503. return;
  504. }
  505. if self.balance.is_zero() {
  506. return;
  507. }
  508. self.is_ready = true;
  509. info!("========================================行情数据预热完毕==================================")
  510. }
  511. // 最小二乘法拟合函数,支持VecDeque
  512. pub async fn linear_least_squares(&self, index: usize) -> Option<(Decimal, Decimal)> {
  513. let x = &self.prices[index][1];
  514. let y = &self.prices[index][0];
  515. // 检查数组长度是否相同
  516. if x.len() != y.len() {
  517. return None;
  518. }
  519. let n = x.len();
  520. if n == 0 {
  521. return None;
  522. }
  523. let mut sum_x = Decimal::zero();
  524. let mut sum_y = Decimal::zero();
  525. let mut sum_xx = Decimal::zero();
  526. let mut sum_xy = Decimal::zero();
  527. // 遍历VecDeque中的元素
  528. for (xi, yi) in x.deque.iter().zip(y.deque.iter()) {
  529. sum_x += xi;
  530. sum_y += yi;
  531. let xi_sq = xi * xi;
  532. sum_xx += xi_sq;
  533. sum_xy += xi * yi;
  534. }
  535. // 计算分子和分母
  536. let numerator = sum_xy - (sum_x * sum_y) / Decimal::from(n);
  537. let denominator = sum_xx - (sum_x * sum_x) / Decimal::from(n);
  538. // 如果分母为0,返回None
  539. if denominator == Decimal::zero() {
  540. return None;
  541. }
  542. let k = numerator / denominator;
  543. let mean_x = sum_x / Decimal::from(n);
  544. let mean_y = sum_y / Decimal::from(n);
  545. let b = mean_y - k * mean_x;
  546. Some((k, b))
  547. }
  548. // #[instrument(skip(self), level="TRACE")]
  549. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  550. self.check_ready();
  551. if !self.is_ready {
  552. return;
  553. }
  554. let is_delta_changed = self.update_delta().await;
  555. // let cci_arc = self.cci_arc.clone();
  556. let now = data_time;
  557. let mid_price = self.mid_price;
  558. let ask_price = if self.params.ref_exchange.len() > 0 {
  559. // self.fair_price_std_vec[0]
  560. Self::DONT_VIEW
  561. } else {
  562. Self::DONT_VIEW
  563. };
  564. let bid_price = if self.params.ref_exchange.len() > 1 {
  565. // self.fair_price_std_vec[1]
  566. Self::DONT_VIEW
  567. } else {
  568. Self::DONT_VIEW
  569. };
  570. let optimal_ask_price = self.optimal_ask_price;
  571. let optimal_bid_price = self.optimal_bid_price;
  572. let last_price = Self::DONT_VIEW;
  573. let fair_price = self.fair_price;
  574. let spread = Self::DONT_VIEW;
  575. let spread_max = self.r_short;
  576. let spread_min = self.r_long;
  577. // let spread = self.price_times_avg;
  578. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  579. // let spread_min = self.fair_price / self.mid_price;
  580. let inventory = self.inventory;
  581. // let sigma_square = Decimal::from(Utc::now().timestamp_millis()) - data_time;
  582. let sigma_square = self.speed;
  583. let gamma = self.trend;
  584. let kappa = self.balance;
  585. let flow_ratio = Decimal::ZERO;
  586. let is_time_over_update = now - self.prev_insert_time > dec!(500);
  587. if !is_time_over_update && !is_hard_update && !is_delta_changed {
  588. return;
  589. }
  590. if is_time_over_update {
  591. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  592. }
  593. let pos_avg_price = self.pos_avg_price;
  594. self.debug_sender.unbounded_send(vec![
  595. now,
  596. mid_price,
  597. ask_price,
  598. bid_price,
  599. last_price,
  600. spread,
  601. spread_max,
  602. spread_min,
  603. optimal_ask_price,
  604. optimal_bid_price,
  605. inventory,
  606. sigma_square,
  607. gamma,
  608. kappa,
  609. flow_ratio,
  610. fair_price,
  611. pos_avg_price
  612. ]).unwrap();
  613. }
  614. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  615. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  616. vec![]
  617. }
  618. }