||
- use std::cmp::{max, min};
- use std::collections::{BTreeMap, HashMap, VecDeque};
- use std::sync::Arc;
- use chrono::{Utc};
- use futures_channel::mpsc::UnboundedSender;
- use futures_util::StreamExt;
- use reqwest::Client;
- use rust_decimal::prelude::*;
- use rust_decimal_macros::dec;
- use serde_json::{json, Value};
- use tokio::sync::{Mutex};
- use tracing::{error, info};
- use global::cci::CentralControlInfo;
- use global::fixed_time_range_deque::FixedTimeRangeDeque;
- use global::params::Params;
- use standard::{Depth, ForceOrder, Record, Ticker, Trade};
- use crate::utils;
- /// Mutable state of the market-making predictor: latest market data,
- /// derived prices, position bookkeeping, and the debug channel used to
- /// export sampled values.
- #[derive(Debug, Clone)]
- pub struct Predictor {
- pub depth_vec: Vec<Depth>, // depth queue
- pub record_vec: VecDeque<Record>, // candle (K-line) queue
- pub mid_price: Decimal, // mid price
- pub ask_price: Decimal, // best ask price
- pub bid_price: Decimal, // best bid price
- pub last_price: Decimal, // last traded price
- pub optimal_ask_price: Decimal, // quote price for the sell order
- pub optimal_bid_price: Decimal, // quote price for the buy order
- pub profit_point: Decimal, // profit points
- pub profit_point_ema: Decimal, // EMA of the profit points
- pub profit_point_vec: Vec<Decimal>, // profit queue
- pub inventory: Decimal, // inventory, i.e. q
- pub pos_amount: Decimal, // raw position amount
- pub pos_avg_price: Decimal, // raw position average price
- pub level: Decimal, // martin
- pub money_flow: Decimal, // money flow
- pub ask_delta: Decimal, // δa
- pub bid_delta: Decimal, // δb
- pub force_order_time_vec: FixedTimeRangeDeque<ForceOrder>, // liquidation (force) order queue
- pub force_order_value: Decimal, // liquidation order volume
- pub fair_price_vec: Vec<Decimal>, // fair price list; index 0 is the market-making exchange, index 1 the reference exchange
- pub fair_price: Decimal, // fair price
- pub fair_price_ema_short: Decimal, // EMA of the fair price
- pub fair_price_ema_long: Decimal, // EMA of the fair price
- pub mid_rate_focus_open: Decimal, // change-rate focus
- pub mid_price_focus_open: Decimal, // price when the focus was observed
- pub mid_rate_focus_close: Decimal, // change-rate focus
- pub fair_price_focus_close: Decimal, // price when the focus was observed
- pub fair_price_when_ordering: Decimal, // fair price at the time of ordering
- pub price_times_avg: Decimal, // average price ratio between the fair-price exchange and the market-making exchange
- pub is_ready: bool, // whether the predictor is ready
- pub balance: Decimal, // current balance
- pub one_grid_order_value: Decimal, // order value per grid
- pub prev_trade_force_order_value: Decimal, // ln liquidation value at the previous order
- pub prev_trade_price: Decimal, // price of the previous position increase
- pub prev_trade_time: Decimal, // previous trade time, i.e. t
- pub t_diff: Decimal, // (T-t)
- pub last_update_time: Decimal, // last update time (depth)
- pub last_index: Decimal, // index of the last update
- pub prev_insert_time: Decimal,
- pub prev_save_time: Decimal,
- pub init_time: Decimal,
- pub params: Params,
- pub debug_sender: UnboundedSender<Vec<Decimal>>,
- pub trade_time_vec: VecDeque<Decimal>, // trade time queue
- pub trade_side_map: HashMap<Decimal, String>, // trade time -> side; only the kk and kd directions are stored
- pub trade_amount_map: HashMap<Decimal, Decimal>, // trade amounts
- pub trade_amount: Decimal,
- pub prev_flush_state_time: Decimal,
- pub state: usize,
- pub state_matrix: Vec<Vec<Decimal>>,
- pub spread: Decimal, // current spread
- pub spread_ema: Decimal, // SMA of the spread, SMA5000 by default
- }
- impl Predictor {
- // Time window sizes (microseconds) — currently disabled.
- // const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
- // const TIME_DIFF_RANGE_MICROS: i64 = 15 * 60_000_000;
- // const TRADE_LONG_RANGE_MICROS: i64 = 5 * 60_000_000;
- // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
- // const TRADE_SHORT_RANGE_MICROS: i64 = 2 * 60_000_000;
- // const ONE_MILLION: Decimal = dec!(1_000_000);
- // const TWENTY_THOUSAND: Decimal = dec!(20_000);
- /// Sentinel meaning "no value for this slot". The debug task spawned in
- /// `new` stores `None` for any incoming value equal to it, and
- /// `update_optimal_ask_and_bid` assigns it to a quote price whose delta is `-2`.
- const DONT_VIEW: Decimal = dec!(14142135623730951);
- /// Creates a predictor with zeroed market state and spawns the background
- /// task that collects debug rows and periodically renders them to
- /// `./db/<port>.html`.
- ///
- /// # Panics
- /// Panics when `params.close` is zero.
- pub fn new(_cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
-     if params.close.is_zero() {
-         panic!("做市策略特殊逻辑要求平仓距离不得为0。");
-     }
-     // Unbounded channel carrying one debug row per `processor` sample.
-     let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
-     let account_port = params.port.clone();
-     tokio::spawn(async move {
-         const COLUMNS: usize = 16;
-         const MAX_ROWS: usize = 500_000;
-         let save_interval = dec!(30000);
-         let mut prev_save_time = Decimal::from(Utc::now().timestamp_millis());
-         let mut debugs: Vec<VecDeque<Option<Decimal>>> = vec![VecDeque::new(); COLUMNS];
-         while let Some(value) = rx.next().await {
-             // Append the row column by column; the sentinel becomes `None`.
-             for (i, column) in debugs.iter_mut().enumerate() {
-                 let cell = value[i];
-                 column.push_back(if cell == Self::DONT_VIEW { None } else { Some(cell) });
-             }
-             // Cap the history length by dropping the oldest row.
-             if debugs[0].len() > MAX_ROWS {
-                 for column in debugs.iter_mut() {
-                     column.pop_front();
-                 }
-             }
-             let now = Decimal::from(Utc::now().timestamp_millis());
-             if now - prev_save_time >= save_interval {
-                 let debugs_clone = debugs.clone();
-                 let temp_html_str = tokio::task::spawn_blocking(move || {
-                     utils::build_html_file(&debugs_clone)
-                 }).await.unwrap();
-                 let path = format!("./db/{}.html", account_port);
-                 utils::write_to_file(&temp_html_str, path).await;
-                 prev_save_time = Decimal::from(Utc::now().timestamp_millis());
-             }
-         }
-     });
-     Self {
-         // "Needle-catching" version: per-exchange slots.
-         depth_vec: vec![Depth::new(); 10],
-         fair_price_vec: vec![Decimal::ZERO; 10],
-         // Older queues.
-         profit_point_vec: Vec::new(),
-         record_vec: VecDeque::new(),
-         mid_price: Decimal::ZERO,
-         ask_price: Decimal::ZERO,
-         bid_price: Decimal::ZERO,
-         last_price: Decimal::ZERO,
-         optimal_ask_price: Decimal::ZERO,
-         optimal_bid_price: Decimal::ZERO,
-         inventory: Decimal::ZERO,
-         ask_delta: Decimal::ZERO,
-         bid_delta: Decimal::ZERO,
-         force_order_time_vec: FixedTimeRangeDeque::new(30 * 1_000_000),
-         force_order_value: Decimal::ZERO,
-         fair_price: Decimal::ZERO,
-         fair_price_ema_short: Decimal::ZERO,
-         fair_price_ema_long: Decimal::ZERO,
-         mid_rate_focus_open: Decimal::ZERO,
-         mid_price_focus_open: Decimal::ZERO,
-         mid_rate_focus_close: Decimal::ZERO,
-         fair_price_focus_close: Decimal::ZERO,
-         fair_price_when_ordering: Decimal::ZERO,
-         price_times_avg: Decimal::ZERO,
-         is_ready: false,
-         balance: Decimal::ZERO,
-         one_grid_order_value: Decimal::ZERO,
-         prev_trade_force_order_value: Decimal::ZERO,
-         prev_trade_price: Decimal::ZERO,
-         prev_trade_time: Decimal::ZERO,
-         t_diff: Decimal::ZERO,
-         level: Decimal::ZERO,
-         pos_amount: Decimal::ZERO,
-         money_flow: Decimal::ZERO,
-         profit_point: Decimal::ZERO,
-         profit_point_ema: Decimal::ZERO,
-         last_update_time: Decimal::ZERO,
-         last_index: Decimal::ZERO,
-         pos_avg_price: Decimal::ZERO,
-         prev_insert_time: Decimal::ZERO,
-         prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
-         init_time: Decimal::from(Utc::now().timestamp_millis()),
-         params,
-         debug_sender: tx,
-         trade_time_vec: VecDeque::new(),
-         trade_side_map: HashMap::new(),
-         trade_amount_map: HashMap::new(),
-         trade_amount: Decimal::ZERO,
-         prev_flush_state_time: Decimal::ZERO,
-         state: 1,
-         state_matrix: vec![vec![dec!(0.33); 3]; 3],
-         spread: Decimal::ZERO,
-         spread_ema: Decimal::ZERO,
-     }
- }
- /// Handles an order-book update from the exchange slot `index`.
- ///
- /// Slot 0 refreshes the best ask/bid and the mid price; slot 1 refreshes
- /// the spread. The quoting pipeline runs once a mid price is known.
- pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
-     self.last_update_time = depth.time;
-     self.last_index = Decimal::from(index);
-     match index {
-         0 => {
-             self.ask_price = depth.asks[0].price;
-             self.bid_price = depth.bids[0].price;
-             self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
-         }
-         _ => {}
-     }
-     // The fair price is updated before the snapshot is stored.
-     self.update_fair_price(depth, index).await;
-     self.depth_vec[index] = depth.clone();
-     if index == 1 {
-         self.update_spread();
-     }
-     if !self.mid_price.is_zero() {
-         self.processor(false).await;
-     }
- }
- /// Records the price of the most recent trade.
- pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
-     // The quoting pipeline is not re-run on trades (that call is disabled).
-     let latest = trade.price;
-     self.last_price = latest;
- }
- /// Folds a liquidation (force) order into `force_order_value` and, when the
- /// stored value changes, re-runs the quoting pipeline as a hard update.
- ///
- /// The stored value is the signed natural log of the order value, rescaled
- /// to two decimals, plus the fractional part of the raw value times 0.01.
- ///
- /// NOTE(review): the first clause only admits values greater than one, so a
- /// negative value is accepted only through the sign-flip clause. Starting
- /// from a zero `force_order_value`, a negative liquidation is never
- /// recorded — confirm this asymmetry is intended.
- pub async fn on_force_order(&mut self, force_order: ForceOrder) {
- // match self.force_order_time_vec.deque.iter().last() {
- // Some(last) => {
- // // Some exchanges push duplicates; tolerate that here.
- // if force_order.time != last.time && force_order.value != last.value {
- // self.force_order_time_vec.push_back(force_order);
- // }
- // }
- // None => {
- // self.force_order_time_vec.push_back(force_order);
- // }
- // }
- // Accept a larger positive value, or any value whose sign opposes the stored one.
- if (force_order.value > Decimal::ONE && force_order.value.abs().ln() > self.force_order_value.abs()) || force_order.value * self.force_order_value < Decimal::ZERO {
- self.force_order_value = if force_order.value > Decimal::ONE {
- force_order.value.ln()
- } else if force_order.value < Decimal::NEGATIVE_ONE {
- -force_order.value.abs().ln()
- } else {
- Decimal::ZERO
- };
- self.force_order_value.rescale(2);
- // Distinguish which exchange the data came from.
- self.force_order_value += force_order.value.fract() * dec!(0.01);
- self.processor(true).await;
- }
- }
- // `side`: kd/kk (opens) are added to the trade maps; pk/pd (closes) remove the oldest entry.
- /// Applies a filled order to the inventory and the open-trade bookkeeping.
- ///
- /// * `side` — `"kk"`/`"pd"` decrease the inventory by one, `"kd"`/`"pk"`
- ///   increase it by one.
- /// * `amount` — amount stored for an opening order; unused for closes.
- ///
- /// # Panics
- /// Panics when `side` is not one of the four known directions.
- pub async fn on_order(&mut self, side: String, amount: Decimal) {
-     let step = match side.as_str() {
-         "kk" | "pd" => Decimal::NEGATIVE_ONE,
-         "kd" | "pk" => Decimal::ONE,
-         _ => panic!("不认识的order方向:{}", side),
-     };
-     self.inventory += step;
-     // Back to flat: reset the per-position calculations.
-     if self.inventory.is_zero() {
-         self.profit_point_vec.clear();
-         self.profit_point = Decimal::ZERO;
-         self.profit_point_ema = Decimal::ZERO;
-         self.prev_trade_force_order_value = Decimal::ZERO;
-         self.prev_trade_price = Decimal::ZERO;
-     }
-     if side == "kk" || side == "kd" {
-         let now = Decimal::from(Utc::now().timestamp_millis());
-         // Two opens within the same millisecond would share one map key while
-         // occupying two queue slots; the second slot would later have no map
-         // entry. Bump the key until it is unique.
-         let mut key = now;
-         while self.trade_side_map.contains_key(&key) {
-             key += Decimal::ONE;
-         }
-         self.trade_time_vec.push_back(key);
-         self.trade_side_map.insert(key, side);
-         self.trade_amount_map.insert(key, amount);
-         self.prev_trade_time = now;
-         self.prev_trade_price = self.mid_price;
-         self.prev_trade_force_order_value = self.force_order_value;
-         self.force_order_value = Decimal::ZERO;
-     } else {
-         // "pd" / "pk": drop the oldest recorded open. A close can arrive with
-         // nothing recorded, so report it instead of panicking.
-         match self.trade_time_vec.pop_front() {
-             Some(opened_at) => {
-                 self.trade_side_map.remove(&opened_at);
-                 self.trade_amount_map.remove(&opened_at);
-             }
-             None => {
-                 error!("close order `{}` received but no open trade is recorded", side);
-             }
-         }
-     }
-     info!(?self.trade_time_vec);
-     info!(?self.trade_side_map);
-     info!(?self.trade_amount_map);
- }
- /// Recomputes `level` from the absolute inventory as
- /// `(-1 + sqrt(1 + 8 * |inventory|)) / 2`, capped at 6.
- pub async fn update_level(&mut self) {
-     let radicand = Decimal::ONE + dec!(8) * self.inventory.abs();
-     let root = radicand.sqrt().unwrap();
-     let uncapped = (Decimal::NEGATIVE_ONE + root) / Decimal::TWO;
-     self.level = min(uncapped, dec!(6));
- }
- /// Ticker callback; currently a no-op.
- pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
- /// Candle (record) callback; currently a no-op.
- pub async fn on_record(&mut self, _record: &Record) {}
- /// Stores the new balance and, while the inventory is flat, recomputes the
- /// per-grid order value as `lever_rate * balance / grid`.
- pub async fn on_balance(&mut self, balance: Decimal) {
-     self.balance = balance;
-     if !self.inventory.is_zero() {
-         return;
-     }
-     let leveraged = self.params.lever_rate * self.balance;
-     self.one_grid_order_value = leveraged / self.params.grid;
- }
- /// Stores the reported position amount and average price, refreshes the
- /// level, and re-runs the quoting pipeline as a hard update. Ignored until
- /// a mid price is known.
- pub async fn on_inventory(&mut self, pos_amount: &Decimal, pos_avg_price: &Decimal, _min_amount_value: &Decimal) {
-     if !self.mid_price.is_zero() {
-         self.pos_amount = *pos_amount;
-         self.pos_avg_price = *pos_avg_price;
-         self.update_level().await;
-         self.processor(true).await;
-     }
- }
- /// Returns `(signed_rate, min_price, max_price)` for the prices in the window.
- ///
- /// The rate is the rise of the latest price from the window minimum when
- /// that exceeds its fall from the window maximum; otherwise it is the
- /// negated fall.
- ///
- /// # Panics
- /// Panics when the window is empty.
- pub fn get_real_rate(price_vec: &FixedTimeRangeDeque<Decimal>) -> (Decimal, Decimal, Decimal) {
-     let prices = &price_vec.deque;
-     let latest = prices.iter().last().unwrap();
-     let lowest = prices.iter().min().unwrap();
-     let highest = prices.iter().max().unwrap();
-     let rise = (latest - lowest) / lowest;
-     let fall = (highest - latest) / highest;
-     let signed_rate = if rise > fall { rise } else { -fall };
-     (signed_rate, *lowest, *highest)
- }
- /// Updates the smoothed micro price of exchange slot `index` and, once
- /// slots 0 and 1 both have a value, the combined fair price and its EMAs.
- ///
- /// Does nothing until a mid price is known, or when the top of book
- /// carries no value on either side.
- pub async fn update_fair_price(&mut self, depth: &Depth, index: usize) {
-     if self.mid_price.is_zero() {
-         return;
-     }
-     let a1 = &depth.asks[0];
-     let b1 = &depth.bids[0];
-     // https://quant.stackexchange.com/questions/50651/how-to-understand-micro-price-aka-weighted-mid-price
-     let total = a1.value + b1.value;
-     // With no value on either side the weights are undefined, and Decimal
-     // division by zero panics; skip this update and keep the previous price.
-     if total.is_zero() {
-         return;
-     }
-     let fair_price = a1.price * b1.value / total + b1.price * a1.value / total;
-     // Alternative (disabled): plain mid, (a1.price + b1.price) / 2.
-     self.fair_price_vec[index] = if self.fair_price_vec[index].is_zero() {
-         fair_price
-     } else {
-         self.fair_price_vec[index] * dec!(0.5) + fair_price * dec!(0.5)
-     };
-     self.fair_price_vec[index].rescale(self.mid_price.scale());
-     // Combined fair price needs both slot 0 and slot 1.
-     if self.fair_price_vec[0].is_zero() || self.fair_price_vec[1].is_zero() {
-         return;
-     }
-     self.price_times_avg = if self.price_times_avg.is_zero() {
-         self.fair_price_vec[1] / self.fair_price_vec[0]
-     } else {
-         self.price_times_avg * dec!(0.9995) + dec!(0.0005) * self.fair_price_vec[1] / self.fair_price_vec[0]
-     };
-     // Normalise the price: the fair-price exchange may quote with a prefix
-     // (multiplier), so divide by the running price ratio.
-     self.fair_price = self.fair_price_vec[1] / self.price_times_avg;
-     // NOTE(review): the "long" EMA weighs new samples at 0.33 and the "short"
-     // one at 0.001, so "long" reacts faster — confirm the names are intended.
-     self.fair_price_ema_long = if self.fair_price_ema_long.is_zero() {
-         self.fair_price
-     } else {
-         self.fair_price_ema_long * dec!(0.67) + self.fair_price * dec!(0.33)
-     };
-     self.fair_price_ema_short = if self.fair_price_ema_short.is_zero() {
-         self.fair_price
-     } else {
-         self.fair_price_ema_short * dec!(0.999) + self.fair_price * dec!(0.001)
-     };
- }
- /// Refreshes `spread` from the top of book stored in slot 1 and folds it
- /// into `spread_ema` (seeded with the first spread, then 0.999 / 0.001).
- pub fn update_spread(&mut self) {
-     let top = &self.depth_vec[1];
-     let current = top.asks[0].price - top.bids[0].price;
-     self.spread = current;
-     if self.spread_ema.is_zero() {
-         self.spread_ema = current;
-     } else {
-         self.spread_ema = self.spread_ema * dec!(0.999) + current * dec!(0.001);
-     }
- }
- /// Decides the bid/ask deltas for the next quote.
- ///
- /// A delta of `-2` means "do not quote that side". Closing the oldest open
- /// trade once it has been held longer than `params.holding_time` seconds
- /// takes priority; otherwise one side may be opened at zero distance when
- /// the liquidation signal, price, inventory and state conditions hold.
- ///
- /// # Panics
- /// Panics when the oldest trade has no map entry or an unexpected side.
- pub async fn update_delta(&mut self) {
-     if self.fair_price.is_zero() {
-         return;
-     }
-     // Refresh the Markov-chain state at most once a minute.
-     let now = Decimal::from(Utc::now().timestamp_millis());
-     let since_refresh = now - self.prev_flush_state_time;
-     if since_refresh > dec!(60_000) {
-         self.update_state_matrix().await;
-         self.prev_flush_state_time = now;
-     }
-     let no_quote = dec!(-2);
-     self.bid_delta = no_quote;
-     self.ask_delta = no_quote;
-     // Closing has the higher priority.
-     if let Some(&opened_at) = self.trade_time_vec.front() {
-         let max_hold_ms = self.params.holding_time * Decimal::ONE_THOUSAND;
-         if now - opened_at > max_hold_ms {
-             let side = self.trade_side_map.get(&opened_at).unwrap();
-             self.trade_amount = *self.trade_amount_map.get(&opened_at).unwrap();
-             match side.as_str() {
-                 "kd" => self.ask_delta = self.mid_price * self.params.close,
-                 "kk" => self.bid_delta = self.mid_price * self.params.close,
-                 _ => panic!("什么方向放进来了?side={}", side),
-             }
-         }
-     }
-     let never_traded = self.prev_trade_price.is_zero();
-     let can_open = self.state == 0;
-     let open_long = can_open
-         && self.bid_delta == no_quote
-         && self.force_order_value < -self.params.open
-         && (self.mid_price < self.prev_trade_price * dec!(0.999) || never_traded)
-         && self.inventory < self.params.grid;
-     let open_short = can_open
-         && self.ask_delta == no_quote
-         && self.force_order_value > self.params.open
-         && (self.mid_price > self.prev_trade_price * dec!(1.001) || never_traded)
-         && self.inventory > -self.params.grid;
-     // Long takes precedence when both sides qualify.
-     match (open_long, open_short) {
-         (true, _) => self.bid_delta = Decimal::ZERO,
-         (false, true) => self.ask_delta = Decimal::ZERO,
-         (false, false) => {}
-     }
- }
- /// Converts the deltas into quote prices.
- ///
- /// Per side: delta `-1` quotes at the opposite best price, delta `-2`
- /// yields the `DONT_VIEW` sentinel, and any other delta is applied to the
- /// mid price and clamped so that it does not pass the opposite best price.
- /// Both results are rescaled to the mid price's scale.
- pub fn update_optimal_ask_and_bid(&mut self) {
-     let cross = dec!(-1);
-     let hidden = dec!(-2);
-     let scale = self.mid_price.scale();
-     let mut ask = if self.ask_delta == cross {
-         self.bid_price
-     } else if self.ask_delta == hidden {
-         Self::DONT_VIEW
-     } else {
-         max(self.mid_price + self.ask_delta, self.bid_price)
-     };
-     let mut bid = if self.bid_delta == cross {
-         self.ask_price
-     } else if self.bid_delta == hidden {
-         Self::DONT_VIEW
-     } else {
-         min(self.mid_price - self.bid_delta, self.ask_price)
-     };
-     ask.rescale(scale);
-     bid.rescale(scale);
-     self.optimal_ask_price = ask;
-     self.optimal_bid_price = bid;
- }
- /// Currently a no-op: the `t_diff` computation below is disabled.
- pub fn update_t_diff(&mut self) {
- // if self.prev_trade_time > 0 {
- // let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
- // self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::TIME_DIFF_RANGE_MICROS).unwrap(), Decimal::ZERO);
- // } else {
- // self.t_diff = Decimal::ONE;
- // }
- }
- /// Marks the predictor ready, and logs it once, as soon as the mid, fair,
- /// ask and bid prices are all non-zero.
- pub fn check_ready(&mut self) {
-     if self.is_ready {
-         return;
-     }
-     let required = [self.mid_price, self.fair_price, self.ask_price, self.bid_price];
-     if required.iter().any(|price| price.is_zero()) {
-         return;
-     }
-     self.is_ready = true;
-     info!("========================================行情数据预热完毕==================================")
- }
- // #[instrument(skip(self), level="TRACE")]
- /// Runs the quoting pipeline and pushes one debug row to the debug task.
- ///
- /// Does nothing until the market data is warmed up. The debug row is sent
- /// at most every 500 ms unless `is_hard_update` is set. A closed debug
- /// channel is logged instead of panicking, so quoting does not depend on
- /// the debug task being alive.
- async fn processor(&mut self, is_hard_update: bool) {
-     self.check_ready();
-     if !self.is_ready {
-         return;
-     }
-     self.update_t_diff();
-     self.update_delta().await;
-     self.update_optimal_ask_and_bid();
-     let now = Decimal::from(Utc::now().timestamp_millis());
-     // Throttle before building the row.
-     let need_append = now - self.prev_insert_time > dec!(500);
-     if !need_append && !is_hard_update {
-         return;
-     }
-     // Slot order is fixed; the debug task reads exactly 16 columns.
-     let row = vec![
-         now,
-         self.mid_price,
-         self.ask_price,
-         self.bid_price,
-         self.last_price,
-         self.spread,
-         self.spread_ema,                 // spread_max slot
-         Self::DONT_VIEW,                 // spread_min slot (hidden)
-         self.optimal_ask_price,
-         self.optimal_bid_price,
-         self.inventory,
-         Decimal::from(self.state),       // sigma_square slot
-         self.params.holding_time,        // gamma slot
-         self.balance,                    // kappa slot
-         Decimal::ZERO,                   // flow_ratio slot
-         Self::DONT_VIEW,                 // fair_price slot (hidden)
-     ];
-     if let Err(err) = self.debug_sender.unbounded_send(row) {
-         error!("debug channel unavailable, dropping debug row: {}", err);
-     }
-     self.prev_insert_time = Decimal::from(Utc::now().timestamp_millis())
- }
- // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
- /// Placeholder: always returns an empty list and ignores the ticker map.
- pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
-     Vec::new()
- }
- pub async fn update_state_matrix(&mut self) {
- let url = "http://mms.skyfffire.com:9000/getStateData";
- let symbol = self.params.ref_pair[0].to_lowercase();
- let params = json!({
- "symbol": symbol,
- });
- // 创建 HTTP 客户端
- let client = Client::new();
- // 发送 GET 请求
- let response = client.get(url)
- .query(¶ms)
- .send()
- .await.unwrap();
- // 错误处理
- if response.status().is_success() {
- let response_text = response.text().await.unwrap();
- let parsed: Value = serde_json::from_str(response_text.as_str()).unwrap();
- info!("state_matrix={}", parsed.to_string());
- self.state_matrix = parsed["state_matrix"]
- .as_array()
- .unwrap()
- .iter()
- .map(|row| {
- row.as_array()
- .unwrap()
- .iter()
- .map(|v| Decimal::from_str(v.as_str().unwrap()).unwrap())
- .collect()
- })
- .collect();
- self.state = parsed["state"].as_i64().unwrap() as usize;
- self.params.holding_time = Decimal::from(parsed["min_index"].as_i64().unwrap());
- } else {
- error!("状态转移链挂了:{}", response.status());
- }
- }
- }
|