Jelajahi Sumber

指标修好了。

skyffire 1 tahun lalu
induk
melakukan
35181bac07
5 mengubah file dengan 169 tambahan dan 78 penghapusan
  1. 30 0
      README.MD
  2. 2 2
      src/db_connector.rs
  3. 3 3
      src/main.rs
  4. 97 6
      src/msv.rs
  5. 37 67
      src/server.rs

+ 30 - 0
README.MD

@@ -0,0 +1,30 @@
+## /ia/get_indicator  生成指标
+
+### MSV指标
+##### 毫秒(millisecond)波动率(volatility)
+``` json
+{
+    "indicator": "msv",                     // 指标名
+    "query": {                              // 查询结构
+        "exchange": "gate_usdt_swap",       // 交易所,当前支持[gate_usdt_swap, bitget_usdt_swap]
+        "symbol": "btc",                    // 符号,支持大小写,如果不写usdt会自动在后面添加_USDT
+        "minute_time_range": "240",         // 查询多少分钟的数据
+        "mills_back": "37"                  // 回溯多少毫秒
+    }
+}
+```
+
+##### response
+``` json
+{
+    "query": {},                            // 你的查询参数,用于接口联调       
+    "message": "hello",                     // 后台提醒
+    "code": 200,                            // 200 就是对的
+    "data": {                               // 指标结果
+        "x": ["1", "2"],                    // x轴数据
+        "y": ["1", "2"],                    // y轴数据
+        "total_size": 3,                    // 总trades条数
+        "result_size": 1,                   // 指标数据条数
+    },
+}
+```

+ 2 - 2
src/db_connector.rs

@@ -36,8 +36,8 @@ pub async fn get_trades_json(exchange: &str, symbol: &str, start_at: i64, end_at
         serde_json::from_str(response_text.as_str()).unwrap()
     } else {
         Response {
-            message: Some("请求失败,预计是指标层的网络请求错误。".to_string()),
-            query_string: Default::default(),
+            msg: Some("请求失败,预计是指标层的网络请求错误。".to_string()),
+            query: Default::default(),
             data: Default::default(),
             code: -200,
         }

+ 3 - 3
src/main.rs

@@ -1,7 +1,7 @@
 mod control_c;
 mod server;
 pub mod db_connector;
-mod msv_generate;
+mod msv;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -22,11 +22,11 @@ async fn main() {
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
     // panic错误捕获,panic级别的错误直接退出
-    let panic_running = running.clone();
+    // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {
         let msg = format!("type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location());
         warn!("{}", msg);
-        panic_running.store(false, Ordering::Relaxed);
+        // panic_running.store(false, Ordering::Relaxed);
     }));
     // 初始化数据服务器
     server::run_server(18888, running.clone());

+ 97 - 6
src/msv_generate.rs → src/msv.rs

@@ -1,20 +1,111 @@
 use std::cmp::max;
 use std::collections::BTreeMap;
 use std::str::FromStr;
+use actix_web::{HttpResponse};
+use chrono::Utc;
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use serde_json::{json, Value};
-use crate::server::Trade;
+use crate::db_connector::get_trades_json;
+use crate::server::{Response, Trade};
+
+pub fn symbol_fix(symbol: &str) -> String {
+    let mut fixed = symbol.to_uppercase();
+
+    if !fixed.contains("_USDT") {
+        fixed = format!("{}_USDT", fixed);
+    }
+
+    fixed
+}
 
 // 将trades_json转换为指标
