Forráskód Böngészése

缓存优化开发完成,待测

JiahengHe 1 éve
szülő
commit
5e6a52d1fb
4 módosított fájl, 136 hozzáadás és 172 törlés
  1. 24 2
      src/binance_usdt_swap_data_listener.rs
  2. 24 2
      src/bitget_usdt_swap_data_listener.rs
  3. 79 159
      src/msv.rs
  4. 9 9
      src/rank.rs

+ 24 - 2
src/binance_usdt_swap_data_listener.rs

@@ -65,16 +65,38 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     }
     // 每分钟计算msv
     tokio::spawn(async move {
+        // 上次最后结束时间
+        let mut last_time: i64 = 0;
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            let mut start_timestamp = last_time;
+            // 第一次拿两小时
+            if start_timestamp == 0 {
+                start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            }
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
                 let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
+                let new_msv = match indicator_map.get(&symbol) {
+                    None => {
+                        msv
+                    }
+                    Some(old_msv) => {
+                        let mut indicators = Indicators::new();
+                        indicators.indicator.extend(old_msv.indicator.clone());
+                        indicators.indicator.extend(msv.indicator);
+                        indicators.result_size = old_msv.result_size + msv.result_size;
+                        indicators.total_size = old_msv.total_size + msv.total_size;
+                        indicators.last_calc_time = msv.last_calc_time;
+                        indicators
+                    }
+                };
+                indicator_map.insert(symbol, new_msv);
             }
+            // 更新结束时间
+            last_time = end_timestamp;
             tokio::time::sleep(Duration::from_secs(55)).await;
         }
     });

+ 24 - 2
src/bitget_usdt_swap_data_listener.rs

@@ -65,16 +65,38 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     }
     // 每分钟计算msv
     tokio::spawn(async move {
+        // 上次最后结束时间
+        let mut last_time: i64 = 0;
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            let mut start_timestamp = last_time;
+            // 第一次拿两小时
+            if start_timestamp == 0 {
+                start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            }
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
                 let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
+                let new_msv = match indicator_map.get(&symbol) {
+                    None => {
+                        msv
+                    }
+                    Some(old_msv) => {
+                        let mut indicators = Indicators::new();
+                        indicators.indicator.extend(old_msv.indicator.clone());
+                        indicators.indicator.extend(msv.indicator);
+                        indicators.result_size = old_msv.result_size + msv.result_size;
+                        indicators.total_size = old_msv.total_size + msv.total_size;
+                        indicators.last_calc_time = msv.last_calc_time;
+                        indicators
+                    }
+                };
+                indicator_map.insert(symbol, new_msv);
             }
+            // 更新结束时间
+            last_time = end_timestamp;
             tokio::time::sleep(Duration::from_secs(70)).await;
         }
     });

+ 79 - 159
src/msv.rs

@@ -1,6 +1,6 @@
 use std::cmp::{max, min};
 use std::str::FromStr;
-use rust_decimal::{Decimal, MathematicalOps};
+use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
 use serde::{Deserialize, Serialize};
@@ -12,30 +12,56 @@ use standard::{SimpleDepth, Trade};
 /// - `msv(Vec<Vec<Decimal>>)`: msv
 /// - `liqs(Vec<Vec<Decimal>>)`: liqs
 /// - `eprs(Vec<Vec<Decimal>>)`: eprs
