Kaynağa Gözat

进行了一些基础计算和封装

skyffire 1 yıl önce
ebeveyn
işleme
565ab06af0
2 değiştirilmiş dosya ile 85 ekleme ve 42 silme
  1. 4 4
      src/server.rs
  2. 81 38
      src/symbol_filter.rs

+ 4 - 4
src/server.rs

@@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use actix_cors::Cors;
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, post};
 use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
+use rust_decimal::prelude::ToPrimitive;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tracing::{info};
@@ -57,7 +57,7 @@ async fn get_symbols_by_filter(query: web::Json<Value>) -> impl Responder {
     };
     let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
         Ok(range) => {
-            range
+            range.to_i64().unwrap()
         }
         Err(response) => {
             return response
@@ -67,12 +67,12 @@ async fn get_symbols_by_filter(query: web::Json<Value>) -> impl Responder {
         Ok(filters) => {
             filters
         }
-        Err(response) => {
+        Err(_) => {
             vec![]
         }
     };
 
-    get_symbols(&mode, &exchanges, &minute_time_range, &filters).await
+    get_symbols(&mode, &exchanges, minute_time_range, &filters).await
 }
 
 // ia: intelligence agency, 情报部门

+ 81 - 38
src/symbol_filter.rs

@@ -1,10 +1,10 @@
 use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
 use actix_web::HttpResponse;
+use chrono::Utc;
 use rust_decimal::Decimal;
 use serde_json::{json, Value};
-use tracing::info;
 use crate::db_connector::{get_records_json, get_symbols_json};
-use crate::msv::{generate_msv_by_trades, parse_json_to_trades};
 use crate::server::Response;
 
 fn get_public_symbols(symbols_map: &Value) -> Vec<String> {
@@ -27,7 +27,30 @@ fn get_public_symbols(symbols_map: &Value) -> Vec<String> {
     common_symbols.unwrap_or_default().into_iter().collect::<Vec<String>>()
 }
 
-pub async fn get_symbols(mode: &str, exchanges: &Vec<String>, minute_time_range: &Decimal, filters: &Vec<Value>) -> HttpResponse {
+fn calc_total_volume(records: Value) -> Decimal {
+    let mut total_volume = Decimal::ZERO;
+    for record in records.as_array().unwrap() {
+        total_volume = total_volume + Decimal::from_str(record["volume"].as_str().unwrap().to_string().as_str()).unwrap();
+    }
+
+    total_volume
+}
+
+fn calc_rise_percentage(records: Value) -> Decimal {
+    let records_array = records.as_array().unwrap();
+
+    let first_record = records_array[records_array.len() - 1].clone();
+    let last_record = records_array[0].clone();
+    let open = Decimal::from_str(first_record["open"].as_str().unwrap().to_string().as_str()).unwrap();
+    let close = Decimal::from_str(last_record["close"].as_str().unwrap().to_string().as_str()).unwrap();
+
+    let mut rst = Decimal::ONE_HUNDRED * (close - open) / open;
+    rst.rescale(2);
+
+    return rst
+}
+
+pub async fn get_symbols(_mode: &str, exchanges: &Vec<String>, minute_time_range: i64, _filters: &Vec<Value>) -> HttpResponse {
     // 1. 获取所选交易所的所有交易对
     let mut symbols_map = json!({});
     for exchange in exchanges {
@@ -46,7 +69,7 @@ pub async fn get_symbols(mode: &str, exchanges: &Vec<String>, minute_time_range:
     // 2. 如果是多交易所,则获取所有交易所的交集
     let mut symbols: Vec<String> = get_public_symbols(&symbols_map);
     // 确保有足够的元素
-    let n = 20;
+    let n = 50;
     // 获取前20个元素,如果不足20个,则获取全部
     let top_symbols = if symbols.len() > n {
         &symbols[0..n]
@@ -67,58 +90,78 @@ pub async fn get_symbols(mode: &str, exchanges: &Vec<String>, minute_time_range:
     //     "exchange2": [k1,k2,...]
     // },
     // }
+    let end_time = Utc::now().timestamp_millis();
+    let start_time = end_time - minute_time_range * 60 * 1000;
     let mut records_map = json!({});
-    for symbol in symbols {
+    for symbol in &symbols {
         records_map[symbol.clone()] = json!({});
         for exchange in exchanges {
-            let db_response = get_records_json(exchange.as_str()).await;
+            let db_response = get_records_json(exchange.as_str(), symbol.as_str(), start_time, end_time).await;
 
             // 对数据库返回的数据进行容错处理
             if db_response.code == 200 {
-                let symbol_array_value = db_response.data;
+                let records = db_response.data;
 
-                records_map[symbol.clone()][exchange] = symbol_array_value.clone();
+                records_map[symbol.clone()][exchange] = records.clone();
             } else {
                 let json_string = serde_json::to_string(&db_response).unwrap();
                 return HttpResponse::Ok().content_type("application/json").body(json_string);
             }
         }
     }
-    // 4. 整理完之后,获取指标数据,进行逻辑判断,注意与/或只是指过滤器,如果选择了多交易所,交易所的逻辑部分都是与
+
+    // 4. 生成相关指标数据结构,要体现出交易所
+    // [
+    //     {
+    //         "symbol": "BTC_USDT",
+    //         "rise": {
+    //             "gate_usdt_swap": 3,
+    //             "bitget_usdt_swap": 2.9,
+    //         },
+    //         "volume": {
+    //             "gate_usdt_swap": 0.1,
+    //             "bitget_usdt_swap": 0.5,
+    //             "total": 0.6,
+    //         },
+    //     },
+    //     {
+    //         "symbol": "DOGE_USDT",
+    //             "rise": {
+    //             "gate_usdt_swap": -1.9,
+    //             "bitget_usdt_swap": -2,
+    //         },
+    //         "volume": {
+    //             "gate_usdt_swap": 0.2,
+    //             "bitget_usdt_swap": 0.3,
+    //             "total": 0.5,
+    //         },
+    //     }
+    // ]
+    let mut response_value = json!([]);
+    for symbol in &symbols {
+        let mut rise = json!({});
+        let mut volume = json!({});
+        for exchange in exchanges {
+            rise[exchange] = json!(calc_rise_percentage(records_map[symbol][exchange].clone()));
+            volume[exchange] = json!(calc_total_volume(records_map[symbol][exchange].clone()));
+        }
+        let value = json!({
+            "symbol": symbol.clone(),
+            "rise": rise,
+            "volume": volume
+        });
+
+        response_value.as_array_mut().unwrap().push(value);
+    }
+
+    // 5. 整理完之后,获取指标数据,进行逻辑判断,注意与/或只是指过滤器,如果选择了多交易所,交易所的逻辑部分都是与
     // 举例:
     //      选择了gate_usdt_swap与bitget_usdt_swap两个交易所,并且对交易量进行了0.5M的过滤
     //      两个交易所都有xrp,如果gate与bitget的xrp交易量都大于0.5M,则会显示在最终列表中
-    // 5. 最终出来的数据结构,要体现出交易所
-
-    let response_value = json!([
-        {
-            "symbol": "BTC_USDT",
-            "rise": {
-                "gate_usdt_swap": 3,
-                "bitget_usdt_swap": 2.9,
-            },
-            "volume": {
-                "gate_usdt_swap": 0.1,
-                "bitget_usdt_swap": 0.5,
-                "total": 0.6,
-            },
-        },
-        {
-            "symbol": "DOGE_USDT",
-            "rise": {
-                "gate_usdt_swap": -1.9,
-                "bitget_usdt_swap": -2,
-            },
-            "volume": {
-                "gate_usdt_swap": 0.2,
-                "bitget_usdt_swap": 0.3,
-                "total": 0.5,
-            },
-        }
-    ]);
+    // 最终过滤
     let response = Response {
         query: Value::Null,
-        msg: Some("指标生成完毕".to_string()),
+        msg: Some("排行榜生成完毕".to_string()),
         code: 200,
         data: response_value,
     };