Explorar el Código

接口做好了,剩下逻辑没有做了,架构是没啥大问题的。

skyffire hace 1 año
padre
commit
1403f0c939
Se han modificado 3 ficheros con 73 adiciones y 79 borrados
  1. 4 6
      README.MD
  2. 6 3
      src/gate_usdt_swap_data_listener.rs
  3. 63 70
      src/server.rs

+ 4 - 6
README.MD

@@ -1,11 +1,9 @@
 ## /rk/get_rank_list 获取分数排行榜
 
-##### request
-``` json
-{
-    "exchange": "gate_usdt_swap",           // 参与排行的交易所
-}
-```
+##### 请求参数
+| 名称       | 类型     | 必选 |
+|----------|--------|----|
+| exchange | String | 是  |
 
 ##### response
 ``` json

+ 6 - 3
src/gate_usdt_swap_data_listener.rs

@@ -16,11 +16,14 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
 use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, parse_json_to_trades};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
 
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
 lazy_static! {
+    // 给其他模块使用
+    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
 }
@@ -66,14 +69,14 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     // 每分钟计算msv
     tokio::spawn(async move {
         loop {
-            let mut indicators = BTreeMap::new();
             let end_timestamp = Utc::now().timestamp_millis();
             let start_timestamp = end_timestamp - 60 * 1000 * 60;
             for symbol in symbols.clone() {
                 let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, "gate_usdt_swap", &symbol).await;
                 let trades = parse_json_to_trades(trades_value);
                 let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                indicators.insert(symbol, msv);
+                let mut indicator_map = INDICATOR_MAP.lock().await;
+                indicator_map.insert(symbol, msv);
             }
             tokio::time::sleep(Duration::from_secs(60)).await;
         }

+ 63 - 70
src/server.rs

@@ -4,79 +4,24 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
-use crate::json_db_utils::{collect_special_trades_json};
+use crate::gate_usdt_swap_data_listener;
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
-pub struct SimpleQuery {
-    symbol: Option<String>,
+pub struct RankQuery {
     exchange: Option<String>,
-    start_time: Option<i64>,
-    end_time: Option<i64>,
 }
 
-impl SimpleQuery {
+impl RankQuery {
     pub fn validate(&self) -> bool {
-        if self.symbol.is_none() {
-            return false
-        }
-
         if self.exchange.is_none() {
             return false
         }
 
-        if self.start_time.is_none()  {
-            return false
-        }
-
-        if self.end_time.is_none()  {
-            return false
-        }
-
-        true
+        return true
     }
 }
 
-// 定义用于反序列化查询参数的结构体
-#[derive(Serialize, Deserialize, Clone)]
-pub struct SymbolsQuery {
-    exchange: Option<String>,
-}
-
-// impl SymbolsQuery {
-//     pub fn validate(&self) -> bool {
-//         if self.exchange.is_none() {
-//             return false
-//         }
-//
-//         return true
-//     }
-// }
-#[derive(Serialize, Deserialize, Clone)]
-pub struct ExchangeSpecialQuery {
-    exchange: Option<String>,
-    start_time: Option<i64>,
-    end_time: Option<i64>,
-}
-
-// impl ExchangeSpecialQuery {
-//     pub fn validate(&self) -> bool {
-//         if self.exchange.is_none() {
-//             return false
-//         }
-//
-//         if self.start_time.is_none()  {
-//             return false
-//         }
-//
-//         if self.end_time.is_none()  {
-//             return false
-//         }
-//
-//         true
-//     }
-// }
-
 #[derive(Serialize, Deserialize)]
 pub struct Response {
     msg: Option<String>,
@@ -85,21 +30,37 @@ pub struct Response {
     code: i32,
 }
 
-#[get("/trades")]
-async fn get_trades(query: web::Query<SimpleQuery>) -> impl Responder {
+#[get("/rk/get_rank_list")]
+async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
     if query.validate() {
-        let response_data = collect_special_trades_json(
-            query.start_time.clone().unwrap(),
-            query.end_time.clone().unwrap(),
-            query.exchange.clone().unwrap().as_str(),
-            query.symbol.clone().unwrap().as_str()
-        ).await;
+        let exchange = query.exchange.clone().unwrap().clone();
+        let indicators = match exchange.as_str() {
+            "gate_usdt_swap" => {
+                gate_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+            },
+            _ => {
+                let response = Response {
+                    query: serde_json::to_value(&query.into_inner()).unwrap(),
+                    msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap]".to_string()),
+                    code: 500,
+                    data: Value::Null,
+                };
+
+                let json_string = serde_json::to_string(&response).unwrap();
+                return HttpResponse::Ok().content_type("application/json").body(json_string)
+            }
+        };
+
+        let count = indicators.get("BTC_USDT").unwrap().total_size;
+        let rst = json!({
+            "count": count
+        });
 
         let response = Response {
             query: serde_json::to_value(&query.into_inner()).unwrap(),
             msg: Some("查询成功".to_string()),
             code: 200,
-            data: json!(response_data),
+            data: rst,
         };
 
         let json_string = serde_json::to_string(&response).unwrap();
@@ -107,7 +68,7 @@ async fn get_trades(query: web::Query<SimpleQuery>) -> impl Responder {
     } else {
         let response = Response {
             query: serde_json::to_value(&query.into_inner()).unwrap(),
-            msg: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            msg: Some("查询内容有误,必须传递的参数:[exchange]".to_string()),
             code: 500,
             data: Value::Null,
         };
@@ -117,6 +78,37 @@ async fn get_trades(query: web::Query<SimpleQuery>) -> impl Responder {
     }
 }
 
+#[get("/rk/get_exchanges")]
+async fn get_exchanges() -> impl Responder {
+    let exchanges = vec![
+        // "bitget_usdt_swap",
+        // "okx_usdt_swap",
+        // "bingx_usdt_swap",
+        // "mexc_usdt_swap",
+        // "bitmart_usdt_swap",
+        // "kucoin_usdt_swap",
+        // "coinsph_usdt_swap"
+        // "woo_usdt_swap",
+        // "cointr_usdt_swap",
+        "gate_usdt_swap",
+        // "binance_usdt_swap",
+        // "coinex_usdt_swap",
+        // "htx_usdt_swap",
+        // "phemex_usdt_swap",
+    ];
+    let response_data = json!(exchanges);
+
+    let response = Response {
+        query: Value::Null,
+        msg: Some("查询成功".to_string()),
+        code: 200,
+        data: response_data,
+    };
+
+    let json_string = serde_json::to_string(&response).unwrap();
+    HttpResponse::Ok().content_type("application/json").body(json_string)
+}
+
 pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     let addr = format!("0.0.0.0:{}", port);
     info!("数据服务绑定地址:{}", addr);
@@ -124,7 +116,8 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     // 启动server
     let server_fut = HttpServer::new(move || {
         App::new()
-            .service(get_trades)
+            .service(get_rank_list)
+            .service(get_exchanges)
     })
     .bind(addr)
     .expect("Bind port error")