-/// - `sigmas(Vec<Vec<Decimal>>)`: sigmas
-/// - `sigma_mas(Vec<Vec<Decimal>>)`: sigma_mas
 /// - `total_size(i64)`: total_size
 /// - `result_size(i64)`: result_size
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Indicators {
-    pub msv: Vec<Vec<Decimal>>,
-    pub liqs: Vec<Vec<Decimal>>,
-    pub eprs: Vec<Vec<Decimal>>,
-    pub sigmas: Vec<Vec<Decimal>>,
-    pub sigma_mas: Vec<Vec<Decimal>>,
+    pub indicator: Vec<Indicator>,
     pub total_size: i64,
     pub result_size: i64,
+    pub last_calc_time: Decimal
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Indicator {
+    // 时间戳
+    pub timestamp: Decimal,
+    // 波动幅度
+    pub msv_vol: Decimal,
+    // 强度
+    pub msv_strength: Decimal,
+    // 流动性量
+    pub liq_liquidity_amount: Decimal,
+    // 预期利润
+    pub epr_expected_profit: Decimal
+}
+impl Indicators {
+    pub fn new() -> Indicators {
+        Indicators{
+            indicator: vec![],
+            total_size: 0,
+            result_size: 0,
+            last_calc_time: Decimal::ZERO,
+        }
+    }
+}
+impl Indicator {
+    pub fn new() -> Indicator {
+        Indicator{
+            timestamp: Default::default(),
+            msv_vol: Default::default(),
+            msv_strength: Default::default(),
+            liq_liquidity_amount: Default::default(),
+            epr_expected_profit: Default::default(),
+        }
+    }
+}
+
+
 // 将trades转换为具体指标 trades 50 [] stime etime
 pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simple_depths: Vec<SimpleDepth>, start_time: i64, end_time: i64) -> Indicators {
-    // 具体波动
-    let mut msv_data: Vec<Vec<Decimal>> = vec![];
-    // 预期利润幅度(except_profit_rate)
-    let mut epr_data: Vec<Vec<Decimal>> = vec![];
-    // 波动率sigma
-    let mut sigma_data: Vec<Vec<Decimal>> = vec![];
 
+    let mut indicators = Indicators::new();
     const GAMMA: Decimal = dec!(0.5);
 
     // ================== 计算每个点的具体波动率以及回溯幅度 ===================
@@ -44,6 +70,7 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
         if index == 0 {
             continue;
         }
+        let mut indicator = Indicator::new();
 
         // 该元素向前遍历range毫秒
         let mut range_index = index;
@@ -97,61 +124,6 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
             future_ref_price_sum / future_ref_count
         };
 
-        // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
-        let mut sigma_index = index - 1;
-        let t_last = trade.time;
-
-        let mut _t_first = trade.time;
-        // 右值
-        let mut total_right = Decimal::ZERO;
-        loop {
-            let flag_trade = trades.get(sigma_index).unwrap();
-            let next_trade = trades.get(sigma_index + 1).unwrap();
-
-            // 下标合法性判断
-            if sigma_index == 0 || sigma_index + 100 <= index {
-                _t_first = flag_trade.time;
-                break;
-            }
-
-            // 计算差值
-            let diff = Decimal::ONE - flag_trade.price / next_trade.price;
-            total_right += diff * diff;
-
-            sigma_index = sigma_index - 1;
-        }
-        let sigma_square = if _t_first == t_last {
-            let time_diff = Decimal::ONE;
-            (Decimal::ONE / time_diff) * total_right
-        } else {
-            let time_diff = (t_last - _t_first) / Decimal::ONE_THOUSAND;
-            (Decimal::ONE / time_diff) * total_right
-        };
-        let mut sigma = sigma_square.sqrt().unwrap();
-        sigma.rescale(6);
-        // 计算过去至多100个sigma值的平均值
-        let sigma_ma = if sigma_data.len() > 0 {
-            let mut sigma_ma_index = sigma_data.len();
-            let mut sigma_total = Decimal::ZERO;
-            let mut sigma_count = Decimal::ZERO;
-            loop {
-                if sigma_ma_index == 0 || sigma_ma_index + 99 < sigma_data.len() {
-                    break;
-                }
-                // 步进
-                sigma_ma_index -= 1;
-                // 计算
-                sigma_total += sigma_data[sigma_ma_index][1];
-                sigma_count += Decimal::ONE;
-            }
-            let mut sigma_ma = sigma_total / sigma_count;
-            sigma_ma.rescale(6);
-
-            sigma_ma
-        } else {
-            sigma
-        };
-
         // ==================== 波动逻辑计算 ====================
         let last_price = trade.price;
         let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
@@ -174,45 +146,33 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
         epr = min(epr, rate.abs());
 
         // 去重,以及保留最大的波动率
