predictor.rs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. use std::cmp::{max, min};
  2. use std::collections::{BTreeMap, HashMap, VecDeque};
  3. use std::sync::Arc;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::UnboundedSender;
  6. use futures_util::StreamExt;
  7. use reqwest::Client;
  8. use rust_decimal::prelude::*;
  9. use rust_decimal_macros::dec;
  10. use serde_json::{json, Value};
  11. use tokio::sync::{Mutex};
  12. use tracing::{error, info};
  13. use global::cci::CentralControlInfo;
  14. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  15. use global::params::Params;
  16. use standard::{Depth, ForceOrder, Record, Ticker, Trade};
  17. use crate::utils;
  18. #[derive(Debug, Clone)]
  19. pub struct Predictor {
  20. pub depth_vec: Vec<Depth>, // 深度队列
  21. pub record_vec: VecDeque<Record>, // 蜡烛队列
  22. pub mid_price: Decimal, // 中间价
  23. pub ask_price: Decimal, // 卖一价
  24. pub bid_price: Decimal, // 买一价
  25. pub last_price: Decimal, // 最后成交价
  26. pub optimal_ask_price: Decimal, // 卖出挂单价
  27. pub optimal_bid_price: Decimal, // 买入挂单价
  28. pub profit_point: Decimal, // 利润点数
  29. pub profit_point_ema: Decimal, // 利润点数的ema
  30. pub profit_point_vec: Vec<Decimal>, // 利润队列
  31. pub inventory: Decimal, // 库存,也就是q
  32. pub pos_amount: Decimal, // 原始持仓量
  33. pub pos_avg_price: Decimal, // 原始持仓价格
  34. pub level: Decimal, // martin
  35. pub money_flow: Decimal, // 资金流
  36. pub ask_delta: Decimal, // δa
  37. pub bid_delta: Decimal, // δb
  38. pub force_order_time_vec: FixedTimeRangeDeque<ForceOrder>, // 爆仓单队列
  39. pub force_order_value: Decimal, // 爆仓单交易量
  40. pub fair_price_vec: Vec<Decimal>, // 公平价格列表,0表示做市所,1表示参考所
  41. pub fair_price: Decimal, // 公平价格
  42. pub fair_price_ema_short: Decimal, // 公平价格_ema
  43. pub fair_price_ema_long: Decimal, // 公平价格_ema
  44. pub mid_rate_focus_open: Decimal, // 变化幅度焦点
  45. pub mid_price_focus_open: Decimal, // 观测焦点时的价格
  46. pub mid_rate_focus_close: Decimal, // 变化幅度焦点
  47. pub fair_price_focus_close: Decimal, // 观测焦点时的价格
  48. pub fair_price_when_ordering: Decimal, // 下单时的公平价格
  49. pub price_times_avg: Decimal, // 公平所与做市所的价格倍率的平均值
  50. pub is_ready: bool, // 是否已准备好
  51. pub balance: Decimal, // 当前资金
  52. pub one_grid_order_value: Decimal, // 每一网格下单价值
  53. pub prev_trade_force_order_value: Decimal, // 上次下单时的爆仓价值ln
  54. pub prev_trade_price: Decimal, // 上次加仓的价格
  55. pub prev_trade_time: Decimal, // 上次交易时间,也就是t
  56. pub t_diff: Decimal, // (T-t)
  57. pub last_update_time: Decimal, // 最后更新时间(depth)
  58. pub last_index: Decimal, // 最后更新的index
  59. pub prev_insert_time: Decimal,
  60. pub prev_save_time: Decimal,
  61. pub init_time: Decimal,
  62. pub params: Params,
  63. pub debug_sender: UnboundedSender<Vec<Decimal>>,
  64. pub trade_time_vec: VecDeque<Decimal>, // 交易时间队列
  65. pub trade_side_map: HashMap<Decimal, String>, // 交易时间,对应的是kk,kd两个方向
  66. pub trade_amount_map: HashMap<Decimal, Decimal>, // 交易数量
  67. pub trade_amount: Decimal,
  68. pub prev_flush_state_time: Decimal,
  69. pub state: usize,
  70. pub state_matrix: Vec<Vec<Decimal>>,
  71. pub spread: Decimal, // 当前价差
  72. pub spread_ema: Decimal, // 价差的sma,默认是sma5000
  73. }
  74. impl Predictor {
  75. // 时间窗口大小(微秒)
  76. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  77. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  78. // const TRADE_LONG_RANGE_MICROS: i64 = 5 * 60_000_000;
  79. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  80. // const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
  81. // const ONE_MILLION: Decimal = dec!(1_000_000);
  82. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  83. const DONT_VIEW: Decimal = dec!(14142135623730951);
  84. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  85. if params.close.is_zero() {
  86. panic!("做市策略特殊逻辑要求平仓距离不得为0。");
  87. }
  88. // 创建数据通道
  89. // 创建一个无界通道
  90. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  91. let account_port = params.port.clone();
  92. tokio::spawn(async move {
  93. let len = 16usize;
  94. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  95. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  96. while let Some(value) = rx.next().await {
  97. // 数据填充到对应位置
  98. for i in 0..len {
  99. if value[i] == Self::DONT_VIEW {
  100. debugs[i].push_back(None);
  101. } else {
  102. debugs[i].push_back(Some(value[i]));
  103. }
  104. }
  105. // 长度限制
  106. if debugs[0].len() > 500_000 {
  107. for i in 0..len {
  108. debugs[i].pop_front(); // 从前面移除元素
  109. }
  110. }
  111. let now = Decimal::from(Utc::now().timestamp_millis());
  112. if now - prev_save_time < dec!(30000) {
  113. continue;
  114. }
  115. let debugs_clone = debugs.clone();
  116. let temp_html_str = tokio::task::spawn_blocking(move || {
  117. utils::build_html_file(&debugs_clone)
  118. }).await.unwrap();
  119. let path = format!("./db/{}.html", account_port);
  120. utils::write_to_file(&temp_html_str, path).await;
  121. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  122. }
  123. });
  124. let predictor = Self {
  125. // 接针版本
  126. depth_vec: vec![Depth::new(); 10],
  127. fair_price_vec: vec![Decimal::ZERO; 10],
  128. // 老的队列
  129. profit_point_vec: vec![],
  130. record_vec: VecDeque::new(),
  131. mid_price: Default::default(),
  132. ask_price: Default::default(),
  133. bid_price: Default::default(),
  134. last_price: Default::default(),
  135. optimal_ask_price: Default::default(),
  136. optimal_bid_price: Default::default(),
  137. inventory: Default::default(),
  138. ask_delta: Default::default(),
  139. bid_delta: Default::default(),
  140. force_order_time_vec: FixedTimeRangeDeque::new(30 * 1_000_000),
  141. force_order_value: Default::default(),
  142. fair_price: Default::default(),
  143. fair_price_ema_short: Default::default(),
  144. fair_price_ema_long: Default::default(),
  145. mid_rate_focus_open: Default::default(),
  146. mid_price_focus_open: Default::default(),
  147. mid_rate_focus_close: Default::default(),
  148. fair_price_focus_close: Default::default(),
  149. fair_price_when_ordering: Default::default(),
  150. price_times_avg: Default::default(),
  151. is_ready: false,
  152. balance: Default::default(),
  153. one_grid_order_value: Default::default(),
  154. prev_trade_force_order_value: Default::default(),
  155. prev_trade_price: Default::default(),
  156. prev_trade_time: Default::default(),
  157. t_diff: Default::default(),
  158. level: Default::default(),
  159. pos_amount: Default::default(),
  160. money_flow: Default::default(),
  161. profit_point: Default::default(),
  162. profit_point_ema: Default::default(),
  163. last_update_time: Default::default(),
  164. last_index: Default::default(),
  165. pos_avg_price: Default::default(),
  166. prev_insert_time: Default::default(),
  167. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  168. init_time: Decimal::from(Utc::now().timestamp_millis()),
  169. params,
  170. debug_sender: tx,
  171. trade_time_vec: VecDeque::new(),
  172. trade_side_map: HashMap::new(),
  173. trade_amount_map: HashMap::new(),
  174. trade_amount: Default::default(),
  175. prev_flush_state_time: Default::default(),
  176. state: 1,
  177. state_matrix: vec![
  178. vec![dec!(0.33), dec!(0.33), dec!(0.33)],
  179. vec![dec!(0.33), dec!(0.33), dec!(0.33)],
  180. vec![dec!(0.33), dec!(0.33), dec!(0.33)],
  181. ],
  182. spread: Default::default(),
  183. spread_ema: Default::default(),
  184. };
  185. predictor
  186. }
  187. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  188. self.last_update_time = depth.time;
  189. self.last_index = Decimal::from(index);
  190. if index == 0 {
  191. self.ask_price = depth.asks[0].price;
  192. self.bid_price = depth.bids[0].price;
  193. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  194. }
  195. self.update_fair_price(depth, index).await;
  196. self.depth_vec[index] = depth.clone();
  197. if index == 1 {
  198. self.update_spread();
  199. }
  200. if self.mid_price.is_zero() {
  201. return;
  202. }
  203. self.processor(false).await;
  204. }
  205. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  206. self.last_price = trade.price;
  207. // self.processor().await;
  208. }
  209. pub async fn on_force_order(&mut self, force_order: ForceOrder) {
  210. // match self.force_order_time_vec.deque.iter().last() {
  211. // Some(last) => {
  212. // // 有的交易所会重复推,这样做个容错处理
  213. // if force_order.time != last.time && force_order.value != last.value {
  214. // self.force_order_time_vec.push_back(force_order);
  215. // }
  216. // }
  217. // None => {
  218. // self.force_order_time_vec.push_back(force_order);
  219. // }
  220. // }
  221. if (force_order.value > Decimal::ONE && force_order.value.abs().ln() > self.force_order_value.abs()) || force_order.value * self.force_order_value < Decimal::ZERO {
  222. self.force_order_value = if force_order.value > Decimal::ONE {
  223. force_order.value.ln()
  224. } else if force_order.value < Decimal::NEGATIVE_ONE {
  225. -force_order.value.abs().ln()
  226. } else {
  227. Decimal::ZERO
  228. };
  229. self.force_order_value.rescale(2);
  230. // 区分是哪个所来的数据
  231. self.force_order_value += force_order.value.fract() * dec!(0.01);
  232. self.processor(true).await;
  233. }
  234. }
  235. // side, pk,pd从HashMap移除,kd,kk添加到HashMap
  236. pub async fn on_order(&mut self, side: String, amount: Decimal) {
  237. let prev_inventory = self.inventory;
  238. self.inventory = match side.as_str() {
  239. "kk" => {
  240. self.inventory - Decimal::ONE
  241. }
  242. "pd" => {
  243. self.inventory - Decimal::ONE
  244. }
  245. "kd" => {
  246. self.inventory + Decimal::ONE
  247. }
  248. "pk" => {
  249. self.inventory + Decimal::ONE
  250. }
  251. &_ => {
  252. panic!("不认识的order方向:{}", side);
  253. }
  254. };
  255. // 重置一些计算
  256. if prev_inventory != self.inventory && self.inventory.is_zero() {
  257. self.profit_point_vec.clear();
  258. self.profit_point = Decimal::ZERO;
  259. self.profit_point_ema = Decimal::ZERO;
  260. self.prev_trade_force_order_value = Decimal::ZERO;
  261. self.prev_trade_price = Decimal::ZERO;
  262. }
  263. if side == "kk" || side == "kd" {
  264. let now = Decimal::from(Utc::now().timestamp_millis());
  265. self.trade_time_vec.push_back(now);
  266. self.trade_side_map.insert(now, side);
  267. self.trade_amount_map.insert(now, amount);
  268. self.prev_trade_time = Decimal::from(Utc::now().timestamp_millis());
  269. self.prev_trade_price = self.mid_price;
  270. self.prev_trade_force_order_value = self.force_order_value;
  271. self.force_order_value = Decimal::ZERO;
  272. } else if side == "pd" || side == "pk" {
  273. let pop_time = self.trade_time_vec.pop_front().unwrap();
  274. self.trade_side_map.remove(&pop_time);
  275. self.trade_amount_map.remove(&pop_time);
  276. }
  277. info!(?self.trade_time_vec);
  278. info!(?self.trade_side_map);
  279. info!(?self.trade_amount_map);
  280. }
  281. pub async fn update_level(&mut self) {
  282. self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
  283. self.level = min(self.level, dec!(6));
  284. }
  285. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  286. pub async fn on_record(&mut self, _record: &Record) {}
  287. pub async fn on_balance(&mut self, balance: Decimal) {
  288. self.balance = balance;
  289. if self.inventory.is_zero() {
  290. self.one_grid_order_value = (self.params.lever_rate * self.balance) / self.params.grid;
  291. }
  292. }
  293. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, _min_amount_value: &Decimal) {
  294. if self.mid_price.is_zero() {
  295. return;
  296. }
  297. self.pos_amount = pos_amount.clone();
  298. self.pos_avg_price = pos_avg_price.clone();
  299. self.update_level().await;
  300. self.processor(true).await;
  301. }
  302. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> (Decimal, Decimal, Decimal) {
  303. let last_fair_price = price_vec.deque.iter().last().unwrap();
  304. let min_price = price_vec.deque.iter().min().unwrap();
  305. let max_price = price_vec.deque.iter().max().unwrap();
  306. let up_rate = (last_fair_price - min_price) / min_price;
  307. let down_rate = (max_price - last_fair_price) / max_price;
  308. if up_rate > down_rate {
  309. (up_rate, min_price.clone(), max_price.clone())
  310. } else {
  311. (-down_rate, min_price.clone(), max_price.clone())
  312. }
  313. }
  314. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  315. if self.mid_price.is_zero() {
  316. return;
  317. }
  318. let a1 = &depth.asks[0];
  319. let b1 = &depth.bids[0];
  320. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  321. let total = a1.value + b1.value;
  322. let fair_price = a1.price * b1.value / total + b1.price * a1.value / total;
  323. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  324. self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() {
  325. fair_price
  326. } else {
  327. self.fair_price_vec[index] * dec!(0.5) + fair_price * dec!(0.5)
  328. };
  329. self.fair_price_vec[index].rescale(self.mid_price.scale());
  330. // 合成公平价格
  331. if !self.fair_price_vec[0].is_zero() && !self.fair_price_vec[1].is_zero() {
  332. self.price_times_avg = if self.price_times_avg.is_zero() {
  333. self.fair_price_vec[1] / self.fair_price_vec[0]
  334. } else {
  335. self.price_times_avg * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[1] / self.fair_price_vec[0]
  336. };
  337. // 进行价格归一化处理,公平所的价格有可能是带前缀的
  338. // let fair_price_part0 = self.fair_price_vec[0] * dec!(0.2);
  339. // let fair_price_part1 = (self.fair_price_vec[1] / self.price_times_avg) * dec!(0.8);
  340. self.fair_price = self.fair_price_vec[1] / self.price_times_avg;
  341. self.fair_price_ema_long = if self.fair_price_ema_long.is_zero() {
  342. self.fair_price
  343. } else {
  344. self.fair_price_ema_long * dec!(0.67) + self.fair_price * dec!(0.33)
  345. };
  346. self.fair_price_ema_short = if self.fair_price_ema_short.is_zero() {
  347. self.fair_price
  348. } else {
  349. self.fair_price_ema_short * dec!(0.999) + self.fair_price * dec!(0.001)
  350. };
  351. }
  352. // // 判断价格是否回归
  353. // if !self.is_regressed && self.inventory > Decimal::ZERO && self.spread_sma_1000 < max(self.spread_sma, self.spread_sma_2000) {
  354. // self.is_regressed = true
  355. // } else if !self.is_regressed && self.inventory < Decimal::ZERO && self.spread_sma_1000 > min(self.spread_sma, self.spread_sma_2000) {
  356. // self.is_regressed = true
  357. // }
  358. }
  359. pub fn update_spread(&mut self) {
  360. let depth = &self.depth_vec[1];
  361. self.spread = depth.asks[0].price - depth.bids[0].price;
  362. self.spread_ema = if self.spread_ema.is_zero() {
  363. self.spread
  364. } else {
  365. self.spread_ema * dec!(0.999) + self.spread * dec!(0.001)
  366. };
  367. }
  368. pub async fn update_delta(&mut self) {
  369. if self.fair_price.is_zero() {
  370. return;
  371. }
  372. // 根据马尔科夫链调整挂单策略
  373. let now = Decimal::from(Utc::now().timestamp_millis());
  374. if now - self.prev_flush_state_time > dec!(60_000) {
  375. self.update_state_matrix().await;
  376. self.prev_flush_state_time = now;
  377. }
  378. self.bid_delta = dec!(-2);
  379. self.ask_delta = dec!(-2);
  380. // 平仓优先级高一些
  381. if self.trade_time_vec.len() > 0 {
  382. let first = self.trade_time_vec.front().unwrap();
  383. if now - *first > self.params.holding_time * Decimal::ONE_THOUSAND {
  384. let side = self.trade_side_map.get(first).unwrap();
  385. self.trade_amount = self.trade_amount_map.get(first).unwrap().clone();
  386. match side.as_str() {
  387. "kd" => {
  388. self.ask_delta = self.mid_price * self.params.close;
  389. }
  390. "kk" => {
  391. self.bid_delta = self.mid_price * self.params.close;
  392. }
  393. &_ => {
  394. panic!("什么方向放进来了?side={}", side);
  395. }
  396. }
  397. }
  398. }
  399. let is_open_long = self.force_order_value < -self.params.open
  400. && (self.mid_price < self.prev_trade_price * dec!(0.999) || self.prev_trade_price.is_zero())
  401. && self.inventory < self.params.grid
  402. && self.state == 0
  403. && self.bid_delta == dec!(-2);
  404. let is_open_short = self.force_order_value > self.params.open
  405. && (self.mid_price > self.prev_trade_price * dec!(1.001) || self.prev_trade_price.is_zero())
  406. && self.inventory > -self.params.grid
  407. && self.state == 0
  408. && self.ask_delta == dec!(-2);
  409. if is_open_long {
  410. self.bid_delta = Decimal::ZERO;
  411. } else if is_open_short {
  412. self.ask_delta = Decimal::ZERO;
  413. }
  414. }
  415. pub fn update_optimal_ask_and_bid(&mut self) {
  416. self.optimal_ask_price = if self.ask_delta == dec!(-1) {
  417. self.bid_price
  418. } else if self.ask_delta == dec!(-2) {
  419. Self::DONT_VIEW
  420. } else {
  421. max(self.mid_price + self.ask_delta, self.bid_price)
  422. };
  423. self.optimal_bid_price = if self.bid_delta == dec!(-1) {
  424. self.ask_price
  425. } else if self.bid_delta == dec!(-2) {
  426. Self::DONT_VIEW
  427. } else {
  428. min(self.mid_price - self.bid_delta, self.ask_price)
  429. };
  430. self.optimal_ask_price.rescale(self.mid_price.scale());
  431. self.optimal_bid_price.rescale(self.mid_price.scale());
  432. }
  433. pub fn update_t_diff(&mut self) {
  434. // if self.prev_trade_time > 0 {
  435. // let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
  436. // self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO);
  437. // } else {
  438. // self.t_diff = Decimal::ONE;
  439. // }
  440. }
  441. pub fn check_ready(&mut self) {
  442. if self.is_ready {
  443. return;
  444. }
  445. if self.mid_price == Decimal::ZERO {
  446. return;
  447. }
  448. if self.fair_price == Decimal::ZERO {
  449. return;
  450. }
  451. if self.ask_price == Decimal::ZERO {
  452. return;
  453. }
  454. if self.bid_price == Decimal::ZERO {
  455. return;
  456. }
  457. self.is_ready = true;
  458. info!("========================================行情数据预热完毕==================================")
  459. }
  460. // #[instrument(skip(self), level="TRACE")]
  461. async fn processor(&mut self, is_hard_update: bool) {
  462. self.check_ready();
  463. if !self.is_ready {
  464. return;
  465. }
  466. self.update_t_diff();
  467. self.update_delta().await;
  468. self.update_optimal_ask_and_bid();
  469. // let mut smm = Decimal::ZERO;
  470. // if !self.depth_vec[1].time.is_zero() {
  471. // let sma = self.depth_vec[1].asks[0].price;
  472. // let smb = self.depth_vec[1].bids[0].price;
  473. // smm = (sma + smb) / Decimal::TWO;
  474. // }
  475. // let cci_arc = self.cci_arc.clone();
  476. let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
  477. let mid_price = self.mid_price;
  478. let ask_price = self.ask_price;
  479. let bid_price = self.bid_price;
  480. let last_price = self.last_price;
  481. let fair_price = Self::DONT_VIEW;
  482. let spread = self.spread;
  483. let spread_max = self.spread_ema;
  484. let spread_min = Self::DONT_VIEW;
  485. let optimal_ask_price = self.optimal_ask_price;
  486. let optimal_bid_price = self.optimal_bid_price;
  487. let inventory = self.inventory;
  488. let sigma_square = Decimal::from(self.state);
  489. let gamma = self.params.holding_time;
  490. let kappa = self.balance;
  491. let flow_ratio = Decimal::ZERO;
  492. let need_append = now - self.prev_insert_time > dec!(500);
  493. if !need_append && !is_hard_update {
  494. return;
  495. }
  496. self.debug_sender.unbounded_send(vec![
  497. now,
  498. mid_price,
  499. ask_price,
  500. bid_price,
  501. last_price,
  502. spread,
  503. spread_max,
  504. spread_min,
  505. optimal_ask_price,
  506. optimal_bid_price,
  507. inventory,
  508. sigma_square,
  509. gamma,
  510. kappa,
  511. flow_ratio,
  512. fair_price
  513. ]).unwrap();
  514. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  515. }
  516. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  517. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  518. vec![]
  519. }
  520. pub async fn update_state_matrix(&mut self) {
  521. let url = "http://mms.skyfffire.com:9000/getStateData";
  522. let symbol = self.params.ref_pair[0].to_lowercase();
  523. let params = json!({
  524. "symbol": symbol,
  525. });
  526. // 创建 HTTP 客户端
  527. let client = Client::new();
  528. // 发送 GET 请求
  529. let response = client.get(url)
  530. .query(&params)
  531. .send()
  532. .await.unwrap();
  533. // 错误处理
  534. if response.status().is_success() {
  535. let response_text = response.text().await.unwrap();
  536. let parsed: Value = serde_json::from_str(response_text.as_str()).unwrap();
  537. info!("state_matrix={}", parsed.to_string());
  538. self.state_matrix = parsed["state_matrix"]
  539. .as_array()
  540. .unwrap()
  541. .iter()
  542. .map(|row| {
  543. row.as_array()
  544. .unwrap()
  545. .iter()
  546. .map(|v| Decimal::from_str(v.as_str().unwrap()).unwrap())
  547. .collect()
  548. })
  549. .collect();
  550. self.state = parsed["state"].as_i64().unwrap() as usize;
  551. self.params.holding_time = Decimal::from(parsed["min_index"].as_i64().unwrap());
  552. } else {
  553. error!("状态转移链挂了:{}", response.status());
  554. }
  555. }
  556. }