predictor.rs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. use std::cmp::{max, min};
  2. use std::collections::{BTreeMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use tokio::sync::{Mutex};
  10. use tracing::{info};
  11. use global::cci::CentralControlInfo;
  12. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  13. use global::params::Params;
  14. use standard::{Depth, Record, Ticker, Trade};
  15. use crate::utils;
  16. #[derive(Debug, Clone)]
  17. pub struct Predictor {
  18. pub depth_vec: Vec<Depth>, // 深度队列
  19. pub record_vec: VecDeque<Record>, // 蜡烛队列
  20. pub spread_vec: VecDeque<Decimal>, // 价差队列
  21. pub mid_price_long_vec: FixedTimeRangeDeque<Decimal>,
  22. pub mid_price_short_vec: FixedTimeRangeDeque<Decimal>,
  23. pub fair_price_short_vec: FixedTimeRangeDeque<Decimal>,
  24. pub trend_long_rate: Decimal,
  25. pub trend_short_rate: Decimal,
  26. pub fair_rate: Decimal,
  27. pub prices: Vec<Vec<FixedTimeRangeDeque<Decimal>>>, // [[[做市所], [参考所0]], ...]
  28. pub ks: Vec<Decimal>,
  29. pub bs: Vec<Decimal>,
  30. pub mid_price: Decimal, // 中间价
  31. pub ask_price: Decimal, // 中间价
  32. pub bid_price: Decimal, // 中间价
  33. pub fair_price: Decimal,
  34. pub last_price: Decimal, // 最后成交价
  35. pub optimal_ask_price: Decimal, // 卖出挂单价
  36. pub optimal_bid_price: Decimal, // 买入挂单价
  37. pub inventory: Decimal, // 库存,也就是q
  38. pub pos_amount: Decimal, // 原始持仓量
  39. pub pos_avg_price: Decimal, // 原始持仓价格
  40. pub balance: Decimal, // 初始余额
  41. pub prev_balance: Decimal,
  42. pub profit: Decimal,
  43. pub profit_high: Decimal,
  44. pub prev_open_time: Decimal,
  45. pub trade_condition: Decimal, // 过去1分钟以内,差价是否满足过扩大行为+行情有针
  46. pub trade_condition_time: Decimal, // 满足时的瞬时时间
  47. pub ask_delta: Decimal, // δa
  48. pub bid_delta: Decimal, // δb
  49. pub mid_price_vec: Vec<Decimal>, //
  50. pub fair_price_std_vec: Vec<Decimal>, // 公平价格列表,标准化之后的
  51. pub price_avg_times_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  52. pub price_avg_times_long_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  53. pub is_ready: bool, // 是否已准备好
  54. pub last_update_time: Decimal, // 最后更新时间(depth)
  55. pub last_index: Decimal, // 最后更新的index
  56. pub prev_insert_time: Decimal,
  57. pub prev_save_time: Decimal,
  58. pub init_time: Decimal,
  59. pub fitting_delay: Decimal,
  60. pub prev_fitting_time_vec: Vec<Decimal>,
  61. pub params: Params,
  62. pub debug_sender: UnboundedSender<Vec<Decimal>>
  63. }
  64. impl Predictor {
  65. // 时间窗口大小(微秒)
  66. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  67. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  68. // const TRADE_LONG_RANGE_MICROS: i64 = 10 * 60_000_000;
  69. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  70. // const TRADE_SHORT_RANGE_MICROS: i64 = 10_000_000;
  71. // const ONE_MILLION: Decimal = dec!(1_000_000);
  72. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  73. const DONT_VIEW: Decimal = dec!(14142135623730951);
  74. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  75. // 创建数据通道
  76. // 创建一个无界通道
  77. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  78. let account_port = params.port.clone();
  79. tokio::spawn(async move {
  80. let len = 17usize;
  81. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  82. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  83. while let Some(value) = rx.next().await {
  84. // 数据填充到对应位置
  85. // // 第一步:获取插入位置, 这里有bug,持仓推送并不连续,有时候会导致图表显示错误……
  86. // let target_ts = value[0]; // 时间戳在values[0]
  87. // let insert_pos = debugs[0]
  88. // .binary_search_by(|ts| {
  89. // ts.as_ref() // 解包Option
  90. // .expect("Timestamp cannot be None")
  91. // .cmp(&target_ts)
  92. // })
  93. // .unwrap_or_else(|e| e);
  94. // 第二步:执行插入操作
  95. for i in 0..debugs.len() {
  96. let value = value.get(i).cloned().unwrap_or(Self::DONT_VIEW);
  97. // 其他队列按规则插入
  98. let elem = if value == Self::DONT_VIEW {
  99. None
  100. } else {
  101. Some(value)
  102. };
  103. debugs[i].push_back(elem)
  104. }
  105. // 长度限制
  106. if debugs[0].len() > 500_000 {
  107. for i in 0..len {
  108. debugs[i].pop_front(); // 从前面移除元素
  109. }
  110. }
  111. let now = Decimal::from(Utc::now().timestamp_millis());
  112. if now - prev_save_time < dec!(30000) {
  113. continue;
  114. }
  115. let debugs_clone = debugs.clone();
  116. let temp_html_str = tokio::task::spawn_blocking(move || {
  117. utils::build_html_file(&debugs_clone)
  118. }).await.unwrap();
  119. let path = format!("./db/{}.html", account_port);
  120. utils::write_to_file(&temp_html_str, path).await;
  121. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  122. }
  123. });
  124. let predictor = Self {
  125. // 接针版本
  126. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  127. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  128. mid_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  129. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  130. price_avg_times_long_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  131. prices: vec![vec![FixedTimeRangeDeque::new(600_000_000); 2]; params.ref_exchange.len()],
  132. ks: vec![Decimal::ZERO; params.ref_exchange.len()],
  133. bs: vec![Decimal::ZERO; params.ref_exchange.len()],
  134. prev_fitting_time_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  135. record_vec: VecDeque::new(),
  136. mid_price_long_vec: FixedTimeRangeDeque::new(10_000_000),
  137. mid_price_short_vec: FixedTimeRangeDeque::new(2_000_000),
  138. fair_price_short_vec: FixedTimeRangeDeque::new(2_000_000),
  139. spread_vec: VecDeque::new(),
  140. mid_price: Default::default(),
  141. ask_price: Default::default(),
  142. bid_price: Default::default(),
  143. fair_price: Default::default(),
  144. last_price: Default::default(),
  145. optimal_ask_price: Self::DONT_VIEW,
  146. optimal_bid_price: Self::DONT_VIEW,
  147. ask_delta: Default::default(),
  148. bid_delta: Default::default(),
  149. is_ready: false,
  150. inventory: Default::default(),
  151. pos_avg_price: Default::default(),
  152. pos_amount: Default::default(),
  153. balance: Default::default(),
  154. prev_balance: Default::default(),
  155. profit: Default::default(),
  156. profit_high: Default::default(),
  157. trade_condition: Default::default(),
  158. trade_condition_time: Default::default(),
  159. last_update_time: Default::default(),
  160. last_index: Default::default(),
  161. prev_insert_time: Default::default(),
  162. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  163. init_time: Decimal::from(Utc::now().timestamp_millis()),
  164. fitting_delay: Default::default(),
  165. params,
  166. debug_sender: tx,
  167. prev_open_time: Default::default(),
  168. trend_long_rate: Default::default(),
  169. trend_short_rate: Default::default(),
  170. fair_rate: Default::default(),
  171. };
  172. predictor
  173. }
  174. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  175. self.last_update_time = depth.time;
  176. self.last_index = Decimal::from(index);
  177. if index == 233 {
  178. self.ask_price = depth.asks[0].price;
  179. self.bid_price = depth.bids[0].price;
  180. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  181. // 计算利润(预估)
  182. if !self.inventory.is_zero() {
  183. self.profit = if self.inventory > Decimal::ZERO {
  184. (self.mid_price - self.pos_avg_price) / self.pos_avg_price
  185. } else {
  186. (self.pos_avg_price - self.mid_price) / self.pos_avg_price
  187. };
  188. self.profit.rescale(6);
  189. if self.profit_high < self.profit {
  190. self.profit_high = self.profit
  191. }
  192. }
  193. self.mid_price_long_vec.push_back(self.mid_price);
  194. self.mid_price_short_vec.push_back(self.mid_price);
  195. self.trend_long_rate = if self.mid_price_long_vec.len() > 2 {
  196. let first = self.mid_price_long_vec.deque.front().unwrap();
  197. let last = self.mid_price_long_vec.deque.back().unwrap();
  198. (last - first) / first
  199. } else {
  200. Decimal::ZERO
  201. };
  202. self.trend_long_rate.rescale(6);
  203. self.trend_short_rate = if self.mid_price_short_vec.len() > 2 {
  204. let first = self.mid_price_short_vec.deque.front().unwrap();
  205. let last = self.mid_price_short_vec.deque.back().unwrap();
  206. (last - first) / first
  207. } else {
  208. Decimal::ZERO
  209. };
  210. self.trend_short_rate.rescale(6);
  211. // 拟合k与b
  212. for (mid_index, mp) in self.mid_price_vec.iter().enumerate() {
  213. if mp.is_zero() {
  214. continue
  215. }
  216. self.prices[mid_index][0].push_back(self.mid_price);
  217. self.prices[mid_index][1].push_back(mp.clone());
  218. // 拟合,60s拟合一次
  219. let before_fitting = Utc::now().timestamp_millis();
  220. if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) || self.prices[mid_index][0].len() < 1000 {
  221. // if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) {
  222. // info!("{}, {},", mid_index, self.prices[mid_index][0].len());
  223. // }
  224. if let Some((k, b)) = self.linear_least_squares(mid_index).await {
  225. self.ks[mid_index] = k;
  226. self.bs[mid_index] = b;
  227. self.fitting_delay = Decimal::from(Utc::now().timestamp_millis() - before_fitting);
  228. self.prev_fitting_time_vec[mid_index] = Decimal::from(before_fitting)
  229. } else {
  230. return;
  231. }
  232. }
  233. }
  234. } else {
  235. self.depth_vec[index] = depth.clone();
  236. let latest_price = (depth.asks[0].price + depth.bids[0].price) / Decimal::TWO;
  237. self.update_fair_price(&latest_price, index).await;
  238. }
  239. if self.mid_price.is_zero() {
  240. return;
  241. }
  242. self.processor(depth.time, false).await;
  243. }
  244. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  245. // index == 233代表做市所
  246. // index == 0,1,2,3...代表参考所
  247. self.last_price = trade.price;
  248. }
  249. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  250. pub async fn on_record(&mut self, _record: &Record) {}
  251. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  252. if self.mid_price.is_zero() {
  253. return;
  254. }
  255. let prev_pos_amount = self.pos_amount;
  256. self.pos_amount = pos_amount.clone();
  257. self.pos_avg_price = pos_avg_price.clone();
  258. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  259. // 小于1但不为0的情况,需要平完
  260. if self.inventory.is_zero() && !pos_amount.is_zero() {
  261. self.inventory = if pos_amount > &Decimal::ZERO {
  262. Decimal::ONE
  263. } else {
  264. Decimal::NEGATIVE_ONE
  265. };
  266. }
  267. if prev_pos_amount != self.pos_amount {
  268. // 重置连续信号
  269. self.trade_condition = Decimal::ZERO;
  270. self.trade_condition_time = Decimal::ZERO;
  271. // 开仓
  272. if prev_pos_amount.is_zero() {
  273. self.prev_open_time = Decimal::from(Utc::now().timestamp_millis())
  274. }
  275. // 平仓
  276. if self.pos_amount.is_zero() {
  277. self.mid_price_long_vec.deque.clear();
  278. self.mid_price_short_vec.deque.clear();
  279. self.fair_price_short_vec.deque.clear();
  280. self.profit = Decimal::ZERO;
  281. self.profit_high = Decimal::ZERO;
  282. }
  283. self.processor(update_time, true).await;
  284. }
  285. }
  286. pub async fn on_balance(&mut self, balance: Decimal) {
  287. self.balance = balance;
  288. }
  289. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  290. let last_fair_price = price_vec.deque.iter().last().unwrap();
  291. let min_price = price_vec.deque.iter().min().unwrap();
  292. let max_price = price_vec.deque.iter().max().unwrap();
  293. let up_rate = (last_fair_price - min_price) / min_price;
  294. let down_rate = (max_price - last_fair_price) / max_price;
  295. if up_rate > down_rate {
  296. up_rate
  297. } else {
  298. -down_rate
  299. }
  300. }
  301. pub async fn update_fair_price(&mut self, latest_price: &Decimal, index: usize) {
  302. if self.mid_price.is_zero() {
  303. return;
  304. }
  305. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  306. // let total = a1.value + b1.value;
  307. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  308. // self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  309. let mut mp = latest_price.clone();
  310. mp.rescale(self.mid_price.scale());
  311. self.mid_price_vec[index] = mp;
  312. // 生成fp
  313. self.fair_price_std_vec[index] = mp * self.ks[index] + self.bs[index];
  314. self.fair_price_std_vec[index].rescale(self.mid_price.scale());
  315. // 生成最终用于挂单的公平价格
  316. let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
  317. let fair_price_count = self.fair_price_std_vec.iter()
  318. .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
  319. .count();
  320. if fair_price_count != 0 {
  321. self.fair_price = if self.fair_price.is_zero() {
  322. fair_price_sum / Decimal::from(fair_price_count)
  323. } else {
  324. dec!(0.9) * self.fair_price + dec!(0.1) * fair_price_sum / Decimal::from(fair_price_count)
  325. };
  326. self.fair_price_short_vec.push_back(self.fair_price);
  327. self.fair_rate = if self.fair_price_short_vec.len() > 2 {
  328. let first = self.fair_price_short_vec.deque.front().unwrap();
  329. let last = self.fair_price_short_vec.deque.back().unwrap();
  330. (last - first) / first
  331. } else {
  332. Decimal::ZERO
  333. };
  334. self.fair_rate.rescale(6);
  335. // let mut spread_abs = ((self.fair_price - self.mid_price) / self.mid_price).abs();
  336. // spread_abs.rescale(5);
  337. //
  338. // self.spread_vec.push_back(spread_abs);
  339. // if self.spread_vec.len() > 3000 {
  340. // self.spread_vec.pop_front();
  341. // }
  342. //
  343. // let opt_abs_value = self.spread_vec.iter().max().unwrap().clone();
  344. //
  345. // self.params.open = max(max(self.params.min_open, dec!(0.0006)), opt_abs_value);
  346. }
  347. }
  348. pub async fn update_delta(&mut self) -> bool {
  349. if self.mid_price.is_zero() {
  350. return false;
  351. }
  352. let prev_bid_delta = self.bid_delta;
  353. let prev_ask_delta = self.ask_delta;
  354. let now = Decimal::from(Utc::now().timestamp_millis());
  355. let is_close_long = self.inventory > Decimal::ZERO && (
  356. // 硬止损
  357. (self.profit < dec!(-0.05))
  358. // 利润较大时,追踪止盈
  359. || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75))
  360. );
  361. let is_close_short = self.inventory < Decimal::ZERO && (
  362. // 硬止损
  363. (self.profit < dec!(-0.05))
  364. // 利润较大时,追踪止盈
  365. || (self.profit > dec!(0.01) && self.profit < self.profit_high * dec!(0.75))
  366. );
  367. let is_open_long = self.inventory.is_zero()
  368. && self.fair_price > self.mid_price * (Decimal::ONE + self.params.open)
  369. && self.trend_long_rate < dec!(-0.005)
  370. && self.fair_rate > Decimal::ZERO
  371. && self.fair_rate > self.trend_short_rate + dec!(0.0001)
  372. ;
  373. let is_open_short = self.inventory.is_zero()
  374. && self.fair_price < self.mid_price * (Decimal::ONE - self.params.open)
  375. && self.trend_long_rate > dec!(0.005)
  376. && self.fair_rate < Decimal::ZERO
  377. && self.fair_rate < self.trend_short_rate - dec!(0.0001)
  378. ;
  379. // 使信号有一定持续性
  380. if is_close_long {
  381. self.trade_condition = dec!(1);
  382. }
  383. if is_close_short {
  384. self.trade_condition = dec!(2);
  385. }
  386. if is_open_long {
  387. self.trade_condition = dec!(3);
  388. self.trade_condition_time = now;
  389. }
  390. if is_open_short {
  391. self.trade_condition = dec!(4);
  392. self.trade_condition_time = now;
  393. }
  394. // 开仓信号要过期,只保留2秒
  395. if (self.trade_condition == dec!(3) || self.trade_condition == dec!(4))
  396. && now - self.trade_condition_time > dec!(2_000) {
  397. self.trade_condition = Decimal::ZERO;
  398. }
  399. // 开单信号处理
  400. self.bid_delta = dec!(-2);
  401. self.ask_delta = dec!(-2);
  402. self.optimal_ask_price = Self::DONT_VIEW;
  403. self.optimal_bid_price = Self::DONT_VIEW;
  404. if self.trade_condition == dec!(1) && self.inventory > Decimal::ZERO {
  405. self.ask_delta = dec!(0);
  406. self.bid_delta = dec!(-2);
  407. self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995);
  408. self.optimal_bid_price = Self::DONT_VIEW;
  409. } else if self.trade_condition == dec!(2) && self.inventory < Decimal::ZERO {
  410. self.bid_delta = dec!(0);
  411. self.ask_delta = dec!(-2);
  412. self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005);
  413. self.optimal_ask_price = Self::DONT_VIEW;
  414. } else if self.trade_condition == dec!(3) {
  415. self.bid_delta = dec!(0);
  416. self.ask_delta = dec!(-2);
  417. self.optimal_bid_price = max(self.fair_price, self.mid_price) * dec!(1.005);
  418. self.optimal_ask_price = Self::DONT_VIEW;
  419. } else if self.trade_condition == dec!(4) {
  420. self.ask_delta = dec!(0);
  421. self.bid_delta = dec!(-2);
  422. self.optimal_ask_price = min(self.fair_price, self.mid_price) * dec!(0.995);
  423. self.optimal_bid_price = Self::DONT_VIEW;
  424. }
  425. // 价格处理
  426. self.optimal_ask_price.rescale(self.mid_price.scale());
  427. self.optimal_bid_price.rescale(self.mid_price.scale());
  428. // 返回方向是否改变过,有改变可以立即在图表上显示
  429. prev_ask_delta != self.ask_delta || prev_bid_delta != self.bid_delta
  430. }
  431. pub fn check_ready(&mut self) {
  432. if self.is_ready {
  433. return;
  434. }
  435. if self.mid_price.is_zero() {
  436. return;
  437. }
  438. for fair_price in &self.fair_price_std_vec {
  439. if fair_price.is_zero() {
  440. return;
  441. }
  442. }
  443. if self.optimal_ask_price.is_zero() {
  444. return;
  445. }
  446. if self.optimal_bid_price.is_zero() {
  447. return;
  448. }
  449. if self.balance.is_zero() {
  450. return;
  451. }
  452. self.is_ready = true;
  453. info!("========================================行情数据预热完毕==================================")
  454. }
  455. // 最小二乘法拟合函数,支持VecDeque
  456. pub async fn linear_least_squares(&self, index: usize) -> Option<(Decimal, Decimal)> {
  457. let x = &self.prices[index][1];
  458. let y = &self.prices[index][0];
  459. // 检查数组长度是否相同
  460. if x.len() != y.len() {
  461. return None;
  462. }
  463. let n = x.len();
  464. if n == 0 {
  465. return None;
  466. }
  467. let mut sum_x = Decimal::zero();
  468. let mut sum_y = Decimal::zero();
  469. let mut sum_xx = Decimal::zero();
  470. let mut sum_xy = Decimal::zero();
  471. // 遍历VecDeque中的元素
  472. for (xi, yi) in x.deque.iter().zip(y.deque.iter()) {
  473. sum_x += xi;
  474. sum_y += yi;
  475. let xi_sq = xi * xi;
  476. sum_xx += xi_sq;
  477. sum_xy += xi * yi;
  478. }
  479. // 计算分子和分母
  480. let numerator = sum_xy - (sum_x * sum_y) / Decimal::from(n);
  481. let denominator = sum_xx - (sum_x * sum_x) / Decimal::from(n);
  482. // 如果分母为0,返回None
  483. if denominator == Decimal::zero() {
  484. return None;
  485. }
  486. let k = numerator / denominator;
  487. let mean_x = sum_x / Decimal::from(n);
  488. let mean_y = sum_y / Decimal::from(n);
  489. let b = mean_y - k * mean_x;
  490. Some((k, b))
  491. }
  492. // #[instrument(skip(self), level="TRACE")]
  493. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  494. self.check_ready();
  495. if !self.is_ready {
  496. return;
  497. }
  498. let is_delta_changed = self.update_delta().await;
  499. // let cci_arc = self.cci_arc.clone();
  500. let now = data_time;
  501. let mid_price = self.mid_price;
  502. let ask_price = if self.params.ref_exchange.len() > 0 {
  503. // self.fair_price_std_vec[0]
  504. Self::DONT_VIEW
  505. } else {
  506. Self::DONT_VIEW
  507. };
  508. let bid_price = if self.params.ref_exchange.len() > 1 {
  509. // self.fair_price_std_vec[1]
  510. Self::DONT_VIEW
  511. } else {
  512. Self::DONT_VIEW
  513. };
  514. let optimal_ask_price = self.optimal_ask_price;
  515. let optimal_bid_price = self.optimal_bid_price;
  516. let last_price = Self::DONT_VIEW;
  517. let fair_price = self.fair_price;
  518. let spread = Self::DONT_VIEW;
  519. let spread_max = self.bs[0];
  520. let spread_min = self.ks[0];
  521. // let spread = self.price_times_avg;
  522. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  523. // let spread_min = self.fair_price / self.mid_price;
  524. let inventory = self.inventory;
  525. let sigma_square = Decimal::from(Utc::now().timestamp_millis()) - data_time;
  526. let gamma = self.balance;
  527. let kappa = self.profit;
  528. let flow_ratio = Decimal::ZERO;
  529. let is_time_over_update = now - self.prev_insert_time > dec!(500);
  530. if !is_time_over_update && !is_hard_update && !is_delta_changed {
  531. return;
  532. }
  533. if is_time_over_update {
  534. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  535. }
  536. let pos_avg_price = self.pos_avg_price;
  537. self.debug_sender.unbounded_send(vec![
  538. now,
  539. mid_price,
  540. ask_price,
  541. bid_price,
  542. last_price,
  543. spread,
  544. spread_max,
  545. spread_min,
  546. optimal_ask_price,
  547. optimal_bid_price,
  548. inventory,
  549. sigma_square,
  550. gamma,
  551. kappa,
  552. flow_ratio,
  553. fair_price,
  554. pos_avg_price
  555. ]).unwrap();
  556. }
  557. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  558. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  559. vec![]
  560. }
  561. }