-        if msv_data.len() > 0 {
-            let last = msv_data[msv_data.len() - 1].clone();
-            let last_time = last[0];
-            let last_rate = last[1];
+        if indicators.indicator.len() > 0 {
+            let last = indicators.indicator[indicators.indicator.len() - 1].clone();
+            let last_time = last.timestamp;
+            let last_rate = last.msv_vol;
 
             // 如果时间相同,则可能会进行remove等操作
             if last_time == trade.time {
                 // 如果最新的波动率大于最后波动率
                 if rate.abs() > last_rate.abs() {
-                    msv_data.remove(msv_data.len() - 1);
-                    msv_data.push(vec![trade.time, rate, dissociation]);
-
-                    epr_data.remove(epr_data.len() - 1);
-                    epr_data.push(vec![trade.time, epr]);
-
-                    sigma_data.remove(sigma_data.len() - 1);
-                    sigma_data.push(vec![trade.time, sigma, sigma_ma]);
+                    indicators.indicator.remove(indicators.indicator.len() - 1);
                 }
-            } else {
-                msv_data.push(vec![trade.time, rate, dissociation]);
-                epr_data.push(vec![trade.time, epr]);
-                sigma_data.push(vec![trade.time, sigma, sigma_ma]);
             }
-        } else {
-            msv_data.push(vec![trade.time, rate, dissociation]);
-            epr_data.push(vec![trade.time, epr]);
-            sigma_data.push(vec![trade.time, sigma, sigma_ma]);
         }
+        // 写入记录
+        indicator.timestamp = trade.time;
+        indicator.msv_vol = rate;
+        indicator.msv_strength = dissociation;
+        indicator.epr_expected_profit = epr;
+
+        indicators.indicator.push(indicator);
     }
 
     // 按时间序列填充数据
     let mut msv_index = 0;
-    let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_epr_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_sigma_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_sigma_ma_data: Vec<Vec<Decimal>> = vec![];
+    let mut final_indicators = Indicators::new();
 
     let mut depth_index = 0;
-    let mut final_volume_data: Vec<Vec<Decimal>> = vec![];
 
     let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
     let last_timestamp = Decimal::from_i64(end_time).unwrap();
@@ -221,35 +181,30 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
         let mut max_msv_data = Decimal::ZERO;
         let mut max_msv_qty_data = Decimal::ZERO;
         let mut max_epr_data = Decimal::ZERO;
-        let mut max_sigma_data = Decimal::ZERO;
-        let mut max_sigma_ma_data = Decimal::ZERO;
 
         // ====================================== 数据生产 ===============================================
         // 获取时间范围内的波动率数据
         loop {
             // 下标合法性判断
-            if msv_index >= msv_data.len() {
+            if msv_index >= indicators.indicator.len() {
                 break;
             }
 
             // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
-            if index_timestamp < msv_data[msv_index][0] {
+            if index_timestamp < indicators.indicator[msv_index].timestamp {
                 break;
             }
 
             // -------------- 大小判断,取值
-            let msv_d = msv_data[msv_index][1];
-            let msv_qty_data = msv_data[msv_index][2];
-            let epr_d = epr_data[msv_index][1];
-            let sigma_d = sigma_data[msv_index][1];
-            let sigma_ma_d = sigma_data[msv_index][2];
+            let msv_d = indicators.indicator[msv_index].msv_vol;
+            let msv_qty_data = indicators.indicator[msv_index].msv_strength;
+            let epr_d = indicators.indicator[msv_index].epr_expected_profit;
+
             // msv波动数据
             if max_msv_data.abs() < msv_d.abs() {
                 max_msv_data = msv_d;
                 max_msv_qty_data = msv_qty_data;
                 max_epr_data = epr_d;
-                max_sigma_data = sigma_d;
-                max_sigma_ma_data = sigma_ma_d;
             }
             // // 波动率sigma
             // if max_sigma_data.abs() < sigma_d {
@@ -279,60 +234,28 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
             // 下标步近
             depth_index += 1;
         }
-
-        // ====================================== 智能填充数据 ===============================================
-        // 流动性数据叠加
-        // let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 {
-        //     final_depth_data.last().unwrap()[1]
-        // } else {
-        //     if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 {
-        //         simple_depths[0].size
-        //     } else {
-        //         if simple_depths.len() > 0 {
-        //             (max_size + min_size) / Decimal::TWO
-        //         } else {
-        //             Decimal::ZERO
-        //         }
-        //     }
-        // };
-        // final_depth_data.push(vec![index_timestamp, rst_size]);
-        //
-        // // 建议开仓距离
-        // let mut rst_spread = if rst_size == Decimal::ZERO {
-        //     Decimal::ZERO
-        // } else {
-        //     dec!(10000) / rst_size
-        // };
-        // rst_spread.rescale(6);
-        // final_spread_data.push(vec![index_timestamp, rst_spread]);
-
+        let mut indicator = Indicator::new();
         // 波动率数据处理
         // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
         if max_msv_data == Decimal::ZERO {
-            final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
-            final_epr_data.push(vec![index_timestamp, Decimal::ZERO]);
-            final_volume_data.push(vec![index_timestamp, Decimal::ZERO]);
-
-            if final_sigma_data.len() > 0 {
-                final_sigma_data.push(vec![index_timestamp, final_sigma_data.last().unwrap()[1]]);
-                final_sigma_ma_data.push(vec![index_timestamp, final_sigma_ma_data.last().unwrap()[1]]);
-            } else {
-                final_sigma_data.push(vec![index_timestamp, Decimal::ZERO]);
-                final_sigma_ma_data.push(vec![index_timestamp, Decimal::ZERO]);
-            }
+            indicator.msv_vol = Decimal::ZERO;
+            indicator.msv_strength = Decimal::ZERO;
+            indicator.timestamp = index_timestamp;
+            indicator.epr_expected_profit = Decimal::ZERO;
+            indicator.liq_liquidity_amount = Decimal::ZERO;
 
             // 说明在这个时间范围内是有数据存在的,将各类副图放置完全
         } else {
-            final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_qty_data]);
-            final_epr_data.push(vec![index_timestamp, max_epr_data]);
+            indicator.msv_vol = max_msv_data;
+            indicator.msv_strength = max_msv_qty_data;
+            indicator.timestamp = index_timestamp;
+            indicator.epr_expected_profit = max_epr_data;
 
             let mut final_qty = max_msv_qty_data / Decimal::ONE_THOUSAND;
             final_qty.rescale(2);
-            final_volume_data.push(vec![index_timestamp, final_qty]);
-
-            final_sigma_data.push(vec![index_timestamp, max_sigma_data]);
-            final_sigma_ma_data.push(vec![index_timestamp, max_sigma_ma_data]);
+            indicator.liq_liquidity_amount = final_qty;
         }
