| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824 |
- use std::collections::{BTreeMap, HashMap, VecDeque};
- use std::sync::Arc;
- use chrono::Utc;
- use futures_channel::mpsc::UnboundedSender;
- use futures_util::StreamExt;
- use rust_decimal::prelude::*;
- use rust_decimal_macros::dec;
- use tokio::sync::Mutex;
- use tracing::info;
- use global::cci::CentralControlInfo;
- use global::fixed_time_range_deque::FixedTimeRangeDeque;
- use global::params::Params;
- use standard::{Depth, Record, Ticker, Trade};
- use crate::utils;
- #[derive(Debug, Clone)]
- pub struct Predictor { // All state of the predictor: rolling statistics, prices, inventory, quote output and debug plumbing.
- // Mean-reversion related
- pub close_price_vec: VecDeque<Record>, // close-price records sampled from the mid price (pushed when more than 1000 ms passed since the last one; last 360 kept)
- pub deviation: Decimal, // deviation from the moving average
- pub deviation_vec: VecDeque<Decimal>, // recent deviation values (last 600 kept)
- pub deviation_limit: Decimal, // 0.1 quantile of deviation_vec
- pub trend: Decimal, // mean of the last 5 closes divided by the mean of the last 50 closes
- pub trend_vec: VecDeque<Decimal>, // recent trend values (last 600 kept)
- pub trend_limit: Decimal, // 0.5 quantile of trend_vec
- pub amplitude: Decimal, // amplitude: close_price_max / close_price_min - 1
- pub close_price_max: Decimal, // highest close in close_price_vec
- pub close_price_min: Decimal, // lowest close in close_price_vec
- pub amplitude_mean: Decimal, // mean amplitude
- pub amplitude_times: Decimal, // ratio of the current amplitude to the mean amplitude
- pub amplitude_vec: VecDeque<Decimal>, // recent amplitude values (last 120 kept)
- // Price related
- pub mid_price: Decimal, // mid price
- pub ask_price: Decimal, // best ask (a1)
- pub bid_price: Decimal, // best bid (b1)
- pub fair_price: Decimal, // fair price
- pub last_price: Decimal, // last traded price
- pub depth_vec: Vec<Depth>, // depth snapshots; index i is the i-th reference exchange
- pub record_vec: VecDeque<Record>, // candle queue
- // Inventory related
- pub inventory: Decimal, // inventory, i.e. q
- pub pos_amount: Decimal, // raw position size
- pub pos_avg_price: Decimal, // raw position average price
- pub balance: Decimal, // balance
- pub float_balance: Decimal, // floating balance (balance plus estimated profit)
- pub one_grid_order_value: Decimal, // order value of a single grid level
- pub profit: Decimal, // estimated profit of the position at the current mid price
- pub profit_high: Decimal, // highest profit seen for the overall position
- // Trade-signal related
- pub trade_condition: Decimal, // trade signal, kept alive for a period of time
- pub trade_condition_time: Decimal, // instant at which the signal was satisfied; bounds how long the opening behaviour lasts
- pub ask_delta: Decimal, // δa
- pub bid_delta: Decimal, // δb
- pub optimal_ask_price: Decimal, // sell quote price
- pub optimal_bid_price: Decimal, // buy quote price
- // Fair-price generation related
- pub prev_fitting_time_vec: Vec<Decimal>, // time at which the fair price was generated (last fit per reference exchange)
- pub mid_price_vec: Vec<Decimal>, // mid price of each exchange
- pub fair_price_std_vec: Vec<Decimal>, // list of fair prices, after normalization
- pub prices: Vec<Vec<FixedTimeRangeDeque<Decimal>>>, // [[[market-making exchange], [reference exchange 0]], ...]
- pub ks: Vec<Decimal>, // fitted slopes
- pub bs: Vec<Decimal>, // fitted intercepts
- // Grid open/close related
- pub trade_time_vec: VecDeque<Decimal>, // queue of fill times
- pub trade_side_map: HashMap<Decimal, String>, // fill time -> side; only the two opening sides "kk" and "kd" are stored
- pub trade_amount_map: HashMap<Decimal, Decimal>, // fill time -> traded amount
- pub trade_price_map: HashMap<Decimal, Decimal>, // fill time -> traded price
- pub ready_close_amount: Decimal, // amount expected to be closed
- // System variables
- pub is_ready: bool, // whether warm-up has completed
- pub params: Params, // strategy parameters
- // Markov-chain related (currently disabled)
- // pub prev_update_state_time: Decimal,
- // pub state: usize,
- // pub state_matrix: Vec<Vec<Decimal>>,
- // Debug / chart related
- pub debug_sender: UnboundedSender<Vec<Decimal>>, // sends one row of chart values to the background debug task
- pub prev_plot_time: Decimal, // time (ms) of the last periodic chart row
- }
- impl Predictor {
- const DONT_VIEW: Decimal = dec!(14142135623730951); // sentinel for "no value": the debug task stores it as None, and update_delta resets the quote prices to it
- pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
- if params.close.is_zero() {
- panic!("做市策略特殊逻辑要求平仓距离不得为0。");
- }
- // 创建数据通道
- // 创建一个无界通道
- let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
- let account_port = params.port.clone();
- tokio::spawn(async move {
- let len = 17usize;
- let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
- let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); len];
- while let Some(value) = rx.next().await {
- // 数据填充到对应位置
- // // 第一步:获取插入位置, 这里有bug,持仓推送并不连续,有时候会导致图表显示错误……
- // let target_ts = value[0]; // 时间戳在values[0]
- // let insert_pos = debugs[0]
- // .binary_search_by(|ts| {
- // ts.as_ref() // 解包Option
- // .expect("Timestamp cannot be None")
- // .cmp(&target_ts)
- // })
- // .unwrap_or_else(|e| e);
- // 第二步:执行插入操作
- for i in 0..debugs.len() {
- let value = value.get(i).cloned().unwrap_or(Self::DONT_VIEW);
- // 其他队列按规则插入
- let elem = if value == Self::DONT_VIEW {
- None
- } else {
- Some(value)
- };
- debugs[i].push_back(elem)
- }
- // 长度限制
- if debugs[0].len() > 500_000 {
- for i in 0..len {
- debugs[i].pop_front(); // 从前面移除元素
- }
- }
- let now = Decimal::from(Utc::now().timestamp_millis());
- if now - prev_save_time < dec!(30000) {
- continue;
- }
- let debugs_clone = debugs.clone();
- let temp_html_str = tokio::task::spawn_blocking(move || {
- utils::build_html_file(&debugs_clone)
- }).await.unwrap();
- let path = format!("./db/{}.html", account_port);
- utils::write_to_file(&temp_html_str, path).await;
- prev_save_time = Decimal::from(Utc::now().timestamp_millis());
- }
- });
- let predictor = Self {
- depth_vec: vec![Depth::new(); params.ref_exchange.len()],
- record_vec: Default::default(),
- fair_price_std_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
- mid_price_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
- prices: vec![vec![FixedTimeRangeDeque::new(600_000_000); 2]; params.ref_exchange.len()],
- ks: vec![Decimal::ZERO; params.ref_exchange.len()],
- bs: vec![Decimal::ZERO; params.ref_exchange.len()],
- prev_fitting_time_vec: vec![Decimal::ZERO; params.ref_exchange.len()],
- close_price_vec: Default::default(),
- deviation_vec: Default::default(),
- deviation: Default::default(),
- trend: Default::default(),
- trend_vec: Default::default(),
- trend_limit: Default::default(),
- amplitude: Default::default(),
- close_price_max: Default::default(),
- close_price_min: Default::default(),
- amplitude_mean: Default::default(),
- amplitude_times: Default::default(),
- amplitude_vec: Default::default(),
- mid_price: Default::default(),
- ask_price: Default::default(),
- bid_price: Default::default(),
- fair_price: Default::default(),
- last_price: Default::default(),
- optimal_ask_price: Self::DONT_VIEW,
- optimal_bid_price: Self::DONT_VIEW,
- ask_delta: dec!(-2),
- bid_delta: dec!(-2),
- is_ready: false,
- inventory: Default::default(),
- pos_avg_price: Default::default(),
- pos_amount: Default::default(),
- balance: Default::default(),
- float_balance: Default::default(),
- one_grid_order_value: Default::default(),
- profit: Default::default(),
- profit_high: Default::default(),
- trade_condition: Default::default(),
- trade_condition_time: Default::default(),
- prev_plot_time: Default::default(),
- params,
- debug_sender: tx,
- trade_time_vec: VecDeque::new(),
- trade_side_map: HashMap::new(),
- trade_amount_map: HashMap::new(),
- trade_price_map: HashMap::new(),
- ready_close_amount: Default::default(),
- // prev_update_state_time: Default::default(),
- // state: 1,
- // state_matrix: vec![
- // vec![dec!(0.33), dec!(0.33), dec!(0.33)],
- // vec![dec!(0.33), dec!(0.33), dec!(0.33)],
- // vec![dec!(0.33), dec!(0.33), dec!(0.33)],
- // ],
- deviation_limit: Default::default()
- };
- predictor
- }
- fn calculate_quantile(data: &VecDeque<Decimal>, quantile: f64) -> Option<Decimal> {
- if data.is_empty() {
- return None;
- }
- // 将数据复制到一个向量中,以便排序
- let mut sorted_data: Vec<Decimal> = data.iter().cloned().collect();
- sorted_data.sort();
- // 计算分位数的索引位置
- let position = (quantile * (sorted_data.len() - 1) as f64).round() as usize;
- // 返回对应位置的值
- Some(sorted_data[position])
- }
- pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
- if index == 233 {
- self.ask_price = depth.asks[0].price;
- self.bid_price = depth.bids[0].price;
- self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
- // 计算利润(预估)
- if !self.inventory.is_zero() {
- if !self.pos_avg_price.is_zero() {
- let hold_value = self.pos_avg_price * self.pos_amount;
- self.profit = if self.inventory > Decimal::ZERO {
- hold_value * (self.mid_price - self.pos_avg_price) / self.pos_avg_price
- } else {
- hold_value * (self.pos_avg_price - self.mid_price) / self.pos_avg_price
- };
- self.float_balance = self.balance + self.profit;
- self.float_balance.rescale(4);
- }
- self.profit.rescale(6);
- if self.profit_high < self.profit {
- self.profit_high = self.profit
- }
- }
- // 秒级k线处理,先只用处理收盘价就行
- let r = Record {
- time: Decimal::from(Utc::now().timestamp_millis()),
- open: self.mid_price,
- high: self.mid_price,
- low: self.mid_price,
- close: self.mid_price,
- volume: Default::default(),
- symbol: "".to_string(),
- };
- let is_need_push = self.close_price_vec.len() == 0
- || r.time - self.close_price_vec.iter().last().unwrap().time > Decimal::ONE_THOUSAND
- ;
- if is_need_push {
- self.close_price_vec.push_back(r);
- if self.close_price_vec.len() > 360 { self.close_price_vec.pop_front(); }
- let len = self.close_price_vec.len();
- // 求短期涨跌幅的相关数据
- if len >= 10 {
- let mut i = len - 1;
- let mut sum: Decimal = Decimal::ZERO;
- loop {
- sum += self.close_price_vec.get(i).unwrap().close;
- if i == len - 10 {
- break
- }
- i = i - 1;
- }
- let mean_10 = sum / Decimal::from(10);
- self.deviation = (self.mid_price - mean_10) / mean_10;
- self.deviation.rescale(8);
- } else {
- self.deviation = Decimal::ZERO;
- }
- if self.deviation > -self.params.open {
- self.deviation = Decimal::ZERO
- }
- self.deviation_vec.push_back(self.deviation);
- if self.deviation_vec.len() > 600 { self.deviation_vec.pop_front(); }
- self.deviation_limit = match Self::calculate_quantile(&self.deviation_vec, 0.1f64) {
- None => {
- Decimal::ZERO
- }
- Some(limit) => {
- limit
- }
- };
- // 求短期均值
- let mean_short;
- if len >= 5 {
- let mut i = len - 1;
- let mut sum: Decimal = Decimal::ZERO;
- loop {
- sum += self.close_price_vec.get(i).unwrap().close;
- if i == len - 5 {
- break
- }
- i = i - 1;
- }
- mean_short = sum / Decimal::from(5);
- } else {
- mean_short = self.mid_price;
- }
- // 求长周期的均值
- let mean_long;
- if len >= 50 {
- let mut i = len - 1;
- let mut sum: Decimal = Decimal::ZERO;
- loop {
- sum += self.close_price_vec.get(i).unwrap().close;
- if i == len - 50 {
- break
- }
- i = i - 1;
- }
- mean_long = sum / Decimal::from(50);
- } else {
- mean_long = self.mid_price;
- }
- // 趋势计算相关
- self.trend = mean_short / mean_long;
- self.trend_vec.push_back(self.trend);
- if self.trend_vec.len() > 600 { self.trend_vec.pop_front(); }
- self.trend_limit = match Self::calculate_quantile(&self.trend_vec, 0.5f64) {
- None => {Decimal::ZERO}
- Some(limit) => {limit}
- };
- // 振幅计算相关
- self.close_price_max = *self.close_price_vec.iter()
- .map(|record| &record.close)
- .max()
- .unwrap_or(&Decimal::ZERO);
- self.close_price_min = *self.close_price_vec.iter()
- .map(|record| &record.close)
- .min()
- .unwrap_or(&Decimal::ZERO);
- self.amplitude = self.close_price_max / self.close_price_min - Decimal::ONE;
- self.amplitude_vec.push_back(self.amplitude);
- if self.amplitude_vec.len() > 120 { self.amplitude_vec.pop_front(); }
- self.amplitude_mean = self.amplitude_vec.iter().sum::<Decimal>() / Decimal::from(self.amplitude_vec.len());
- self.amplitude_times = if self.amplitude_mean.is_zero() {
- Decimal::ONE
- } else {
- self.amplitude / self.amplitude_mean
- };
- self.amplitude_times.rescale(4);
- }
- // 拟合k与b
- for (mid_index, mp) in self.mid_price_vec.iter().enumerate() {
- if mp.is_zero() {
- continue
- }
- self.prices[mid_index][0].push_back(self.mid_price);
- self.prices[mid_index][1].push_back(mp.clone());
- // 拟合,60s拟合一次
- let before_fitting = Utc::now().timestamp_millis();
- if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) || self.prices[mid_index][0].len() < 1000 {
- // if Decimal::from(before_fitting) - self.prev_fitting_time_vec[mid_index] > dec!(60_000) {
- // info!("{}, {},", mid_index, self.prices[mid_index][0].len());
- // }
- if let Some((k, b)) = self.linear_least_squares(mid_index).await {
- self.ks[mid_index] = k;
- self.bs[mid_index] = b;
- self.prev_fitting_time_vec[mid_index] = Decimal::from(before_fitting)
- } else {
- return;
- }
- }
- }
- } else {
- self.depth_vec[index] = depth.clone();
- let latest_price = (depth.asks[0].price + depth.bids[0].price) / Decimal::TWO;
- self.update_fair_price(&latest_price, index).await;
- }
- if self.mid_price.is_zero() {
- return;
- }
- self.processor(depth.time, false).await;
- }
- pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
- // index == 233代表做市所
- // index == 0,1,2,3...代表参考所
- self.last_price = trade.price;
- }
- pub async fn on_record(&mut self, record: &Record) {
- // 添加新蜡烛
- if self.record_vec.len() == 0 {
- self.record_vec.push_back(record.clone());
- } else {
- let last_record = self.record_vec.back_mut().unwrap();
- if last_record.time == record.time {
- *last_record = record.clone();
- } else if last_record.time < record.time {
- self.record_vec.push_back(record.clone());
- }
- }
- if self.record_vec.len() > 5 {
- self.record_vec.pop_front();
- }
- }
- // side, pk,pd从HashMap移除,kd,kk添加到HashMap
- pub async fn on_order(&mut self, side: String, amount: Decimal, price: Decimal) {
- self.inventory = match side.as_str() {
- "kk" => {
- self.inventory - Decimal::ONE
- }
- "pd" => {
- self.inventory - Decimal::ONE
- }
- "kd" => {
- self.inventory + Decimal::ONE
- }
- "pk" => {
- self.inventory + Decimal::ONE
- }
- &_ => {
- panic!("不认识的order方向:{}", side);
- }
- };
- if side == "kk" || side == "kd" {
- let now = Decimal::from(Utc::now().timestamp_millis());
- self.trade_time_vec.push_back(now);
- self.trade_side_map.insert(now, side);
- self.trade_amount_map.insert(now, amount);
- self.trade_price_map.insert(now, price);
- } else if side == "pd" || side == "pk" {
- if self.trade_time_vec.len() > 0 {
- let pop_time = self.trade_time_vec.pop_front().unwrap();
- self.trade_side_map.remove(&pop_time);
- self.trade_amount_map.remove(&pop_time);
- self.trade_price_map.remove(&pop_time);
- }
- }
- info!(?self.trade_time_vec);
- info!(?self.trade_side_map);
- info!(?self.trade_amount_map);
- info!(?self.trade_price_map);
- }
- pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
- pub async fn on_balance(&mut self, balance: Decimal) {
- if self.inventory.is_zero() {
- self.balance = balance;
- self.float_balance = self.balance;
- self.one_grid_order_value = (self.params.lever_rate * self.balance) / self.params.grid;
- }
- }
- pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, _min_amount_value: &Decimal, update_time: Decimal) {
- if self.mid_price.is_zero() {
- return;
- }
- let prev_pos_amount = self.pos_amount;
- self.pos_amount = pos_amount.clone();
- self.pos_avg_price = pos_avg_price.clone();
- if prev_pos_amount != self.pos_amount {
- // 重置连续信号
- self.trade_condition = Decimal::ZERO;
- self.trade_condition_time = Decimal::ZERO;
- // 平仓
- if self.pos_amount.is_zero() {
- self.profit = Decimal::ZERO;
- self.profit_high = Decimal::ZERO;
- }
- self.processor(update_time, false).await;
- }
- }
- pub async fn update_fair_price(&mut self, latest_price: &Decimal, index: usize) {
- if self.mid_price.is_zero() {
- return;
- }
- let mut mp = latest_price.clone();
- mp.rescale(self.mid_price.scale());
- self.mid_price_vec[index] = mp;
- // 生成fp
- self.fair_price_std_vec[index] = mp * self.ks[index] + self.bs[index];
- self.fair_price_std_vec[index].rescale(self.mid_price.scale());
- // 生成最终用于挂单的公平价格
- let fair_price_sum: Decimal = self.fair_price_std_vec.iter().sum();
- let fair_price_count = self.fair_price_std_vec.iter()
- .filter(|&&value| value != Decimal::new(0, 0)) // 过滤掉0
- .count();
- if fair_price_count != 0 {
- self.fair_price = if self.fair_price.is_zero() {
- fair_price_sum / Decimal::from(fair_price_count)
- } else {
- dec!(0.9) * self.fair_price + dec!(0.1) * fair_price_sum / Decimal::from(fair_price_count)
- };
- }
- }
- pub async fn update_delta(&mut self) -> bool {
- if self.mid_price.is_zero() {
- return false;
- }
- // 开单信号处理
- let prev_bid_delta = self.bid_delta;
- let prev_ask_delta = self.ask_delta;
- self.bid_delta = dec!(-2);
- self.ask_delta = dec!(-2);
- self.optimal_ask_price = Self::DONT_VIEW;
- self.optimal_bid_price = Self::DONT_VIEW;
- let now = Decimal::from(Utc::now().timestamp_millis());
- // 平仓优先级高一些
- if self.trade_time_vec.len() > 0 {
- let first = self.trade_time_vec.front().unwrap();
- if now - *first > self.params.holding_time * Decimal::ONE_THOUSAND {
- let side = self.trade_side_map.get(first).unwrap();
- self.ready_close_amount = if self.trade_time_vec.len() == 1 {
- self.pos_amount
- } else {
- self.trade_amount_map.get(first).unwrap().clone()
- };
- match side.as_str() {
- "kd" => {
- self.ask_delta = self.mid_price * self.params.close;
- self.optimal_ask_price = self.mid_price + self.ask_delta;
- }
- "kk" => {
- self.bid_delta = self.mid_price * self.params.close;
- self.optimal_bid_price = self.mid_price - self.bid_delta;
- }
- &_ => {
- panic!("什么方向放进来了?side={}", side);
- }
- }
- }
- }
- let prev_order_price = if self.trade_price_map.len() > 0 {
- self.trade_price_map.get(self.trade_time_vec.back().unwrap()).unwrap().clone()
- } else {
- Decimal::MAX
- };
- let is_open_long = self.inventory < self.params.grid
- // && self.fair_price > self.mid_price * dec!(1.0002)
- && self.amplitude_times > dec!(1.5)
- && self.deviation < self.deviation_limit
- && self.trend < self.trend_limit
- // AS加仓法的处理
- && (self.mid_price / prev_order_price - Decimal::ONE) < dec!(-0.0005) * self.inventory.abs()
- ;
- let is_open_short = self.inventory.is_zero()
- && false
- ;
- // 使信号有一定持续性
- if is_open_long {
- self.trade_condition = dec!(3);
- self.trade_condition_time = now;
- // info!(?self.mid_price, ?prev_order_price, ?self.inventory, ?self.trade_condition);
- }
- if is_open_short {
- self.trade_condition = dec!(4);
- self.trade_condition_time = now;
- }
- // 开仓信号要过期处理
- if (self.trade_condition == dec!(3) || self.trade_condition == dec!(4))
- && now - self.trade_condition_time > dec!(60_000) {
- self.trade_condition = Decimal::ZERO;
- }
- if self.trade_condition == dec!(3) {
- self.bid_delta = dec!(0);
- self.ask_delta = dec!(-2);
- self.optimal_bid_price = self.mid_price;
- self.optimal_ask_price = Self::DONT_VIEW;
- } else if self.trade_condition == dec!(4) {
- self.ask_delta = dec!(0);
- self.bid_delta = dec!(-2);
- self.optimal_ask_price = self.mid_price;
- self.optimal_bid_price = Self::DONT_VIEW;
- }
- // 价格处理
- self.optimal_ask_price.rescale(self.mid_price.scale());
- self.optimal_bid_price.rescale(self.mid_price.scale());
- // 返回方向是否改变过,有改变可以立即在图表上显示
- prev_ask_delta != self.ask_delta || prev_bid_delta != self.bid_delta
- }
- pub fn check_ready(&mut self) {
- if self.is_ready {
- return;
- }
- if self.mid_price.is_zero() {
- return;
- }
- for fair_price in &self.fair_price_std_vec {
- if fair_price.is_zero() {
- return;
- }
- }
- if self.optimal_ask_price.is_zero() {
- return;
- }
- if self.optimal_bid_price.is_zero() {
- return;
- }
- if self.balance.is_zero() {
- return;
- }
- self.is_ready = true;
- info!("========================================行情数据预热完毕==================================")
- }
- // 最小二乘法拟合函数,支持VecDeque
- pub async fn linear_least_squares(&self, index: usize) -> Option<(Decimal, Decimal)> {
- let x = &self.prices[index][1];
- let y = &self.prices[index][0];
- // 检查数组长度是否相同
- if x.len() != y.len() {
- return None;
- }
- let n = x.len();
- if n == 0 {
- return None;
- }
- let mut sum_x = Decimal::zero();
- let mut sum_y = Decimal::zero();
- let mut sum_xx = Decimal::zero();
- let mut sum_xy = Decimal::zero();
- // 遍历VecDeque中的元素
- for (xi, yi) in x.deque.iter().zip(y.deque.iter()) {
- sum_x += xi;
- sum_y += yi;
- let xi_sq = xi * xi;
- sum_xx += xi_sq;
- sum_xy += xi * yi;
- }
- // 计算分子和分母
- let numerator = sum_xy - (sum_x * sum_y) / Decimal::from(n);
- let denominator = sum_xx - (sum_x * sum_x) / Decimal::from(n);
- // 如果分母为0,返回None
- if denominator == Decimal::zero() {
- return None;
- }
- let k = numerator / denominator;
- let mean_x = sum_x / Decimal::from(n);
- let mean_y = sum_y / Decimal::from(n);
- let b = mean_y - k * mean_x;
- Some((k, b))
- }
- // #[instrument(skip(self), level="TRACE")]
- async fn processor(&mut self, _data_time: Decimal, is_hard_update: bool) {
- self.check_ready();
- if !self.is_ready {
- return;
- }
- let is_delta_changed = self.update_delta().await;
- // let cci_arc = self.cci_arc.clone();
- let now = Decimal::from(Utc::now().timestamp_millis());
- let mid_price = self.mid_price;
- let ask_price = if self.params.ref_exchange.len() > 0 {
- // self.fair_price_std_vec[0]
- Self::DONT_VIEW
- } else {
- Self::DONT_VIEW
- };
- let bid_price = if self.params.ref_exchange.len() > 1 {
- // self.fair_price_std_vec[1]
- Self::DONT_VIEW
- } else {
- Self::DONT_VIEW
- };
- let optimal_ask_price = self.optimal_ask_price;
- let optimal_bid_price = self.optimal_bid_price;
- let last_price = Self::DONT_VIEW;
- let fair_price = self.fair_price;
- let spread = Self::DONT_VIEW;
- let spread_max = self.deviation;
- let spread_min = self.deviation_limit;
- let inventory = self.inventory;
- let sigma_square = self.trend;
- let gamma = self.balance;
- let kappa = self.float_balance;
- let flow_ratio = Decimal::ZERO;
- let is_time_over_update = now - self.prev_plot_time > dec!(500);
- if !is_time_over_update && !is_hard_update && !is_delta_changed {
- return;
- }
- if is_time_over_update {
- self.prev_plot_time = now
- }
- let pos_avg_price = self.pos_avg_price;
- self.debug_sender.unbounded_send(vec![
- now,
- mid_price,
- ask_price,
- bid_price,
- last_price,
- spread,
- spread_max,
- spread_min,
- optimal_ask_price,
- optimal_bid_price,
- inventory,
- sigma_square,
- gamma,
- kappa,
- flow_ratio,
- fair_price,
- pos_avg_price
- ]).unwrap();
- }
- // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
- pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
- vec![]
- }
- }
|