-pub fn generate_msv(trades_json: Value) -> Value {
-    let trades = parse_json_to_trades(trades_json);
+pub async fn generate_msv(query_value: Value) -> HttpResponse {
+    // 参数处理
+    let exchange = match query_value["exchange"].as_str() {
+        None => {
+            "gate_usdt_swap"
+        }
+        Some(exchange) => {
+            exchange
+        }
+    };
+    let symbol = match query_value["symbol"].as_str() {
+        None => {
+            "BTC_USDT".to_string()
+        }
+        Some(symbol) => {
+            symbol_fix(symbol)
+        }
+    };
+    let mills_back = match query_value["mills_back"].as_str() {
+        None => {
+            dec!(37)
+        }
+        Some(mills_back_str) => {
+            Decimal::from_str(mills_back_str).unwrap()
+        }
+    };
+    let minute_time_range = match query_value["minute_time_range"].as_str() {
+        None => {
+            240
+        }
+        Some(minute_time_range_str) => {
+            match i64::from_str(minute_time_range_str) {
+                Ok(minute_time_range) => {
+                    minute_time_range
+                }
+                Err(_) => {
+                    // 返回数据
+                    let response = Response {
+                        query: query_value.clone(),
+                        msg: Some("时间不要字符串,要数字,这个版本不支持字符串".to_string()),
+                        code: 500,
+                        data: Value::Null,
+                    };
+
+                    let json_string = serde_json::to_string(&response).unwrap();
+                    return HttpResponse::Ok().content_type("application/json").body(json_string)
+                }
+            }
+        }
+    };
+
+    // 链接数据服务器查询数据
+    let end_time = Utc::now().timestamp_millis();
+    let start_time = end_time - minute_time_range * 60 * 1000;
+    let db_response = get_trades_json(
+        exchange,
+        symbol.as_str(),
+        start_time,
+        end_time,
+    ).await;
+
+    // 对数据库返回的数据进行容错处理
+    if db_response.code == 200 {
+        // 数据本地化处理
+        let trades = parse_json_to_trades(db_response.data);
+        // 指标生成
+        let indicator = generate_msv_by_trades(trades, mills_back);
+
+        // 返回数据
+        let response = Response {
+            query: query_value.clone(),
+            msg: Some("指标生成完毕".to_string()),
+            code: 200,
+            data: indicator,
+        };
 
-    generate_msv_by_trades(trades)
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let json_string = serde_json::to_string(&db_response).unwrap();
+        HttpResponse::BadRequest().content_type("application/json").body(json_string)
+    }
 }
 
 // 将trades转换为具体指标
-pub fn generate_msv_by_trades(mut trades: Vec<Trade>) -> Value {
+pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal) -> Value {
     let mut amplitude_map: BTreeMap<Decimal, Decimal> = BTreeMap::new();
 
     // 每一个元素都遍历一遍
@@ -40,7 +131,7 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>) -> Value {
             let flag_trade = trades.get(range_index).unwrap();
             let range_time = trade.time - flag_trade.time;
             // 判断该ticker是否是range ms以外
-            if range_time > dec!(50) {
+            if range_time > mills_back {
                 break;
             }
 

+ 37 - 67
src/server.rs

@@ -1,40 +1,17 @@
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use actix_cors::Cors;
-use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
-use chrono::Utc;
+use actix_web::{web, App, HttpResponse, HttpServer, Responder, post};
 use rust_decimal::Decimal;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tracing::{info};
-use crate::db_connector::get_trades_json;
-use crate::msv_generate::generate_msv;
-
-// 定义用于反序列化查询参数的结构体
-#[derive(Serialize, Deserialize, Clone)]
-pub struct IndicatorQuery {
-    symbol: Option<String>,
-    exchange: Option<String>
-}
-
-impl IndicatorQuery {
-    pub fn validate(&self) -> bool {
-        if self.symbol.is_none() {
-            return false
-        }
-
-        if self.exchange.is_none() {
-            return false
-        }
-
-        true
-    }
-}
+use crate::msv::generate_msv;
 
 #[derive(Serialize, Deserialize)]
 pub struct Response {
-    pub message: Option<String>,
-    pub query_string: Value,
+    pub msg: Option<String>,
+    pub query: Value,
     pub data: Value,
     pub code: i32,
 }
@@ -48,11 +25,11 @@ pub struct Trade {
 }
 
 // 句柄 GET 请求
-#[get("/ia/get_symbols")]
-async fn get_symbols_by_filter() -> impl Responder {
+#[post("/ia/get_symbols_by_filter")]
+async fn get_symbols_by_filter(query: web::Json<Value>) -> impl Responder {
     let response = Response {
-        query_string: Value::Null,
-        message: Some("get_symbols_by_filter 这个接口还没做".to_string()),
+        query: query.into_inner(),
+        msg: Some("get_symbols_by_filter 这个接口还没做".to_string()),
         code: 400,
         data: Value::Null,
     };
@@ -62,49 +39,42 @@ async fn get_symbols_by_filter() -> impl Responder {
 }
 
 // ia: intelligence agency, 情报部门
-#[get("/ia/get_indicator")]
-async fn get_indicator(query: web::Query<IndicatorQuery>) -> impl Responder {
-    // 客户端传过来的数据校验
-    if query.validate() {
-        // 链接数据服务器查询数据
-        let end_time = Utc::now().timestamp_millis();
-        let start_time = end_time - 4 * 60 * 60 * 1000;
-        let db_response = get_trades_json(
-            query.exchange.clone().unwrap().as_str(),
-            query.symbol.clone().unwrap().as_str(),
-            start_time,
-            end_time,
-        ).await;
+#[post("/ia/get_indicator")]
+async fn get_indicator(query: web::Json<Value>) -> impl Responder {
+    let indicator = match query["indicator"].as_str() {
+        None => {
+            let query_json = query.into_inner().clone();
+            let response = Response {
+                query: Value::Null,
+                msg: Some(format!("indicator没有传给后端, {}", query_json.to_string())),
+                code: 500,
+                data: Value::Null,
+            };
 
-        // 对数据库返回的数据进行容错处理
-        if db_response.code == 200 {
-            // 指标生成
-            let indicator = generate_msv(db_response.data);
+            let json_string = serde_json::to_string(&response).unwrap();
+            return HttpResponse::BadRequest().content_type("application/json").body(json_string)
+        }
+        Some(indicator) => {
+            indicator
+        }
+    };
 
-            // 返回数据
+    match indicator {
+        "msv" => {
+            generate_msv(query.into_inner().get("query").unwrap().clone()).await
+        },
+        _ => {
+            let query_json = query.into_inner().clone();
             let response = Response {
-                query_string: serde_json::to_value(&query.into_inner()).unwrap(),
-                message: Some("指标生成完毕".to_string()),
-                code: 200,
-                data: indicator,
+                query: Value::Null,
+                msg: Some(format!("不存在的指标, {}", query_json.to_string())),
+                code: 500,
+                data: Value::Null,
             };
 
             let json_string = serde_json::to_string(&response).unwrap();
-            HttpResponse::Ok().content_type("application/json").body(json_string)
-        } else {
-            let json_string = serde_json::to_string(&db_response).unwrap();
-            HttpResponse::Ok().content_type("application/json").body(json_string)
+            HttpResponse::BadRequest().content_type("application/json").body(json_string)
         }
-    } else {
-        let response = Response {
-            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
-            message: Some("[symbol以及exchange必传]".to_string()),
-            code: 400,
-            data: Value::Null,
-        };
-
-        let json_string = serde_json::to_string(&response).unwrap();
-        HttpResponse::BadRequest().content_type("application/json").body(json_string)
     }
 }