+        final_indicators.indicator.push(indicator);
 
         // ====================================== 时间步进处理 ======================================
         // 对时间进行步近
@@ -345,16 +268,13 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
 
     // 结果统计
     let total_size = trades.len().to_i64().unwrap();
-    let result_size = final_msv_data.len().to_i64().unwrap();
-    Indicators {
-        msv: final_msv_data,
-        liqs: final_volume_data,
-        eprs: final_epr_data,
-        sigmas: final_sigma_data,
-        sigma_mas: final_sigma_ma_data,
-        total_size,
-        result_size,
-    }
+    let result_size = final_indicators.indicator.len().to_i64().unwrap();
+    let last_calc_time = final_indicators.indicator[final_indicators.indicator.len()-1].timestamp;
+    final_indicators.last_calc_time = last_calc_time;
+    final_indicators.total_size = total_size;
+    final_indicators.result_size = result_size;
+    final_indicators
+
 }
 
 // 将json转换为trades

+ 9 - 9
src/rank.rs

@@ -53,8 +53,8 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
         // let timing_difference = end_time - start_time;
         // 0.175 0.225 0.275 0.325
 
-        for (index, value) in indicators.msv.iter().enumerate() {
-            let msv_abs_value = value[1].abs();
+        for (_index, value) in indicators.indicator.iter().enumerate() {
+            let msv_abs_value = value.msv_vol.abs();
 
             if msv_abs_value <= Decimal::ZERO {
                 continue;
@@ -70,14 +70,14 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
                 msv_abs_max = msv_abs_value
             }
 
-            let epr = &indicators.eprs[index];
-            if epr[1] > msv_abs_value * ONE_QUARTER || epr[1].abs() > ONE_PERCENT {
+            let epr = value.epr_expected_profit;
+            if epr > msv_abs_value * ONE_QUARTER || epr.abs() > ONE_PERCENT {
                 effective_epr_count += Decimal::ONE;
                 // epr_total += epr[1] * scale;
-                epr_total += epr[1];
+                epr_total += epr;
 
-                if value[1] > epr_max {
-                    epr_max = value[1]
+                if value.msv_vol > epr_max {
+                    epr_max = value.msv_vol
                 }
             }
         }
@@ -94,8 +94,8 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
 
         // ============== liq相关数据的计算 =================
         let mut liquidity_total = Decimal::ZERO;
-        for value in indicators.liqs.iter() {
-            liquidity_total += value[1] * Decimal::ONE_THOUSAND;
+        for liq_value in indicators.indicator.iter() {
+            liquidity_total += liq_value.liq_liquidity_amount * Decimal::ONE_THOUSAND;
         }
         let mut liquidity_avg = if msv_count == Decimal::ZERO {
             Decimal::ZERO