| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- use std::cmp::{max, min};
- use std::collections::{BTreeMap, VecDeque};
- use std::sync::Arc;
- use chrono::Utc;
- use rust_decimal::prelude::*;
- use rust_decimal_macros::dec;
- use tokio::sync::Mutex;
- use tracing::info;
- use global::cci::CentralControlInfo;
- use global::fixed_time_range_deque::FixedTimeRangeDeque;
- use global::predictor_state::PredictorState;
- use standard::{Depth, Record, Ticker, Trade};
- #[derive(Debug)]
- pub struct AvellanedaStoikov {
- pub depth_vec: FixedTimeRangeDeque<Depth>, // 深度队列
- pub trade_long_vec: FixedTimeRangeDeque<Trade>, // 交易队列
- pub trade_short_vec: FixedTimeRangeDeque<Trade>, // 交易队列
- pub spread_vec: FixedTimeRangeDeque<Decimal>,
- pub record_vec: VecDeque<Record>, // 蜡烛队列
- pub mid_price: Decimal, // 中间价
- pub ask_price: Decimal, // 卖一价
- pub bid_price: Decimal, // 买一价
- pub last_price: Decimal, // 最后成交价
- pub spread: Decimal, // 市场冲击
- pub spread_max: Decimal, // 最大市场冲击
- pub spread_min: Decimal, // 最小市场冲击
- pub optimal_ask_price: Decimal, // 卖出挂单价
- pub optimal_bid_price: Decimal, // 买入挂单价
- pub inventory: Decimal, // 库存,也就是q
- pub level: Decimal, // martin
- pub sigma_square: Decimal, // σ^2,波动性的平方
- pub gamma: Decimal, // γ,库存风险厌恶参数
- pub kappa: Decimal, // κ 订单簿 流动性 参数
- pub flow_ratio_mfi: Decimal, // 资金流比例(专供MFI)
- pub flow_ratio_trades: Decimal, // 资金流比例(trades生成)
- pub money_flow_index: Decimal, // MFI
- pub ask_delta: Decimal, // δa
- pub bid_delta: Decimal, // δb
- pub base_delta: Decimal, // 基础挂单距离
- pub ratio_edge: Decimal, // 资金流修正的挂单距离
- pub ref_price: Decimal, // 预定价格
- pub cci_arc: Arc<Mutex<CentralControlInfo>>, // 中控信息
- pub is_ready: bool,
- pub prev_trade_time: i64, // 上次交易时间,也就是t
- pub t_diff: Decimal, // (T-t)
- }
- impl AvellanedaStoikov {
- // 时间窗口大小(微秒)
- const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
- const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
- const TRADE_SHORT_RANGE_MICROS: i64 = 60_000_000;
- // const ONE_MILLION: Decimal = dec!(1_000_000);
- // const TWENTY_THOUSAND: Decimal = dec!(20_000);
- const IRA: Decimal = dec!(1);
- pub fn new(cci_arc: Arc<Mutex<CentralControlInfo>>) -> Self {
- let avellaneda_stoikov = Self {
- // 分别给与的长度
- depth_vec: FixedTimeRangeDeque::new(Self::MAX_TIME_RANGE_MICROS),
- spread_vec: FixedTimeRangeDeque::new(Self::MAX_TIME_RANGE_MICROS),
- trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
- trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
- record_vec: VecDeque::new(),
- mid_price: Default::default(),
- ask_price: Default::default(),
- bid_price: Default::default(),
- last_price: Default::default(),
- spread: Default::default(),
- spread_max: Default::default(),
- spread_min: Default::default(),
- optimal_ask_price: Default::default(),
- optimal_bid_price: Default::default(),
- inventory: Default::default(),
- gamma: Default::default(),
- sigma_square: Default::default(),
- ask_delta: Default::default(),
- bid_delta: Default::default(),
- base_delta: Default::default(),
- ratio_edge: Default::default(),
- kappa: Default::default(),
- ref_price: Default::default(),
- cci_arc,
- is_ready: false,
- prev_trade_time: Utc::now().timestamp_micros(),
- t_diff: Default::default(),
- level: Default::default(),
- flow_ratio_mfi: Default::default(),
- flow_ratio_trades: Default::default(),
- money_flow_index: Default::default(),
- };
- avellaneda_stoikov
- }
- // 更新最大市场冲击
- pub fn update_spread_max(&mut self) {
- self.spread_max = if let Some(&max_value) = self.spread_vec.deque.iter().max() {
- max_value
- } else {
- Decimal::NEGATIVE_ONE
- };
- }
- // 更新最小市场冲击
- pub fn update_spread_min(&mut self) {
- self.spread_min = if let Some(&min_value) = self.spread_vec.deque.iter().min() {
- min_value
- } else {
- Decimal::NEGATIVE_ONE
- };
- }
- pub fn update_spread(&mut self) {
- if self.trade_long_vec.len() > 0 {
- //
- let last_trade = self.trade_long_vec.get(self.trade_long_vec.len() - 1).unwrap();
- let last_trade_price = last_trade.price;
- let last_trade_time = last_trade.time;
- let mut first_trade_price = last_trade.price;
- for trade in self.trade_long_vec.deque.iter().rev() {
- if last_trade_time - trade.time > Decimal::TEN {
- break;
- }
- first_trade_price = trade.price;
- }
- self.spread = (last_trade_price - first_trade_price).abs();
- if self.spread > Decimal::ZERO {
- self.spread_vec.push_back(self.spread);
- }
- self.update_spread_max();
- self.update_spread_min();
- }
- }
- pub async fn on_depth(&mut self, depth: &Depth) {
- self.depth_vec.push_back(depth.clone());
- self.ask_price = depth.asks[0].price;
- self.bid_price = depth.bids[0].price;
- self.mid_price = (self.ask_price + self.bid_price) / Decimal::TWO;
- self.processor().await;
- }
- pub async fn on_trade(&mut self, trade: &Trade) {
- self.trade_long_vec.push_back(trade.clone());
- self.trade_short_vec.push_back(trade.clone());
- self.last_price = trade.price;
- self.update_spread();
- self.processor().await;
- }
- pub async fn update_level(&mut self) {
- self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
- self.level = min(self.level, dec!(6));
- }
- pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
- pub async fn on_record(&mut self, record: &Record) {
- // 添加新蜡烛
- if self.record_vec.len() == 0 {
- self.record_vec.push_back(record.clone());
- } else {
- let last_record = self.record_vec.back_mut().unwrap();
- if last_record.time == record.time {
- *last_record = record.clone();
- } else if last_record.time < record.time {
- self.record_vec.push_back(record.clone());
- }
- }
- if self.record_vec.len() > 4 {
- self.record_vec.pop_front();
- }
- // 如果蜡烛数量足够,则更新mfi
- if self.record_vec.len() >= 4 {
- self.update_mfi();
- }
- }
- pub fn update_mfi(&mut self) {
- let mut money_flow_in = Decimal::ZERO;
- let mut money_flow_out = Decimal::ZERO;
- let _3 = dec!(3);
- for record in self.record_vec.iter() {
- let typical_price = (record.high + record.low + record.close) / _3;
- let money_flow = typical_price * record.volume;
- if record.close > record.open {
- money_flow_in += money_flow;
- } else if record.close < record.open {
- money_flow_out += money_flow;
- }
- }
- self.money_flow_index = if money_flow_out.is_zero() {
- Decimal::ONE_HUNDRED
- } else {
- let money_flow_ratio = money_flow_in / money_flow_out;
- Decimal::ONE_HUNDRED - Decimal::ONE_HUNDRED / (Decimal::ONE + money_flow_ratio)
- };
- }
- pub async fn update_inventory(&mut self, inventory: &Decimal, min_amount_value: &Decimal) {
- let prev_inventory = self.inventory;
- self.inventory = (inventory / (min_amount_value / self.mid_price)).round();
- if prev_inventory != self.inventory {
- self.prev_trade_time = Utc::now().timestamp_micros();
- }
- self.update_level().await;
- self.processor().await;
- }
- pub fn update_sigma_square(&mut self) {
- self.sigma_square = self.spread_max * dec!(0.5);
- self.sigma_square.rescale(10);
- }
- pub fn update_gamma(&mut self) {
- // self.gamma = if self.sigma_square == Decimal::ZERO || self.inventory == Decimal::ZERO {
- // Decimal::ONE
- // } else {
- // Self::IRA * (self.spread_max - self.spread_min) / (Decimal::TWO * self.inventory.abs() * self.sigma_square)
- // };
- // self.gamma.rescale(8);
- self.gamma = dec!(0.236) * Self::IRA;
- }
- pub fn update_kappa(&mut self) {
- // self.kappa = if self.spread_max.is_zero() || self.init_delta_plus.is_zero() {
- // Decimal::ONE
- // } else {
- // let mut temp = self.init_delta_plus * self.gamma - self.sigma_square * self.gamma.powd(Decimal::TWO);
- // temp.rescale(6);
- //
- // self.gamma / (temp.exp() - Decimal::ONE)
- // };
- //
- // self.kappa.rescale(8);
- if self.mid_price > Decimal::ZERO {
- self.kappa = dec!(888) / self.mid_price;
- } else {
- self.kappa = dec!(1);
- }
- }
- pub fn update_ref_price(&mut self) {
- self.ref_price = self.mid_price;
- }
- pub fn update_delta(&mut self) {
- if self.gamma != Decimal::ZERO {
- let pos_edge = self.gamma * self.sigma_square * self.inventory.abs().powd(dec!(2)) * self.t_diff;
- self.base_delta = self.gamma * self.sigma_square * self.t_diff / Decimal::TWO + (Decimal::ONE / self.gamma) * (Decimal::ONE + self.gamma / self.kappa).ln();
- self.ratio_edge = self.flow_ratio_mfi * self.sigma_square;
- self.bid_delta = self.base_delta;
- self.ask_delta = self.base_delta;
- if self.inventory > Decimal::ZERO {
- self.bid_delta += pos_edge;
- } else if self.inventory < Decimal::ZERO {
- self.ask_delta += pos_edge;
- }
- if self.ratio_edge > Decimal::ZERO {
- self.ask_delta -= self.sigma_square.abs() * (Decimal::TWO - self.t_diff);
- self.bid_delta += self.sigma_square.abs() * dec!(10);
- } else if self.ratio_edge < Decimal::ZERO {
- self.ask_delta += self.sigma_square.abs() * dec!(10);
- self.bid_delta -= self.sigma_square.abs() * (Decimal::TWO - self.t_diff);
- } else if self.ratio_edge == Decimal::ZERO {
- self.ask_delta += self.sigma_square.abs() * dec!(10);
- self.bid_delta += self.sigma_square.abs() * dec!(10);
- }
- }
- }
- pub fn update_optimal_ask_and_bid(&mut self) {
- self.optimal_ask_price = max(self.ref_price + self.ask_delta / Decimal::TWO, self.ask_price);
- self.optimal_bid_price = min(self.ref_price - self.bid_delta / Decimal::TWO, self.bid_price);
- }
- pub fn update_t_diff(&mut self) {
- if self.prev_trade_time > 0 {
- let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
- self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::MAX_TIME_RANGE_MICROS).unwrap(), Decimal::ZERO);
- } else {
- self.t_diff = Decimal::ONE;
- }
- }
- fn calc_flow_ratio(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
- // let mut flow_in_value = Decimal::ZERO;
- // let mut flow_out_value = Decimal::ZERO;
- // for (index, trade_iter) in trades.deque.iter().enumerate() {
- // if index == 0 {
- // continue
- // }
- //
- // let prev_trade_iter = trades.deque.get(index - 1).unwrap();
- // let trade = trade_iter;
- // if trade.price > prev_trade_iter.price {
- // flow_in_value += trade.value * (prev_trade_iter.price - trade.price).abs();
- // // flow_in_value += Decimal::ONE;
- // } else if trade.price < prev_trade_iter.price {
- // flow_out_value += trade.value * (prev_trade_iter.price - trade.price).abs();
- // // flow_out_value += Decimal::ONE;
- // } else {
- // // if trade.size > Decimal::ZERO {
- // // flow_in_value += trade.value;
- // // } else {
- // // flow_out_value += trade.value;
- // // }
- // }
- //
- // // if trade_iter.size > Decimal::ZERO {
- // // flow_in_value += trade_iter.value;
- // // } else {
- // // flow_out_value += trade_iter.value;
- // // }
- // }
- // if self.trade_vec.deque.len() > 1 {
- // let prev_trade_iter = self.trade_vec.deque.get(self.trade_vec.deque.len() - 2).unwrap();
- // if trade.price > prev_trade_iter.price {
- // self.flow_in_value += trade.value;
- // } else if trade.price < prev_trade_iter.price {
- // self.flow_out_value += trade.value;
- // } else {
- // // if trade.size > Decimal::ZERO {
- // // self.flow_in_value += trade.value;
- // // } else {
- // // self.flow_out_value += trade.value;
- // // }
- // }
- //
- // // if trade.size > Decimal::ZERO {
- // // self.flow_in_value += trade.value;
- // // } else {
- // // self.flow_out_value += trade.value;
- // // }
- //
- // if self.flow_out_value + self.flow_in_value > dec!(2_000_000) {
- // self.flow_out_value = self.flow_out_value * dec!(0.618);
- // self.flow_in_value = self.flow_in_value * dec!(0.618);
- // }
- // }
- // else {
- // if trade.size > Decimal::ZERO {
- // self.flow_in_value += trade.value;
- // } else {
- // self.flow_out_value += trade.value;
- // }
- // }
- let mut flow_in_value = Decimal::ZERO;
- let mut flow_out_value = Decimal::ZERO;
- for trade_iter in trades.deque.iter() {
- if trade_iter.size > Decimal::ZERO {
- flow_in_value += trade_iter.value;
- } else {
- flow_out_value += trade_iter.value;
- }
- }
- // 使用EMA來更新資金流,確保平滑性
- // let a = Decimal::TWO / dec!(50);
- if flow_out_value + flow_in_value > *min_volume {
- let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
- // a * now + (Decimal::ONE - a) * prev_flow_ratio
- // Decimal::ONE_HUNDRED * flow_in_value / (flow_out_value + flow_in_value)
- now
- } else {
- Decimal::ZERO
- }
- }
- // fn calc_flow_ratio_2(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
- // let mut flow_in_value = Decimal::ZERO;
- // let mut flow_out_value = Decimal::ZERO;
- // for (index, trade_iter) in trades.deque.iter().enumerate() {
- // if index == 0 {
- // continue
- // }
- //
- // let prev_trade_iter = trades.deque.get(index - 1).unwrap();
- // let trade = trade_iter;
- // if trade.price > prev_trade_iter.price {
- // flow_in_value += trade.value;
- // // flow_in_value += Decimal::ONE;
- // } else if trade.price < prev_trade_iter.price {
- // flow_out_value += trade.value;
- // // flow_out_value += Decimal::ONE;
- // } else {
- // if trade.size > Decimal::ZERO {
- // flow_in_value += trade.value;
- // } else {
- // flow_out_value += trade.value;
- // }
- // }
- // }
- //
- // // 使用EMA來更新資金流,確保平滑性
- // // let a = Decimal::TWO / dec!(50);
- // if flow_out_value + flow_in_value > *min_volume {
- // // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
- // // a * now + (Decimal::ONE - a) * prev_flow_ratio
- // (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
- // } else {
- // Decimal::ZERO
- // }
- // }
- pub fn update_flow_ratio(&mut self) {
- self.flow_ratio_mfi = if self.money_flow_index > dec!(80) {
- (self.money_flow_index - dec!(80)) / dec!(20)
- } else if self.money_flow_index < dec!(20) {
- Decimal::NEGATIVE_ONE * (dec!(20) - self.money_flow_index) / dec!(20)
- } else {
- Decimal::ZERO
- };
- self.flow_ratio_trades = Self::calc_flow_ratio(&self.flow_ratio_trades, &dec!(0), &mut self.trade_short_vec);
- }
- pub fn check_ready(&mut self) {
- if self.is_ready {
- return;
- }
- if self.mid_price == Decimal::ZERO {
- return;
- }
- if self.ask_price == Decimal::ZERO {
- return;
- }
- if self.bid_price == Decimal::ZERO {
- return;
- }
- if self.optimal_ask_price < self.ask_price {
- return;
- }
- if self.optimal_bid_price > self.bid_price {
- return;
- }
- if self.depth_vec.len() < 100 {
- return;
- }
- if self.trade_long_vec.len() < 100 {
- return;
- }
- self.is_ready = true;
- info!("========================================行情数据预热完毕==================================")
- }
- // #[instrument(skip(self), level="TRACE")]
- async fn processor(&mut self) {
- self.update_t_diff();
- // info!(?self.t_diff);
- self.update_flow_ratio();
- // info!(?self.flow_ratio_long);
- self.update_sigma_square();
- // info!(?self.sigma_square);
- self.update_gamma();
- // info!(?self.gamma);
- self.update_kappa();
- // info!(?self.kappa);
- self.update_ref_price();
- // info!(?self.ref_price);
- self.update_delta();
- // info!(?self.ask_delta, ?self.bid_delta);
- self.update_optimal_ask_and_bid();
- // info!("=============================================");
- self.check_ready();
- if !self.is_ready {
- return;
- }
- let mut cci = self.cci_arc.lock().await;
- cci.predictor_state_vec.push_back(PredictorState {
- update_time: Decimal::from_i64(Utc::now().timestamp_millis()).unwrap(),
- mid_price: self.last_price,
- ask_price: self.ask_price,
- bid_price: self.bid_price,
- last_price: self.last_price,
- spread: self.spread,
- spread_max: self.ask_delta,
- spread_min: self.bid_delta,
- optimal_ask_price: self.optimal_ask_price,
- optimal_bid_price: self.optimal_bid_price,
- inventory: self.inventory,
- sigma_square: self.money_flow_index,
- gamma: self.flow_ratio_trades,
- kappa: self.t_diff,
- flow_ratio: self.flow_ratio_mfi,
- ref_price: self.ref_price,
- });
- }
- // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
- pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
- return vec![];
- }
- }
|