Jelajahi Sumber

打开新接口,关闭gate现货定时计算

JiahengHe 1 tahun lalu
induk
melakukan
a13f2a7fac
3 mengubah file dengan 70 tambahan dan 64 penghapusan
  1. 7 7
      src/gate_coin_spot_data_listener.rs
  2. 1 1
      src/main.rs
  3. 62 56
      src/server.rs

+ 7 - 7
src/gate_coin_spot_data_listener.rs

@@ -17,7 +17,7 @@ use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
 use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
-const EXCHANGE_NAME: &str = "gate_coin_spot";
+const _EXCHANGE_NAME: &str = "gate_coin_spot";
 
 lazy_static! {
     // 给其他模块使用
@@ -26,7 +26,7 @@ lazy_static! {
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
 }
 
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+pub async fn _run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     let name = "gate_coin_spot_listener";
     // 订阅所有币种
     let login = BTreeMap::new();
@@ -62,7 +62,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
 
             // 建立链接
             ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            ws.ws_connect_async(is_shutdown_clone, _data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
     // 每分钟计算msv
@@ -71,7 +71,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             let end_timestamp = Utc::now().timestamp_millis();
             let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
+                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, _EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
                 let mut indicator_map = INDICATOR_MAP.lock().await;
@@ -84,13 +84,13 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
+            delete_db_by_exchange(_EXCHANGE_NAME, "trades", 5 * 60).await;
         }
     });
 }
 
 // 读取数据
-pub async fn data_listener(response: ResponseData) {
+pub async fn _data_listener(response: ResponseData) {
     if response.code != 200 {
         return;
     }
@@ -107,7 +107,7 @@ pub async fn data_listener(response: ResponseData) {
 
                 // 更新到本地数据库
                 let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+                update_trade(trade, trades_map, _EXCHANGE_NAME).await;
             }
         }
         _ => {

+ 1 - 1
src/main.rs

@@ -43,7 +43,7 @@ async fn main() {
     mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_coin_spot_data_listener::run_listener(running.clone()).await;
+    // gate_coin_spot_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 62 - 56
src/server.rs

@@ -1,11 +1,17 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
+use tokio::sync::{Mutex};
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
+use chrono::Utc;
+use rust_decimal_macros::dec;
 use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_coin_spot_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
+use crate::json_db_utils::{collect_special_trades_json, get_symbols_by_exchange};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -33,61 +39,61 @@ pub struct Response {
     code: i32,
 }
 
-// #[get("/rk/get_real_time_rank_list")]
-// async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder {
-//     // ============================= 参数校验部分 =========================================
-//     if !query.validate() {
-//         let response = Response {
-//             query: serde_json::to_value(&query.into_inner()).unwrap(),
-//             msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
-//             code: 500,
-//             data: Value::Null,
-//         };
-//
-//         let json_string = serde_json::to_string(&response).unwrap();
-//         return HttpResponse::Ok().content_type("application/json").body(json_string);
-//     }
-//     let supportive_exchange = vec!["gate_usdt_swap", "coinex_usdt_swap", "binance_usdt_swap", "phemex_usdt_swap", "mexc_usdt_swap", "bybit_usdt_swap", "bitget_usdt_swap", "gate_coin_spot"];
-//     let exchange = query.exchange.clone().unwrap().clone();
-//     if !supportive_exchange.contains(&exchange.as_str()) {
-//         let response = Response {
-//             query: serde_json::to_value(&query.into_inner()).unwrap(),
-//             msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap, bybit_usdt_swap, bitget_usdt_swap, gate_coin_spot]".to_string()),
-//             code: 500,
-//             data: Value::Null,
-//         };
-//
-//         let json_string = serde_json::to_string(&response).unwrap();
-//         return HttpResponse::Ok().content_type("application/json").body(json_string);
-//     }
-//
-//     // 计算msv
-//     let end_timestamp = Utc::now().timestamp_millis();
-//     let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
-//
-//     let symbols = get_symbols_by_exchange(exchange.as_str());
-//
-//     let indicator_map: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-//     for symbol in symbols.as_array().unwrap() {
-//         let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, exchange.as_str(), &symbol.as_str().unwrap()).await;
-//         let trades = parse_json_to_trades(trades_value);
-//         let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-//         let mut map = indicator_map.lock().await;
-//         map.insert(symbol.as_str().unwrap().to_string(), msv);
-//     }
-//     // 逻辑执行部分
-//     let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
-//
-//     let response = Response {
-//         query: serde_json::to_value(&query.into_inner()).unwrap(),
-//         msg: Some("查询成功".to_string()),
-//         code: 200,
-//         data: rst,
-//     };
-//
-//     let json_string = serde_json::to_string(&response).unwrap();
-//     HttpResponse::Ok().content_type("application/json").body(json_string)
-// }
+#[get("/rk/get_real_time_rank_list")]
+async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder {
+    // ============================= 参数校验部分 =========================================
+    if !query.validate() {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        return HttpResponse::Ok().content_type("application/json").body(json_string);
+    }
+    let supportive_exchange = vec!["gate_usdt_swap", "coinex_usdt_swap", "binance_usdt_swap", "phemex_usdt_swap", "mexc_usdt_swap", "bybit_usdt_swap", "bitget_usdt_swap", "gate_coin_spot"];
+    let exchange = query.exchange.clone().unwrap().clone();
+    if !supportive_exchange.contains(&exchange.as_str()) {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap, bybit_usdt_swap, bitget_usdt_swap, gate_coin_spot]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        return HttpResponse::Ok().content_type("application/json").body(json_string);
+    }
+
+    // 计算msv
+    let end_timestamp = Utc::now().timestamp_millis();
+    let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+
+    let symbols = get_symbols_by_exchange(exchange.as_str());
+
+    let indicator_map: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+    for symbol in symbols.as_array().unwrap() {
+        let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, exchange.as_str(), &symbol.as_str().unwrap()).await;
+        let trades = parse_json_to_trades(trades_value);
+        let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+        let mut map = indicator_map.lock().await;
+        map.insert(symbol.as_str().unwrap().to_string(), msv);
+    }
+    // 逻辑执行部分
+    let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
+
+    let response = Response {
+        query: serde_json::to_value(&query.into_inner()).unwrap(),
+        msg: Some("查询成功".to_string()),
+        code: 200,
+        data: rst,
+    };
+
+    let json_string = serde_json::to_string(&response).unwrap();
+    HttpResponse::Ok().content_type("application/json").body(json_string)
+}
 
 #[get("/rk/get_rank_list")]
 async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
@@ -208,7 +214,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
         App::new()
             .wrap(cors)
             .service(get_rank_list)
-            // .service(get_real_time_rank_list)
+            .service(get_real_time_rank_list)
             .service(get_exchanges)
     })
         .bind(addr)