Browse Source

改为2小时数据 去掉线性计分

DESKTOP-NE65RNK\Citrus_limon 1 year ago
parent
commit
9bb0e40498

+ 1 - 1
exchanges/src/bybit_swap_ws.rs

@@ -290,7 +290,7 @@ impl BybitSwapWs {
         let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
         if json_value.get("success").is_some() {
-            info!("bybit_swap_ws订阅结果:{:?}", json_value);
+            // info!("bybit_swap_ws订阅结果:{:?}", json_value);
             //订阅内容
             let success = json_value["success"].as_bool().unwrap();
             // let ret_msg = json_value["ret_msg"].as_str().unwrap();

+ 13 - 11
exchanges/src/mexc_swap_ws.rs

@@ -2,7 +2,6 @@ use std::io::Read;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
-
 use flate2::read::GzDecoder;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::json;
@@ -12,7 +11,7 @@ use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{error, info, trace};
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::AbstractWsMode;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
 //类型
 pub enum MexcSwapWsType {
@@ -107,7 +106,7 @@ impl MexcSwapWs {
             ws_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 18,
+            heartbeat_time: 1000 * 12,
         }
     }
 
@@ -187,7 +186,7 @@ impl MexcSwapWs {
     pub async fn ws_connect_async<F, Future>(&mut self,
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
-                                             _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
         where
             F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
@@ -197,15 +196,18 @@ impl MexcSwapWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let tag = self.tag.clone();
-        // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
 
         //心跳-- 方法内部线程启动
-        // let write_tx_clone1 = write_tx_am.clone();
-        // tokio::spawn(async move {
-        //     trace!("线程-异步心跳-开始");
-        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
-        //     trace!("线程-异步心跳-结束");
-        // });
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            let ping_str = json!({
+                "method": "ping"
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
 
 
         //设置订阅

+ 1 - 1
src/binance_usdt_swap_data_listener.rs

@@ -67,7 +67,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 1 - 1
src/bybit_usdt_swap_data_listener.rs

@@ -68,7 +68,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 1 - 2
src/coinex_usdt_swap_data_listener.rs

@@ -9,7 +9,6 @@ use tracing::info;
 use exchanges::coinex_swap_rest::CoinexSwapRest;
 use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs, CoinexSwapWsType};
 // use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
-use exchanges::phemex_swap_ws::{PhemexSwapWs, PhemexSwapWsType};
 use exchanges::response_base::ResponseData;
 use rust_decimal_macros::dec;
 use standard::exchange::ExchangeEnum;
@@ -70,7 +69,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 1 - 1
src/gate_usdt_swap_data_listener.rs

@@ -70,7 +70,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 1 - 1
src/mexc_usdt_swap_data_listener.rs

@@ -72,7 +72,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 1 - 1
src/phemex_usdt_swap_data_listener.rs

@@ -76,7 +76,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in loc_symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);

+ 9 - 7
src/rank.rs

@@ -48,9 +48,9 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
         let mut epr_total = Decimal::ZERO;
         let mut epr_max = Decimal::ZERO;
 
-        let start_time = indicators.msv[0][0];
-        let end_time = indicators.msv[indicators.msv.len() - 1][0];
-        let timing_difference = end_time - start_time;
+        // let start_time = indicators.msv[0][0];
+        // let end_time = indicators.msv[indicators.msv.len() - 1][0];
+        // let timing_difference = end_time - start_time;
         // 0.175 0.225 0.275 0.325
 
         for (index, value) in indicators.msv.iter().enumerate() {
@@ -59,11 +59,12 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
             if msv_abs_value <= Decimal::ZERO {
                 continue;
             }
-            let a_scale = ((value[0] - start_time) / timing_difference).round_dp(2) + dec!(0.25);
+            // let a_scale = ((value[0] - start_time) / timing_difference).round_dp(2) + dec!(0.25);
 
-            let scale = if a_scale > Decimal::ONE { Decimal::ONE } else { a_scale };
+            // let scale = if a_scale > Decimal::ONE { Decimal::ONE } else { a_scale };
             msv_count += Decimal::ONE;
-            msv_abs_total += msv_abs_value * scale;
+            // msv_abs_total += msv_abs_value * scale;
+            msv_abs_total += msv_abs_value;
 
             if msv_abs_value > msv_abs_max {
                 msv_abs_max = msv_abs_value
@@ -72,7 +73,8 @@ pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String,
             let epr = &indicators.eprs[index];
             if epr[1] > msv_abs_value * ONE_QUARTER || epr[1].abs() > ONE_PERCENT {
                 effective_epr_count += Decimal::ONE;
-                epr_total += epr[1] * scale;
+                // epr_total += epr[1] * scale;
+                epr_total += epr[1];
 
                 if value[1] > epr_max {
                     epr_max = value[1]