Эх сурвалжийг харах

添加实时请求计算排行榜

DESKTOP-NE65RNK\Citrus_limon 1 жил өмнө
parent
commit
e62455fd5a
2 өөрчлөгдсөн 87 нэмэгдсэн , 22 устгасан
  1. 7 7
      src/main.rs
  2. 80 15
      src/server.rs

+ 7 - 7
src/main.rs

@@ -37,13 +37,13 @@ async fn main() {
     control_c::exit_handler(running.clone());
     // 启动各交易所的数据监听器
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_coin_spot_data_listener::run_listener(running.clone()).await;
+    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // gate_coin_spot_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 80 - 15
src/server.rs

@@ -1,28 +1,36 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use tokio::sync::{Mutex};
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
+use chrono::Utc;
+use rust_decimal_macros::dec;
 use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_coin_spot_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
+use crate::json_db_utils::{collect_special_trades_json, get_symbols_by_exchange};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
 pub struct RankQuery {
     exchange: Option<String>,
+    time: Option<String>,
 }
 
 impl RankQuery {
     pub fn validate(&self) -> bool {
         if self.exchange.is_none() {
-            return false
+            return false;
         }
 
-        return true
+        return true;
     }
 }
 
+
 #[derive(Serialize, Deserialize)]
 pub struct Response {
     msg: Option<String>,
@@ -31,6 +39,62 @@ pub struct Response {
     code: i32,
 }
 
+#[get("/rk/get_real_time_rank_list")]
+async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder {
+    // ============================= 参数校验部分 =========================================
+    if !query.validate() {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        return HttpResponse::Ok().content_type("application/json").body(json_string);
+    }
+    let supportive_exchange = vec!["gate_usdt_swap", "coinex_usdt_swap", "binance_usdt_swap", "phemex_usdt_swap", "mexc_usdt_swap", "bybit_usdt_swap", "bitget_usdt_swap", "gate_coin_spot"];
+    let exchange = query.exchange.clone().unwrap().clone();
+    if !supportive_exchange.contains(&exchange.as_str()) {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap, bybit_usdt_swap, bitget_usdt_swap, gate_coin_spot]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        return HttpResponse::Ok().content_type("application/json").body(json_string);
+    }
+
+    // 计算msv
+    let end_timestamp = Utc::now().timestamp_millis();
+    let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+
+    let symbols = get_symbols_by_exchange(exchange.as_str());
+
+    let indicator_map: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+    for symbol in symbols.as_array().unwrap() {
+        let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, exchange.as_str(), &symbol.as_str().unwrap()).await;
+        let trades = parse_json_to_trades(trades_value);
+        let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+        let mut map = indicator_map.lock().await;
+        map.insert(symbol.as_str().unwrap().to_string(), msv);
+    }
+    // 逻辑执行部分
+    let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
+
+    let response = Response {
+        query: serde_json::to_value(&query.into_inner()).unwrap(),
+        msg: Some("查询成功".to_string()),
+        code: 200,
+        data: rst,
+    };
+
+    let json_string = serde_json::to_string(&response).unwrap();
+    HttpResponse::Ok().content_type("application/json").body(json_string)
+}
+
 #[get("/rk/get_rank_list")]
 async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
     // ============================= 参数校验部分 =========================================
@@ -49,28 +113,28 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
     let indicators = match exchange.as_str() {
         "gate_usdt_swap" => {
             gate_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "coinex_usdt_swap" => {
             coinex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "binance_usdt_swap" => {
             binance_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "phemex_usdt_swap" => {
             phemex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "mexc_usdt_swap" => {
             mexc_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "bybit_usdt_swap" => {
             bybit_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "bitget_usdt_swap" => {
             bitget_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         "gate_coin_spot" => {
             gate_coin_spot_data_listener::INDICATOR_MAP.lock().await
-        },
+        }
         _ => {
             let response = Response {
                 query: serde_json::to_value(&query.into_inner()).unwrap(),
@@ -80,7 +144,7 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
             };
 
             let json_string = serde_json::to_string(&response).unwrap();
-            return HttpResponse::Ok().content_type("application/json").body(json_string)
+            return HttpResponse::Ok().content_type("application/json").body(json_string);
         }
     };
 
@@ -119,7 +183,7 @@ async fn get_exchanges() -> impl Responder {
         "mexc_usdt_swap",
         "bybit_usdt_swap",
         "bitget_usdt_swap",
-        "gate_coin_spot"
+        "gate_coin_spot",
     ];
     let response_data = json!(exchanges);
 
@@ -150,11 +214,12 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
         App::new()
             .wrap(cors)
             .service(get_rank_list)
+            .service(get_real_time_rank_list)
             .service(get_exchanges)
     })
-    .bind(addr)
-    .expect("Bind port error")
-    .run();
+        .bind(addr)
+        .expect("Bind port error")
+        .run();
 
     info!("数据仓库服务已运行。");