Browse Source

开启币本位定时计算,注释新接口

JiahengHe 1 year ago
parent
commit
5de170cf71
3 changed files with 64 additions and 70 deletions
  1. 7 7
      src/gate_coin_spot_data_listener.rs
  2. 1 1
      src/main.rs
  3. 56 62
      src/server.rs

+ 7 - 7
src/gate_coin_spot_data_listener.rs

@@ -17,7 +17,7 @@ use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
 use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
-const _EXCHANGE_NAME: &str = "gate_coin_spot";
+const EXCHANGE_NAME: &str = "gate_coin_spot";
 
 lazy_static! {
     // 给其他模块使用
@@ -26,7 +26,7 @@ lazy_static! {
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
 }
 
-pub async fn _run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     let name = "gate_coin_spot_listener";
     // 订阅所有币种
     let login = BTreeMap::new();
@@ -62,7 +62,7 @@ pub async fn _run_listener(is_shutdown_arc: Arc<AtomicBool>) {
 
             // 建立链接
             ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, _data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
     // 每分钟计算msv
@@ -71,7 +71,7 @@ pub async fn _run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             let end_timestamp = Utc::now().timestamp_millis();
             let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
             for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, _EXCHANGE_NAME, &symbol).await;
+                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
                 let mut indicator_map = INDICATOR_MAP.lock().await;
@@ -84,13 +84,13 @@ pub async fn _run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         loop {
             tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(_EXCHANGE_NAME, "trades", 5 * 60).await;
+            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
         }
     });
 }
 
 // 读取数据
-pub async fn _data_listener(response: ResponseData) {
+pub async fn data_listener(response: ResponseData) {
     if response.code != 200 {
         return;
     }
@@ -107,7 +107,7 @@ pub async fn _data_listener(response: ResponseData) {
 
                 // 更新到本地数据库
                 let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, _EXCHANGE_NAME).await;
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
             }
         }
         _ => {

+ 1 - 1
src/main.rs

@@ -43,7 +43,7 @@ async fn main() {
     mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // gate_coin_spot_data_listener::run_listener(running.clone()).await;
+    gate_coin_spot_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 56 - 62
src/server.rs

@@ -1,17 +1,11 @@
-use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
-use tokio::sync::{Mutex};
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
-use chrono::Utc;
-use rust_decimal_macros::dec;
 use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_coin_spot_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
-use crate::json_db_utils::{collect_special_trades_json, get_symbols_by_exchange};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -39,61 +33,61 @@ pub struct Response {
     code: i32,
 }
 
-#[get("/rk/get_real_time_rank_list")]
-async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder {
-    // ============================= 参数校验部分 =========================================
-    if !query.validate() {
-        let response = Response {
-            query: serde_json::to_value(&query.into_inner()).unwrap(),
-            msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
-            code: 500,
-            data: Value::Null,
-        };
-
-        let json_string = serde_json::to_string(&response).unwrap();
-        return HttpResponse::Ok().content_type("application/json").body(json_string);
-    }
-    let supportive_exchange = vec!["gate_usdt_swap", "coinex_usdt_swap", "binance_usdt_swap", "phemex_usdt_swap", "mexc_usdt_swap", "bybit_usdt_swap", "bitget_usdt_swap", "gate_coin_spot"];
-    let exchange = query.exchange.clone().unwrap().clone();
-    if !supportive_exchange.contains(&exchange.as_str()) {
-        let response = Response {
-            query: serde_json::to_value(&query.into_inner()).unwrap(),
-            msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap, bybit_usdt_swap, bitget_usdt_swap, gate_coin_spot]".to_string()),
-            code: 500,
-            data: Value::Null,
-        };
-
-        let json_string = serde_json::to_string(&response).unwrap();
-        return HttpResponse::Ok().content_type("application/json").body(json_string);
-    }
-
-    // 计算msv
-    let end_timestamp = Utc::now().timestamp_millis();
-    let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
-
-    let symbols = get_symbols_by_exchange(exchange.as_str());
-
-    let indicator_map: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-    for symbol in symbols.as_array().unwrap() {
-        let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, exchange.as_str(), &symbol.as_str().unwrap()).await;
-        let trades = parse_json_to_trades(trades_value);
-        let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-        let mut map = indicator_map.lock().await;
-        map.insert(symbol.as_str().unwrap().to_string(), msv);
-    }
-    // 逻辑执行部分
-    let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
-
-    let response = Response {
-        query: serde_json::to_value(&query.into_inner()).unwrap(),
-        msg: Some("查询成功".to_string()),
-        code: 200,
-        data: rst,
-    };
-
-    let json_string = serde_json::to_string(&response).unwrap();
-    HttpResponse::Ok().content_type("application/json").body(json_string)
-}
+// #[get("/rk/get_real_time_rank_list")]
+// async fn get_real_time_rank_list(query: web::Query<RankQuery>) -> impl Responder {
+//     // ============================= 参数校验部分 =========================================
+//     if !query.validate() {
+//         let response = Response {
+//             query: serde_json::to_value(&query.into_inner()).unwrap(),
+//             msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
+//             code: 500,
+//             data: Value::Null,
+//         };
+//
+//         let json_string = serde_json::to_string(&response).unwrap();
+//         return HttpResponse::Ok().content_type("application/json").body(json_string);
+//     }
+//     let supportive_exchange = vec!["gate_usdt_swap", "coinex_usdt_swap", "binance_usdt_swap", "phemex_usdt_swap", "mexc_usdt_swap", "bybit_usdt_swap", "bitget_usdt_swap", "gate_coin_spot"];
+//     let exchange = query.exchange.clone().unwrap().clone();
+//     if !supportive_exchange.contains(&exchange.as_str()) {
+//         let response = Response {
+//             query: serde_json::to_value(&query.into_inner()).unwrap(),
+//             msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap, bybit_usdt_swap, bitget_usdt_swap, gate_coin_spot]".to_string()),
+//             code: 500,
+//             data: Value::Null,
+//         };
+//
+//         let json_string = serde_json::to_string(&response).unwrap();
+//         return HttpResponse::Ok().content_type("application/json").body(json_string);
+//     }
+//
+//     // 计算msv
+//     let end_timestamp = Utc::now().timestamp_millis();
+//     let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+//
+//     let symbols = get_symbols_by_exchange(exchange.as_str());
+//
+//     let indicator_map: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+//     for symbol in symbols.as_array().unwrap() {
+//         let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, exchange.as_str(), &symbol.as_str().unwrap()).await;
+//         let trades = parse_json_to_trades(trades_value);
+//         let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+//         let mut map = indicator_map.lock().await;
+//         map.insert(symbol.as_str().unwrap().to_string(), msv);
+//     }
+//     // 逻辑执行部分
+//     let rst = rank::generate_rank_by_indicator_map(&indicator_map.lock().await);
+//
+//     let response = Response {
+//         query: serde_json::to_value(&query.into_inner()).unwrap(),
+//         msg: Some("查询成功".to_string()),
+//         code: 200,
+//         data: rst,
+//     };
+//
+//     let json_string = serde_json::to_string(&response).unwrap();
+//     HttpResponse::Ok().content_type("application/json").body(json_string)
+// }
 
 #[get("/rk/get_rank_list")]
 async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
@@ -214,7 +208,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
         App::new()
             .wrap(cors)
             .service(get_rank_list)
-            .service(get_real_time_rank_list)
+            // .service(get_real_time_rank_list)
             .service(get_exchanges)
     })
         .bind(addr)