Browse Source

波动率模块重建

JiahengHe 2 years ago
parent
commit
aa951c09d6

+ 2 - 1
Cargo.toml

@@ -20,9 +20,10 @@ actix-rt = "2.5.0"
 actix-web = "4.0.0-beta.12"
 ctrlc = "3.2.5"
 serde_json = "1.0.105"
-rust_decimal = "1.32.0"
+rust_decimal = { version = "1.32.0", features = ["maths"] }
 rust_decimal_macros = "1.32.0"
 
+
 [workspace]
 members=[
     "exchanges",

+ 1 - 0
global/Cargo.toml

@@ -21,3 +21,4 @@ base64 = "0.13.0"
 reqwest = "0.11.22"
 uuid = { version = "1.5.0", features = ["v4"] }
 simple_excel_writer = "0.2.0"
+num-traits = "0.2.17"

+ 1 - 1
global/src/lib.rs

@@ -2,4 +2,4 @@ pub mod public_params;
 pub mod log_utils;
 pub mod params;
 pub mod trace_stack;
-pub mod export_utils;
+pub mod export_utils;

+ 11 - 0
standard/src/kucoin_swap.rs

@@ -345,12 +345,20 @@ impl Platform for KucoinSwap {
         match origin_side {
             "kd" => {
                 params["side"] = json!("buy");
+                // params["stop"] = json!("down");
+                // let stop_price = utils::fix_price(price * Decimal::from_str("0.095").unwrap(), self.market.tick_size);
+                // params["stopPrice"] = json!(stop_price.to_string());
+                // params["stopPriceType"] =  json!("TP"); // 止损参考价 最近一次成交价格
             }
             "pd" => {
                 params["side"] = json!("sell");
             }
             "kk" => {
                 params["side"] = json!("sell");
+                // params["stop"] = json!("up");
+                // let stop_price = utils::fix_price(price * Decimal::from_str("1.005").unwrap(), self.market.tick_size);
+                // params["stopPrice"] = json!(stop_price.to_string());
+                // params["stopPriceType"] =  json!("TP"); // 止损参考价 最近一次成交价格
             }
             "pk" => {
                 params["side"] = json!("buy");
@@ -578,6 +586,9 @@ impl Platform for KucoinSwap {
         let mut limits = HashMap::new();
         limits.extend(order_command.limits_open);
         limits.extend(order_command.limits_close);
+        if limits.len() > 3usize{
+            info!(?limits);
+        }
         for item in limits.keys() {
             let mut self_clone = self.clone();
             let limits_clone = limits.clone();

+ 8 - 0
standard/src/utils.rs

@@ -1,3 +1,5 @@
+use std::ops::{Div, Mul};
+use rust_decimal::Decimal;
 use tracing::trace;
 use exchanges::proxy;
 use crate::exchange::ExchangeEnum;
@@ -46,4 +48,10 @@ pub fn symbol_out_mapper(exchange_enum: ExchangeEnum, symbol: &str) -> String {
             symbol_upper.to_string()
         }
     }
+}
+
+// 步长修复价格
+pub fn fix_price(amount: Decimal, tick_size: Decimal) -> Decimal {
+    // 思路:做除法后四舍五入,然后使用这个值乘以步长
+    amount.div(tick_size).round().mul(tick_size)
 }

+ 1 - 1
strategy/Cargo.toml

@@ -10,7 +10,7 @@ serde = "1.0.183"
 serde_derive = "1.0"
 serde_json = "1.0.104"
 tokio = { version = "1.31.0", features = ["full"] }
-rust_decimal = "1.32.0"
+rust_decimal = { version = "1.32.0", features = ["maths"]}
 rust_decimal_macros = "1.32.0"
 rand = "0.8.4"
 chrono = "0.4.26"

+ 55 - 0
strategy/src/instant_volatility_indicator.rs

@@ -0,0 +1,55 @@
+use std::io::{Error, ErrorKind};
+use rust_decimal::{Decimal, MathematicalOps};
+use tracing::error;
+use crate::ring_buffer::RingBuffer;
+
+pub struct InstantVolatilityIndicator {
+    sampling_buffer: RingBuffer,
+    processing_buffer: RingBuffer,
+    samples_length: usize
+}
+
+impl InstantVolatilityIndicator {
+    pub fn new(sampling_length: usize, processing_length: usize) -> Self{
+        InstantVolatilityIndicator{
+            sampling_buffer: RingBuffer::new(sampling_length),
+            processing_buffer: RingBuffer::new(processing_length),
+            samples_length: 0,
+        }
+    }
+
+    pub fn add_sample(&mut self, value: Decimal) {
+        self.sampling_buffer.add_value(value);
+        let processing_value = self.indicator_calculation();
+        self.processing_buffer.add_value(processing_value);
+    }
+
+    pub fn indicator_calculation(&self) -> Decimal{
+        let sampling_buffer_vec: Vec<Decimal> = self.sampling_buffer.get_buffer_vec();
+        let mut diff_buffer: Vec<Decimal> = sampling_buffer_vec.windows(2).map(|w| w[1] - w[0]).collect();
+        for item in diff_buffer.iter_mut() {
+            *item = item.powi(2i64);
+        }
+        let mut sum: Decimal = diff_buffer.iter().sum();
+        sum /= Decimal::from(sampling_buffer_vec.len());
+        sum = match sum.sqrt() {
+            None => {
+                error!("波动率计算,开方计算错误! buffer: {:?}", sampling_buffer_vec);
+                Err(Error::new(ErrorKind::Other, "波动率计算,开方计算错误!"))
+            }
+            Some(value) => {
+                Ok(value)
+            }
+        }.unwrap();
+
+        sum
+    }
+
+    // 获取波动率,缓存器没满不返回波动率
+    pub fn processing_calculation(&self) -> Decimal{
+        if self.processing_buffer.is_full() {
+            self.processing_buffer.get_last_value().unwrap()
+        }
+        Decimal::ZERO
+    }
+}

+ 3 - 1
strategy/src/lib.rs

@@ -11,4 +11,6 @@ mod gate_swap;
 mod kucoin_swap;
 mod kucoin_spot;
 mod bitget_spot;
-mod predictor_new;
+mod predictor_new;
+mod instant_volatility_indicator;
+mod ring_buffer;

+ 19 - 6
strategy/src/predictor_new.rs

@@ -3,6 +3,8 @@ use rust_decimal::prelude::*;
 use rust_decimal_macros::dec;
 use tracing::{debug};
 use global::public_params;
+use standard::Ticker;
+use crate::instant_volatility_indicator::InstantVolatilityIndicator;
 
 #[derive(Debug)]
 pub struct PredictorNew {
@@ -23,7 +25,9 @@ pub struct PredictorNew {
     pub max_spread: Decimal,                                                // 最大点差
     pub min_spread: Decimal,                                                // 最小点差
     pub rl_num: Decimal,                                                    // 取价坐标
-    pub max_position_value: Decimal                                         // 最大持仓(u)
+    pub max_position_value: Decimal,                                         // 最大持仓(u)
+
+    pub vol: InstantVolatilityIndicator                                     // 波动率计算类
 }
 
 /*
@@ -48,7 +52,8 @@ impl PredictorNew {
             variance: Decimal::ZERO,
             balance_value: Decimal::ZERO,
             rl_num,
-            max_position_value
+            max_position_value,
+            vol: InstantVolatilityIndicator::new(30, 15)
         }
     }
 
@@ -113,8 +118,8 @@ impl PredictorNew {
     /**
        计算gamma值
     **/
-    pub fn calc_gamma(&mut self, ira: Decimal) -> Decimal {
-        self.max_spread / Decimal::TWO * self.balance_value * self.variance * self.variance * ira
+    pub fn calc_gamma(&mut self, ira: Decimal, variance: Decimal) -> Decimal {
+        self.max_spread / Decimal::TWO * self.balance_value * variance.powi(2) * ira
     }
 
     /**
@@ -213,6 +218,10 @@ impl PredictorNew {
         (*self).check_length();
     }
 
+    pub fn market_update(&mut self, best_mid_price: Decimal){
+        self.vol.add_sample(best_mid_price);
+    }
+
     // 获取预定价格, 也就是python的Get_ref函数
     // pub fn get_ref_price(&mut self, ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
     //     let mut ref_price_list = vec![];
@@ -234,10 +243,14 @@ impl PredictorNew {
     //     return ref_price_list;
     // }
     pub fn get_ref_price(&mut self) -> Vec<Vec<Decimal>>{
-        let mut ref_price_list = vec![];
+        let mut ref_price_list = Vec::new();
+        let std = self.vol.processing_calculation();
+        if std == Decimal::ZERO {
+            Vec::new()
+        }
         let ira = Decimal::from_str("5").unwrap();
         let dd = self.calc_deviation_range(ira.clone());
-        let gamma = self.calc_gamma(ira);
+        let gamma = self.calc_gamma(ira, std);
         let rp = self.calc_rp(&gamma);
         let dk = self.calc_dk(&gamma);
         let kappa = self.calc_kappa(&gamma, dk, dd);

+ 11 - 18
strategy/src/quant.rs

@@ -164,24 +164,7 @@ impl Quant {
             ref_name: Default::default(),
             trade_name: "".to_string(),
             ready: 0,
-            predictor: PredictorNew {
-                loop_count: 0,
-                market_info_list: vec![],
-                mid_price_list: vec![],
-                ref_mid_price_per_exchange_per_frame: vec![],
-                ref_exchange_length: 0,
-                data_length_max: 0,
-                alpha: vec![],
-                gamma: Default::default(),
-                avg_spread_list: vec![],
-                transaction_prices: vec![],
-                variance: Default::default(),
-                max_spread: Decimal::ZERO,
-                min_spread: Decimal::ZERO,
-                balance_value: Decimal::ZERO,
-                rl_num: Decimal::ZERO,
-                max_position_value: Decimal::ZERO
-            },
+            predictor: Default::default(),
             market: Market {
                 symbol: symbol.clone(),
                 base_asset: "".to_string(),
@@ -698,7 +681,13 @@ impl Quant {
         // if spread < self.predictor.min_spread || self.predictor.min_spread == Decimal::ZERO{
         //     self.predictor.min_spread = spread;
         // }
+        // 只用第一参考交易所最佳买卖价的中间价
+        if name == self.ref_name[0] {
+            self.predictor.market_update(data.mid_price);
+        }
         self.tickers.insert(name, data);
+
+
     }
 
     pub fn on_agg_market(&mut self) {
@@ -742,8 +731,12 @@ impl Quant {
         let bp = self.trade_msg.market[BID_PRICE_INDEX];
         let ap = self.trade_msg.market[ASK_PRICE_INDEX];
         let mp = (bp + ap) * dec!(0.5);
+        // 更新持仓价值
         self.predictor.balance_value = self.trade_msg.cash * mp;
         let ref_price: Vec<Vec<Decimal>> = self.predictor.get_ref_price();
+        if ref_price.len() == 0{
+            return;
+        }
         self.trade_msg.ref_price = ref_price;
     }
 

+ 116 - 0
strategy/src/ring_buffer.rs

@@ -0,0 +1,116 @@
+use std::collections::VecDeque;
+use rust_decimal::{Decimal, MathematicalOps};
+use rust_decimal::prelude::{FromPrimitive};
+
+/**
+    环形缓存器
+**/
+pub struct RingBuffer {
+    buffer: VecDeque<Decimal>,
+    length: usize,
+    delimiter: usize
+}
+
+impl RingBuffer {
+    pub fn new(length: usize) -> Self {
+        RingBuffer {
+            buffer: VecDeque::with_capacity(length),
+            length,
+            delimiter: 0usize
+        }
+    }
+
+    // 清空缓存
+    pub fn dealloc(&mut self){
+        self.buffer = VecDeque::with_capacity(self.length);
+    }
+
+    // 添加元素
+    pub fn add_value(&mut self, val: Decimal) {
+        if self.buffer.len() < self.length {
+            self.buffer.push_back(val);
+        } else {
+            self.buffer.pop_front();
+            self.buffer.push_back(val);
+        }
+    }
+
+    pub fn get_buffer_vec(&self) -> Vec<Decimal>{
+        let mut buffer_vec: Vec<Decimal> = Vec::with_capacity(self.buffer.len());
+        buffer_vec.extend(self.buffer.iter().clone());
+        buffer_vec
+    }
+
+    // 获取最后一个元素
+    pub fn get_last_value(&self) -> Option<Decimal> {
+        self.buffer.back().copied()
+    }
+
+    // 是否是空的
+    pub fn is_empty(&self) -> bool {
+        return self.buffer.len() == 0
+    }
+
+    // 是否已达到最大缓存长度
+    pub fn is_full(&self) -> bool {
+        self.buffer.len() == self.length
+    }
+
+    // 平均值
+    pub fn mean_value(&self) -> Decimal {
+        if self.is_full() {
+            let item_sum: Decimal = self.buffer.iter().sum();
+            return item_sum / Decimal::from_usize(self.length).unwrap();
+        }
+        Decimal::ZERO
+    }
+
+    // 获取标准差(标准差 = 方差 * 方差)
+    pub fn std_dev(&self) -> Decimal {
+        self.variance().sqrt().unwrap()
+    }
+
+    // 获取方差
+    pub fn variance(&self) -> Decimal {
+        let data_mean = self.mean_value();
+        let std_deviation = self.buffer.iter().map(|value| {
+            let diff = data_mean - value;
+            diff * diff
+        }).sum::<Decimal>() / Decimal::from_usize(self.length).unwrap();
+        std_deviation
+    }
+
+    // 获取缓存长度
+    pub fn len(&self) -> usize {
+        self.buffer.len()
+    }
+
+    // 设置最大缓存长度
+    pub fn set_length(&mut self, new_length: usize) {
+        if new_length < self.length {
+            for _ in 0..(self.length - new_length) {
+                self.buffer.pop_front();
+            }
+        }
+        self.length = new_length;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rust_decimal::{Decimal, MathematicalOps};
+
+    #[test]
+    fn len_test() {
+        let mut arr: Vec<Decimal> = Vec::from(vec![Decimal::new(1, 0), Decimal::new(2, 0), Decimal::new(3, 0), Decimal::new(4, 0), Decimal::new(5, 0)]);
+        let mut diff_buffer: Vec<Decimal> = arr.windows(2).map(|w| w[1] - w[0]).collect();
+        for item in diff_buffer.iter_mut() {
+            *item = item.powi(2i64);
+        }
+        let len: Decimal = Decimal::from(arr.len());
+        let mut sum: Decimal = diff_buffer.iter().sum();
+        sum /= len;
+        sum = sum.sqrt().unwrap();
+        println!("{:?}", sum);
+    }
+}

+ 50 - 4
strategy/src/strategy.rs

@@ -1,6 +1,7 @@
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::ops::{Div, Mul};
+use std::str::FromStr;
 use chrono::Utc;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
@@ -294,9 +295,8 @@ impl Strategy {
 
         // 参考价格
         if trader_msg.ref_price.len() == 0 {
-            self.ref_bp = self.bp;
-            self.ref_ap = self.ap;
-            self.ref_price = self.mp;
+            debug!("参考价格还未预热完成,等待预热...");
+            return false;
         } else {
             self.ref_bp = trader_msg.ref_price[self.ref_index][0];
             self.ref_ap = trader_msg.ref_price[self.ref_index][1];
@@ -473,6 +473,17 @@ impl Strategy {
         let grid = self.grid;
         let mode = self.maker_mode.clone();
 
+        // let mut mp = (ref_bp + ref_ap) * dec!(0.5);
+        // let mut buy_start = ref_bp;
+        // let mut sell_start = ref_ap;
+        // let mut avoid = min(dec!(0.0005), close * dec!(0.5));
+        // let mut close_dist = vec![
+        //     buy_start ,                    // buy upper
+        //     buy_start ,                    // buy lower
+        //     sell_start ,                   // sell lower
+        //     sell_start ,                   // sell upper
+        // ];
+
         // 平仓相关
         let mut mp = (ref_bp + ref_ap) * dec!(0.5);
         let mut buy_start = mp;
@@ -515,6 +526,14 @@ impl Strategy {
         ];
         debug!(?avoid, ?buy_shift, ?sell_shift, ?avoid, ?open_dist);
 
+
+        // let mut open_dist = vec![
+        //     ref_bp ,             // buy upper
+        //     ref_bp ,             // buy lower
+        //     ref_ap ,            // sell lower
+        //     ref_ap ,            // sell upper
+        // ];
+        // debug!(?open_dist);
         // 修复价格
         for open_price in &mut open_dist {
             *open_price = utils::fix_price(*open_price, self.tick_size);
@@ -950,9 +969,22 @@ impl Strategy {
                         price.to_string(),
                         order_client_id.clone()
                     ];
-
                     command.limits_close.insert(order_client_id, order.clone());
 
+                    // 定价止损平仓单
+                    let mut price_limit = self.pos.long_avg * Decimal::from_str("0.995").unwrap();
+                    price_limit = utils::fix_price(price_limit, self.tick_size);
+
+                    let order_client_limit_id = utils::generate_client_id(Some(self.broker_id.clone()));
+                    let order_limit = vec![
+                        self.pos.long_pos.to_string(),
+                        "pd".to_string(),
+                        price_limit.to_string(),
+                        order_client_limit_id.clone()
+                    ];
+
+                    command.limits_close.insert(order_client_limit_id, order_limit.clone());
+                    info!(?command);
                     debug!(?command);
                 }
             }
@@ -973,6 +1005,20 @@ impl Strategy {
 
                     command.limits_close.insert(order_client_id, order.clone());
 
+                    // 定价止损平仓单
+                    let mut price_limit = self.pos.short_avg * Decimal::from_str("1.005").unwrap();
+                    price_limit = utils::fix_price(price_limit, self.tick_size);
+
+                    let order_client_limit_id = utils::generate_client_id(Some(self.broker_id.clone()));
+                    let order_limit = vec![
+                        self.pos.short_pos.to_string(),
+                        "pk".to_string(),
+                        price_limit.to_string(),
+                        order_client_limit_id.clone()
+                    ];
+
+                    command.limits_close.insert(order_client_limit_id, order_limit.clone());
+                    info!(?command);
                     debug!(?command);
                 }
             }

+ 21 - 1
strategy/src/utils.rs

@@ -95,11 +95,24 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
     }
 }
 
+// 生成等差数列 [start, end)
+pub fn equidistant_sequence(start: i32, end: i32, step: i32) -> Vec<i32> {
+    let mut current = start;
+    let series: Vec<i32> = std::iter::repeat_with(move || {
+        let i = current;
+        current += step;
+        i
+    })
+        .take_while(|&i| i < end)
+        .collect();
+    series
+}
+
 #[cfg(test)]
 mod tests {
     use chrono::Utc;
     use rust_decimal_macros::dec;
-    use crate::utils::{clip, fix_amount, fix_price, generate_client_id};
+    use crate::utils::{clip, equidistant_sequence, fix_amount, fix_price, generate_client_id};
 
     #[test]
     fn clip_test() {
@@ -146,4 +159,11 @@ mod tests {
         println!("timestamp_micros: {}", now.timestamp_micros());
         println!("timestamp_nanos: {}", now.timestamp_nanos());
     }
+
+    #[test]
+    fn equidistant_sequence_test() {
+        let arr = equidistant_sequence(0, 10, 1);
+        print!("{:?}", arr);
+    }
+
 }