Эх сурвалжийг харах

删除过期缓存数据,计算指定时间内的数据

JiahengHe 1 жил өмнө
parent
commit
548848a26d

+ 3 - 2
src/binance_usdt_swap_data_listener.rs

@@ -14,7 +14,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "binance_usdt_swap";
 
@@ -90,7 +90,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 3 - 2
src/bitget_usdt_swap_data_listener.rs

@@ -15,7 +15,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "bitget_usdt_swap";
 
@@ -90,7 +90,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 26 - 3
src/bybit_usdt_swap_data_listener.rs

@@ -16,7 +16,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "bybit_usdt_swap";
 
@@ -66,16 +66,39 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     }
     // 每分钟计算msv
     tokio::spawn(async move {
+        // 上次最后结束时间
+        let mut last_time: i64 = 0;
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            let mut start_timestamp = last_time;
+            // 第一次拿两小时
+            if start_timestamp == 0 {
+                start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            }
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
                 let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
+                let new_msv = match indicator_map.get(&symbol) {
+                    None => {
+                        msv
+                    }
+                    Some(old_msv) => {
+                        let mut indicators = Indicators::new();
+                        indicators.indicator.extend(old_msv.indicator.clone());
+                        indicators.indicator.extend(msv.indicator);
+                        indicators.result_size = old_msv.result_size + msv.result_size;
+                        indicators.total_size = old_msv.total_size + msv.total_size;
+                        indicators.last_calc_time = msv.last_calc_time;
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
+                    }
+                };
+                indicator_map.insert(symbol, new_msv);
             }
+            // 更新结束时间
+            last_time = end_timestamp;
             tokio::time::sleep(Duration::from_secs(65)).await;
         }
     });

+ 3 - 2
src/coinex_usdt_swap_data_listener.rs

@@ -15,7 +15,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "coinex_usdt_swap";
 
@@ -92,7 +92,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 3 - 2
src/gate_coin_spot_data_listener.rs

@@ -15,7 +15,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "gate_coin_spot";
 
@@ -92,7 +92,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 3 - 2
src/gate_usdt_swap_data_listener.rs

@@ -16,7 +16,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
@@ -93,7 +93,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 3 - 2
src/mexc_usdt_swap_data_listener.rs

@@ -17,7 +17,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "mexc_usdt_swap";
 
@@ -95,7 +95,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 15 - 0
src/msv.rs

@@ -1,5 +1,6 @@
 use std::cmp::{max, min};
 use std::str::FromStr;
+use chrono::Utc;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
@@ -277,6 +278,20 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simpl
 
 }
 
+// 删除多少小时前的数据
+pub fn delete_expire_data(mut msv: Indicators, hour_num: i64) -> Indicators {
+    let now = Utc::now().timestamp_millis();
+    let limit_time = now - 60 * 1000 * 60 * hour_num;
+    let old_size = msv.indicator.len();
+    msv.indicator.retain(|info| info.timestamp.to_i64().unwrap() < limit_time);
+    let delete_num = old_size - msv.indicator.len();
+    // 更新有效条数
+    msv.result_size = msv.indicator.len().to_i64().unwrap();
+    // 更新总条数
+    msv.total_size -= delete_num.to_i64().unwrap();
+    msv
+}
+
 // 将json转换为trades
 pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
     let mut rst = vec![];

+ 3 - 2
src/phemex_usdt_swap_data_listener.rs

@@ -15,7 +15,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades, delete_expire_data};
 
 const EXCHANGE_NAME: &str = "phemex_usdt_swap";
 
@@ -99,7 +99,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
                         indicators.result_size = old_msv.result_size + msv.result_size;
                         indicators.total_size = old_msv.total_size + msv.total_size;
                         indicators.last_calc_time = msv.last_calc_time;
-                        indicators
+                        // 删除掉指定小时前的数据
+                        delete_expire_data(indicators, 4)
                     }
                 };
                 indicator_map.insert(symbol, new_msv);

+ 9 - 1
src/rank.rs

@@ -1,6 +1,8 @@
 use std::cmp::{min};
 use std::collections::HashMap;
+use chrono::Utc;
 use rust_decimal::{Decimal, MathematicalOps};
+use rust_decimal::prelude::ToPrimitive;
 use rust_decimal_macros::dec;
 use serde::{Deserialize, Serialize};
 use serde_json::{Value};
@@ -34,7 +36,9 @@ const TWO_HUNDRED: Decimal = dec!(200);
 // const TEN_THOUSAND: Decimal = dec!(10000);
 
 // 根据最终的msv计算排行榜
-pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String, Indicators>>) -> Value {
+pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String, Indicators>>, hour_num: i64) -> Value {
+    let now = Utc::now().timestamp_millis();
+    let limit_time = now - 60 * 1000 * 60 * hour_num;
     let mut rank_list: Vec<Rank> = vec![];
     let binance_symbols = get_symbols_by_exchange("binance_usdt_swap").as_array().unwrap_or(&vec![]).clone();
     for (key, indicators) in indicator_map.iter() {
@@ -54,6 +58,10 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
         // 0.175 0.225 0.275 0.325
 
         for (_index, value) in indicators.indicator.iter().enumerate() {
+            // 如果数据时间循环到超过限制时间,直接跳出循环
+            if value.timestamp.to_i64().unwrap() < limit_time{
+                break
+            }
             let msv_abs_value = value.msv_vol.abs();
 
             if msv_abs_value <= Decimal::ZERO {

+ 2 - 2
src/server.rs

@@ -82,7 +82,7 @@ async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder
         map.insert(symbol.as_str().unwrap().to_string(), msv);
     }
     // 逻辑执行部分
-    let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
+    let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await, 2);
 
     let response = Response {
         query: serde_json::to_value(&query.into_inner()).unwrap(),
@@ -149,7 +149,7 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
     };
 
     // 逻辑执行部分
-    let rst = rank::generate_rank_by_indicator_map(&indicators);
+    let rst = rank::generate_rank_by_indicator_map(&indicators, 2);
 
     let response = Response {
         query: serde_json::to_value(&query.into_inner()).unwrap(),