predictor.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. use std::collections::{BTreeMap, VecDeque};
  2. use std::sync::Arc;
  3. use chrono::{Utc};
  4. use futures_channel::mpsc::UnboundedSender;
  5. use futures_util::StreamExt;
  6. use reqwest::{Client};
  7. use rust_decimal::prelude::*;
  8. use rust_decimal_macros::dec;
  9. use serde_json::{json, Value};
  10. use tokio::sync::{Mutex};
  11. use tracing::{error, info};
  12. use global::cci::CentralControlInfo;
  13. use global::fixed_time_range_deque::FixedTimeRangeDeque;
  14. use global::params::Params;
  15. use standard::{Depth, Record, Ticker, Trade};
  16. use crate::utils;
  17. #[derive(Debug, Clone)]
  18. pub struct Predictor {
  19. pub depth_vec: Vec<Depth>, // 深度队列
  20. pub spread_vec: Vec<Decimal>, // 价差队列
  21. pub record_vec: VecDeque<Record>, // 蜡烛队列
  22. pub mid_price: Decimal, // 中间价
  23. pub fair_price: Decimal,
  24. pub ask_price: Decimal, // 卖一价
  25. pub bid_price: Decimal, // 买一价
  26. pub last_price: Decimal, // 最后成交价
  27. pub optimal_ask_price: Decimal, // 卖出挂单价
  28. pub optimal_bid_price: Decimal, // 买入挂单价
  29. pub inventory: Decimal, // 库存,也就是q
  30. pub pos_amount: Decimal, // 原始持仓量
  31. pub pos_avg_price: Decimal, // 原始持仓价格
  32. pub balance: Decimal, // 初始余额
  33. pub prev_balance: Decimal,
  34. pub signal: Decimal, // 大于0代表此时是正向信号,小于0则相反
  35. pub ask_delta: Decimal, // δa
  36. pub bid_delta: Decimal, // δb
  37. pub fair_price_vec: Vec<Decimal>, // 公平价格列表
  38. pub fair_price_std_vec: Vec<Decimal>, // 公平价格列表,标准化之后的
  39. pub price_avg_times_vec: Vec<Decimal>, // 公平所与做市所的价格倍率的平均值
  40. pub is_ready: bool, // 是否已准备好
  41. pub last_update_time: Decimal, // 最后更新时间(depth)
  42. pub last_index: Decimal, // 最后更新的index
  43. pub prev_insert_time: Decimal,
  44. pub prev_save_time: Decimal,
  45. pub init_time: Decimal,
  46. pub prev_update_open_params_time: Decimal,
  47. pub params: Params,
  48. pub debug_sender: UnboundedSender<Vec<Decimal>>
  49. }
  50. impl Predictor {
  51. // 时间窗口大小(微秒)
  52. // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
  53. // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
  54. // const TRADE_LONG_RANGE_MICROS: i64 = 60_000_000;
  55. // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
  56. // const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
  57. // const ONE_MILLION: Decimal = dec!(1_000_000);
  58. // const TWENTY_THOUSAND: Decimal = dec!(20_000);
  59. const DONT_VIEW: Decimal = dec!(14142135623730951);
  60. pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
  61. // 创建数据通道
  62. // 创建一个无界通道
  63. let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
  64. let account_port = params.port.clone();
  65. tokio::spawn(async move {
  66. let len = 17usize;
  67. let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  68. let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
  69. while let Some(value) = rx.next().await {
  70. // 数据填充到对应位置
  71. for i in 0..len {
  72. if value[i] == Self::DONT_VIEW {
  73. debugs[i].push_back(None);
  74. } else {
  75. debugs[i].push_back(Some(value[i]));
  76. }
  77. }
  78. // 长度限制
  79. if debugs[0].len() > 500_000 {
  80. for i in 0..len {
  81. debugs[i].pop_front(); // 从前面移除元素
  82. }
  83. }
  84. let now = Decimal::from(Utc::now().timestamp_millis());
  85. if now - prev_save_time < dec!(30000) {
  86. continue;
  87. }
  88. let debugs_clone = debugs.clone();
  89. let temp_html_str = tokio::task::spawn_blocking(move || {
  90. utils::build_html_file(&debugs_clone)
  91. }).await.unwrap();
  92. let path = format!("./db/{}.html", account_port);
  93. utils::write_to_file(&temp_html_str, path).await;
  94. prev_save_time = Decimal::from(Utc::now().timestamp_millis());
  95. }
  96. });
  97. let predictor = Self {
  98. // 接针版本
  99. depth_vec: vec![Depth::new(); params.ref_exchange.len()],
  100. fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  101. fair_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  102. price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  103. spread_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
  104. record_vec: VecDeque::new(),
  105. mid_price: Default::default(),
  106. fair_price: Default::default(),
  107. ask_price: Default::default(),
  108. bid_price: Default::default(),
  109. last_price: Default::default(),
  110. optimal_ask_price: Default::default(),
  111. optimal_bid_price: Default::default(),
  112. ask_delta: Default::default(),
  113. bid_delta: Default::default(),
  114. is_ready: false,
  115. inventory: Default::default(),
  116. pos_avg_price: Default::default(),
  117. pos_amount: Default::default(),
  118. balance: Default::default(),
  119. prev_balance: Default::default(),
  120. signal: Default::default(),
  121. last_update_time: Default::default(),
  122. last_index: Default::default(),
  123. prev_insert_time: Default::default(),
  124. prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
  125. init_time: Decimal::from(Utc::now().timestamp_millis()),
  126. prev_update_open_params_time: Default::default(),
  127. params,
  128. debug_sender: tx,
  129. };
  130. predictor
  131. }
  132. pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
  133. self.last_update_time = depth.time;
  134. self.last_index = Decimal::from(index);
  135. if index == 233 {
  136. self.ask_price = depth.asks[0].price;
  137. self.bid_price = depth.bids[0].price;
  138. self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
  139. } else {
  140. self.update_fair_price(depth, index).await;
  141. self.depth_vec[index] = depth.clone();
  142. }
  143. if self.mid_price.is_zero() {
  144. return;
  145. }
  146. self.processor(depth.time, false).await;
  147. }
  148. pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
  149. self.last_price = trade.price;
  150. // self.processor().await;
  151. }
  152. pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
  153. pub async fn on_record(&mut self, _record: &Record) {}
  154. pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) {
  155. if self.mid_price.is_zero() {
  156. return;
  157. }
  158. let prev_pos_amount = self.pos_amount;
  159. self.pos_amount = pos_amount.clone();
  160. self.pos_avg_price = pos_avg_price.clone();
  161. self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc();
  162. // 小于1但不为0的情况,需要平完
  163. if self.inventory.is_zero() && !pos_amount.is_zero() {
  164. self.inventory = if pos_amount > &Decimal::ZERO {
  165. Decimal::ONE
  166. } else {
  167. Decimal::NEGATIVE_ONE
  168. };
  169. }
  170. if prev_pos_amount != self.pos_amount {
  171. self.processor(update_time, true).await;
  172. }
  173. }
  174. pub async fn on_balance(&mut self, balance: Decimal) {
  175. self.balance = balance;
  176. }
  177. pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> Decimal {
  178. let last_fair_price = price_vec.deque.iter().last().unwrap();
  179. let min_price = price_vec.deque.iter().min().unwrap();
  180. let max_price = price_vec.deque.iter().max().unwrap();
  181. let up_rate = (last_fair_price - min_price) / min_price;
  182. let down_rate = (max_price - last_fair_price) / max_price;
  183. if up_rate > down_rate {
  184. up_rate
  185. } else {
  186. -down_rate
  187. }
  188. }
  189. pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
  190. if self.mid_price.is_zero() {
  191. return;
  192. }
  193. let a1 = &depth.asks[0];
  194. let b1 = &depth.bids[0];
  195. // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
  196. let total = a1.value + b1.value;
  197. // let fair_price = (a1.price + b1.price) / Decimal::TWO;
  198. // 生成fp
  199. self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total;
  200. self.fair_price_vec[index].rescale(self.mid_price.scale());
  201. // 求价格倍率
  202. self.price_avg_times_vec[index] = if self.price_avg_times_vec[index].is_zero() {
  203. self.fair_price_vec[index] / self.mid_price
  204. } else {
  205. self.price_avg_times_vec[index] * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[index] / self.mid_price
  206. };
  207. // 合成公平价格
  208. self.fair_price_std_vec[index] = self.fair_price_vec[index] / self.price_avg_times_vec[index];
  209. // 开仓信号处理
  210. self.signal = Decimal::ZERO;
  211. for (i, fair_price_std) in self.fair_price_std_vec.iter().enumerate() {
  212. if fair_price_std.is_zero() {
  213. return;
  214. }
  215. self.spread_vec[i] = fair_price_std - self.mid_price;
  216. self.signal = self.signal + self.spread_vec[i];
  217. }
  218. self.signal = self.signal / self.params.min_spread;
  219. self.signal.rescale(0);
  220. // 生成最终用于挂单的公平价格
  221. let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
  222. let fair_price_count = self.fair_price_std_vec.iter()
  223. .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
  224. .count();
  225. if fair_price_count != 0 {
  226. self.fair_price = fair_price_sum / Decimal::from(fair_price_count);
  227. }
  228. }
  229. pub async fn update_delta(&mut self) {
  230. if self.mid_price.is_zero() {
  231. return;
  232. }
  233. let now = Decimal::from(Utc::now().timestamp_millis());
  234. if now - self.prev_update_open_params_time > dec!(60_000)
  235. || self.prev_balance != self.balance {
  236. self.update_open_params().await;
  237. self.prev_balance = self.balance;
  238. self.prev_update_open_params_time = now;
  239. }
  240. for fair_price in &self.fair_price_vec {
  241. if fair_price.is_zero() {
  242. return;
  243. }
  244. }
  245. let is_close_long = self.inventory > Decimal::ZERO;
  246. let is_close_short = self.inventory < Decimal::ZERO;
  247. if is_close_long {
  248. self.ask_delta = dec!(0);
  249. self.bid_delta = dec!(-2);
  250. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.close;
  251. self.optimal_bid_price = Self::DONT_VIEW;
  252. } else if is_close_short {
  253. self.bid_delta = dec!(0);
  254. self.ask_delta = dec!(-2);
  255. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.close;
  256. self.optimal_ask_price = Self::DONT_VIEW;
  257. } else {
  258. if self.signal > Decimal::ZERO {
  259. self.bid_delta = dec!(0);
  260. self.ask_delta = dec!(-2);
  261. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open;
  262. self.optimal_ask_price = Self::DONT_VIEW;
  263. } else if self.signal < Decimal::ZERO {
  264. self.ask_delta = dec!(0);
  265. self.bid_delta = dec!(-2);
  266. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open;
  267. self.optimal_bid_price = Self::DONT_VIEW;
  268. } else {
  269. self.bid_delta = dec!(0);
  270. self.ask_delta = dec!(0);
  271. self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open;
  272. self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open;
  273. }
  274. }
  275. self.optimal_ask_price.rescale(self.mid_price.scale());
  276. self.optimal_bid_price.rescale(self.mid_price.scale());
  277. }
  278. pub fn check_ready(&mut self) {
  279. if self.is_ready {
  280. return;
  281. }
  282. if self.mid_price.is_zero() {
  283. return;
  284. }
  285. for fair_price in &self.fair_price_vec {
  286. if fair_price.is_zero() {
  287. return;
  288. }
  289. }
  290. if self.ask_price.is_zero() {
  291. return;
  292. }
  293. if self.bid_price.is_zero() {
  294. return;
  295. }
  296. if self.balance.is_zero() {
  297. return;
  298. }
  299. self.is_ready = true;
  300. info!("========================================行情数据预热完毕==================================")
  301. }
  302. // #[instrument(skip(self), level="TRACE")]
  303. async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) {
  304. self.check_ready();
  305. if !self.is_ready {
  306. return;
  307. }
  308. self.update_delta().await;
  309. // let cci_arc = self.cci_arc.clone();
  310. let now = data_time;
  311. let mid_price = self.mid_price;
  312. let ask_price = self.fair_price;
  313. let bid_price = Self::DONT_VIEW;
  314. let optimal_ask_price = self.optimal_ask_price;
  315. let optimal_bid_price = self.optimal_bid_price;
  316. let last_price = Self::DONT_VIEW;
  317. let fair_price = Self::DONT_VIEW;
  318. let spread = Self::DONT_VIEW;
  319. let spread_min = self.spread_vec[0];
  320. let spread_max = self.spread_vec[1];
  321. // let spread = self.price_times_avg;
  322. // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0];
  323. // let spread_min = self.fair_price / self.mid_price;
  324. let inventory = self.inventory;
  325. let sigma_square = self.signal;
  326. let gamma = self.balance;
  327. let kappa = Decimal::ZERO;
  328. let flow_ratio = Decimal::ZERO;
  329. let need_append = now - self.prev_insert_time > dec!(500);
  330. if !need_append && !is_hard_update {
  331. return;
  332. }
  333. if !is_hard_update {
  334. self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
  335. }
  336. let pos_avg_price = self.pos_avg_price;
  337. self.debug_sender.unbounded_send(vec![
  338. now,
  339. mid_price,
  340. ask_price,
  341. bid_price,
  342. last_price,
  343. spread,
  344. spread_max,
  345. spread_min,
  346. optimal_ask_price,
  347. optimal_bid_price,
  348. inventory,
  349. sigma_square,
  350. gamma,
  351. kappa,
  352. flow_ratio,
  353. fair_price,
  354. pos_avg_price
  355. ]).unwrap();
  356. }
  357. // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
  358. pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
  359. vec![]
  360. }
  361. pub async fn update_open_params(&mut self) {
  362. let url = "http://is.skyfffire.com:18888/ia/get_indicator";
  363. let symbol = self.params.pair.to_lowercase();
  364. let exchange = self.params.exchange.to_lowercase();
  365. let params = json!({
  366. "indicator": "msv",
  367. "query": {
  368. "exchange": exchange,
  369. "symbol": symbol,
  370. "minute_time_range": "10",
  371. "mills_back": "37"
  372. }
  373. });
  374. // 创建 HTTP 客户端
  375. let client = Client::new();
  376. // 发送 GET 请求
  377. let response_rst = client.post(url)
  378. .json(&params)
  379. .send()
  380. .await;
  381. match response_rst {
  382. Ok(response) => {
  383. // 错误处理
  384. if response.status().is_success() {
  385. let response_text = response.text().await.unwrap();
  386. let parsed: Value = serde_json::from_str(response_text.as_str()).unwrap();
  387. let msv = parsed["data"]["msv"].clone();
  388. let msv_decimals: Vec<Decimal> = msv.as_array()
  389. .unwrap() // 确保 parsed 是一个数组
  390. .iter()
  391. .filter_map(|item| {
  392. // 尝试提取第二个值并转换为 Decimal
  393. if let Some(value) = item.get(1) {
  394. value.as_str().unwrap_or("0").parse::<Decimal>().ok()
  395. } else {
  396. None
  397. }
  398. })
  399. .collect();
  400. let max_abs_value = msv_decimals.iter()
  401. .map(|&value| value.abs()) // 获取每个数的绝对值
  402. .fold(Decimal::new(0, 0), |a, b| a.max(b)); // 计算最大值
  403. let prev_open = self.params.open.clone();
  404. self.params.open = if max_abs_value.is_zero() {
  405. panic!("十分钟内毫无波动的行情,停机。")
  406. } else {
  407. max_abs_value / Decimal::ONE_HUNDRED
  408. };
  409. if self.params.open != prev_open {
  410. info!("open: {} -> {}", prev_open, self.params.open);
  411. }
  412. } else {
  413. error!("自动参数挂了:{}", response.status());
  414. }
  415. }
  416. Err(_) => {}
  417. }
  418. }
  419. }