use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use chrono::{Utc}; use futures_channel::mpsc::UnboundedSender; use futures_util::StreamExt; use reqwest::{Client}; use rust_decimal::prelude::*; use rust_decimal_macros::dec; use serde_json::{json, Value}; use tokio::sync::{Mutex}; use tracing::{error, info}; use global::cci::CentralControlInfo; use global::fixed_time_range_deque::FixedTimeRangeDeque; use global::params::Params; use standard::{Depth, Record, Ticker, Trade}; use crate::utils; #[derive(Debug, Clone)] pub struct Predictor { pub depth_vec: Vec, // 深度队列 pub spread_vec: Vec, // 价差队列 pub record_vec: VecDeque, // 蜡烛队列 pub mid_price: Decimal, // 中间价 pub fair_price: Decimal, pub ask_price: Decimal, // 卖一价 pub bid_price: Decimal, // 买一价 pub last_price: Decimal, // 最后成交价 pub optimal_ask_price: Decimal, // 卖出挂单价 pub optimal_bid_price: Decimal, // 买入挂单价 pub inventory: Decimal, // 库存,也就是q pub pos_amount: Decimal, // 原始持仓量 pub pos_avg_price: Decimal, // 原始持仓价格 pub balance: Decimal, // 初始余额 pub prev_balance: Decimal, pub signal: Decimal, // 大于0代表此时是正向信号,小于0则相反 pub ask_delta: Decimal, // δa pub bid_delta: Decimal, // δb pub fair_price_vec: Vec, // 公平价格列表 pub fair_price_std_vec: Vec, // 公平价格列表,标准化之后的 pub price_avg_times_vec: Vec, // 公平所与做市所的价格倍率的平均值 pub is_ready: bool, // 是否已准备好 pub last_update_time: Decimal, // 最后更新时间(depth) pub last_index: Decimal, // 最后更新的index pub prev_insert_time: Decimal, pub prev_save_time: Decimal, pub init_time: Decimal, pub prev_update_open_params_time: Decimal, pub params: Params, pub debug_sender: UnboundedSender> } impl Predictor { // 时间窗口大小(微秒) // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000; // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000; // const TRADE_LONG_RANGE_MICROS: i64 = 60_000_000; // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000; // const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000; // const ONE_MILLION: Decimal = dec!(1_000_000); // const TWENTY_THOUSAND: Decimal = dec!(20_000); const DONT_VIEW: Decimal = dec!(14142135623730951); pub fn new(_cci_arc: Arc>, params: Params) -> Self { // 创建数据通道 // 创建一个无界通道 let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); let account_port = params.port.clone(); tokio::spawn(async move { let len = 17usize; let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis()); let mut debugs: Vec>> = vec![VecDeque::new(); len]; while let Some(value) = rx.next().await { // 数据填充到对应位置 for i in 0..len { if value[i] == Self::DONT_VIEW { debugs[i].push_back(None); } else { debugs[i].push_back(Some(value[i])); } } // 长度限制 if debugs[0].len() > 500_000 { for i in 0..len { debugs[i].pop_front(); // 从前面移除元素 } } let now = Decimal::from(Utc::now().timestamp_millis()); if now - prev_save_time < dec!(30000) { continue; } let debugs_clone = debugs.clone(); let temp_html_str = tokio::task::spawn_blocking(move || { utils::build_html_file(&debugs_clone) }).await.unwrap(); let path = format!("./db/{}.html", account_port); utils::write_to_file(&temp_html_str, path).await; prev_save_time = Decimal::from(Utc::now().timestamp_millis()); } }); let predictor = Self { // 接针版本 depth_vec: vec![Depth::new(); params.ref_exchange.len()], fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()], fair_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()], price_avg_times_vec: vec![Decimal::ZERO; params.ref_exchange.len()], spread_vec: vec![Decimal::ZERO; params.ref_exchange.len()], record_vec: VecDeque::new(), mid_price: Default::default(), fair_price: Default::default(), ask_price: Default::default(), bid_price: Default::default(), last_price: Default::default(), optimal_ask_price: Default::default(), optimal_bid_price: Default::default(), ask_delta: Default::default(), bid_delta: Default::default(), is_ready: false, inventory: Default::default(), pos_avg_price: Default::default(), pos_amount: Default::default(), balance: Default::default(), prev_balance: Default::default(), signal: Default::default(), last_update_time: Default::default(), last_index: Default::default(), prev_insert_time: Default::default(), prev_save_time: Decimal::from(Utc::now().timestamp_millis()), init_time: Decimal::from(Utc::now().timestamp_millis()), prev_update_open_params_time: Default::default(), params, debug_sender: tx, }; predictor } pub async fn on_depth(&mut self, depth: &Depth, index: usize) { self.last_update_time = depth.time; self.last_index = Decimal::from(index); if index == 233 { self.ask_price = depth.asks[0].price; self.bid_price = depth.bids[0].price; self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO; } else { self.update_fair_price(depth, index).await; self.depth_vec[index] = depth.clone(); } if self.mid_price.is_zero() { return; } self.processor(depth.time, false).await; } pub async fn on_trade(&mut self, trade: &Trade, _index: usize) { self.last_price = trade.price; // self.processor().await; } pub async fn on_ticker(&mut self, _ticker: &Ticker) {} pub async fn on_record(&mut self, _record: &Record) {} pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, min_amount_value: &Decimal, update_time: Decimal) { if self.mid_price.is_zero() { return; } let prev_pos_amount = self.pos_amount; self.pos_amount = pos_amount.clone(); self.pos_avg_price = pos_avg_price.clone(); self.inventory = (pos_amount / (min_amount_value / self.mid_price)).trunc(); // 小于1但不为0的情况,需要平完 if self.inventory.is_zero() && !pos_amount.is_zero() { self.inventory = if pos_amount > &Decimal::ZERO { Decimal::ONE } else { Decimal::NEGATIVE_ONE }; } if prev_pos_amount != self.pos_amount { self.processor(update_time, true).await; } } pub async fn on_balance(&mut self, balance: Decimal) { self.balance = balance; } pub fn get_real_rate(price_vec: &FixedTimeRangeDeque) -> Decimal { let last_fair_price = price_vec.deque.iter().last().unwrap(); let min_price = price_vec.deque.iter().min().unwrap(); let max_price = price_vec.deque.iter().max().unwrap(); let up_rate = (last_fair_price - min_price) / min_price; let down_rate = (max_price - last_fair_price) / max_price; if up_rate > down_rate { up_rate } else { -down_rate } } pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) { if self.mid_price.is_zero() { return; } let a1 = &depth.asks[0]; let b1 = &depth.bids[0]; // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price let total = a1.value + b1.value; // let fair_price = (a1.price + b1.price) / Decimal::TWO; // 生成fp self.fair_price_vec[index] = a1.price * b1.value / total + b1.price * a1.value / total; self.fair_price_vec[index].rescale(self.mid_price.scale()); // 求价格倍率 self.price_avg_times_vec[index] = if self.price_avg_times_vec[index].is_zero() { self.fair_price_vec[index] / self.mid_price } else { self.price_avg_times_vec[index] * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[index] / self.mid_price }; // 合成公平价格 self.fair_price_std_vec[index] = self.fair_price_vec[index] / self.price_avg_times_vec[index]; // 开仓信号处理 self.signal = Decimal::ZERO; for (i, fair_price_std) in self.fair_price_std_vec.iter().enumerate() { if fair_price_std.is_zero() { return; } self.spread_vec[i] = fair_price_std - self.mid_price; self.signal = self.signal + self.spread_vec[i]; } self.signal = self.signal / self.params.min_spread; self.signal.rescale(0); // 生成最终用于挂单的公平价格 let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum(); let fair_price_count = self.fair_price_std_vec.iter() .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0 .count(); if fair_price_count != 0 { self.fair_price = fair_price_sum / Decimal::from(fair_price_count); } } pub async fn update_delta(&mut self) { if self.mid_price.is_zero() { return; } let now = Decimal::from(Utc::now().timestamp_millis()); if now - self.prev_update_open_params_time > dec!(60_000) || self.prev_balance != self.balance { self.update_open_params().await; self.prev_balance = self.balance; self.prev_update_open_params_time = now; } for fair_price in &self.fair_price_vec { if fair_price.is_zero() { return; } } let is_close_long = self.inventory > Decimal::ZERO; let is_close_short = self.inventory < Decimal::ZERO; if is_close_long { self.ask_delta = dec!(0); self.bid_delta = dec!(-2); self.optimal_ask_price = self.fair_price + self.fair_price * self.params.close; self.optimal_bid_price = Self::DONT_VIEW; } else if is_close_short { self.bid_delta = dec!(0); self.ask_delta = dec!(-2); self.optimal_bid_price = self.fair_price - self.fair_price * self.params.close; self.optimal_ask_price = Self::DONT_VIEW; } else { if self.signal > Decimal::ZERO { self.bid_delta = dec!(0); self.ask_delta = dec!(-2); self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open; self.optimal_ask_price = Self::DONT_VIEW; } else if self.signal < Decimal::ZERO { self.ask_delta = dec!(0); self.bid_delta = dec!(-2); self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open; self.optimal_bid_price = Self::DONT_VIEW; } else { self.bid_delta = dec!(0); self.ask_delta = dec!(0); self.optimal_bid_price = self.fair_price - self.fair_price * self.params.open; self.optimal_ask_price = self.fair_price + self.fair_price * self.params.open; } } self.optimal_ask_price.rescale(self.mid_price.scale()); self.optimal_bid_price.rescale(self.mid_price.scale()); } pub fn check_ready(&mut self) { if self.is_ready { return; } if self.mid_price.is_zero() { return; } for fair_price in &self.fair_price_vec { if fair_price.is_zero() { return; } } if self.ask_price.is_zero() { return; } if self.bid_price.is_zero() { return; } if self.balance.is_zero() { return; } self.is_ready = true; info!("========================================行情数据预热完毕==================================") } // #[instrument(skip(self), level="TRACE")] async fn processor(&mut self, data_time: Decimal, is_hard_update: bool) { self.check_ready(); if !self.is_ready { return; } self.update_delta().await; // let cci_arc = self.cci_arc.clone(); let now = data_time; let mid_price = self.mid_price; let ask_price = self.fair_price; let bid_price = Self::DONT_VIEW; let optimal_ask_price = self.optimal_ask_price; let optimal_bid_price = self.optimal_bid_price; let last_price = Self::DONT_VIEW; let fair_price = Self::DONT_VIEW; let spread = Self::DONT_VIEW; let spread_min = self.spread_vec[0]; let spread_max = self.spread_vec[1]; // let spread = self.price_times_avg; // let spread_max = self.fair_price_vec[1] / self.fair_price_vec[0]; // let spread_min = self.fair_price / self.mid_price; let inventory = self.inventory; let sigma_square = self.signal; let gamma = self.balance; let kappa = Decimal::ZERO; let flow_ratio = Decimal::ZERO; let need_append = now - self.prev_insert_time > dec!(500); if !need_append && !is_hard_update { return; } if !is_hard_update { self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis()) } let pos_avg_price = self.pos_avg_price; self.debug_sender.unbounded_send(vec![ now, mid_price, ask_price, bid_price, last_price, spread, spread_max, spread_min, optimal_ask_price, optimal_bid_price, inventory, sigma_square, gamma, kappa, flow_ratio, fair_price, pos_avg_price ]).unwrap(); } // #[instrument(skip(self, ref_ticker_map), level="TRACE")] pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap) -> Vec> { vec![] } pub async fn update_open_params(&mut self) { let url = "http://is.skyfffire.com:18888/ia/get_indicator"; let symbol = self.params.pair.to_lowercase(); let exchange = self.params.exchange.to_lowercase(); let params = json!({ "indicator": "msv", "query": { "exchange": exchange, "symbol": symbol, "minute_time_range": "10", "mills_back": "37" } }); // 创建 HTTP 客户端 let client = Client::new(); // 发送 GET 请求 let response_rst = client.post(url) .json(¶ms) .send() .await; match response_rst { Ok(response) => { // 错误处理 if response.status().is_success() { let response_text = response.text().await.unwrap(); let parsed: Value = serde_json::from_str(response_text.as_str()).unwrap(); let msv = parsed["data"]["msv"].clone(); let msv_decimals: Vec = msv.as_array() .unwrap() // 确保 parsed 是一个数组 .iter() .filter_map(|item| { // 尝试提取第二个值并转换为 Decimal if let Some(value) = item.get(1) { value.as_str().unwrap_or("0").parse::().ok() } else { None } }) .collect(); let max_abs_value = msv_decimals.iter() .map(|&value| value.abs()) // 获取每个数的绝对值 .fold(Decimal::new(0, 0), |a, b| a.max(b)); // 计算最大值 let prev_open = self.params.open.clone(); self.params.open = if max_abs_value.is_zero() { panic!("十分钟内毫无波动的行情,停机。") } else { max_abs_value / Decimal::ONE_HUNDRED }; if self.params.open != prev_open { info!("open: {} -> {}", prev_open, self.params.open); } } else { error!("自动参数挂了:{}", response.status()); } } Err(_) => {} } } }