Преглед на файлове

使用深度数据,看看具体的量啥的。

skyffire преди 1 година
родител
ревизия
3f9a17ed8e
променени са 5 файла, в които са добавени 184 реда и са изтрити 70 реда
  1. 32 0
      src/db_connector.rs
  2. 62 0
      src/depth.rs
  3. 1 0
      src/main.rs
  4. 79 70
      src/msv.rs
  5. 10 0
      src/server.rs

+ 32 - 0
src/db_connector.rs

@@ -76,6 +76,38 @@ pub async fn get_records_json(exchange: &str, symbol: &str, start_at: i64, end_a
     }
 }
 
+pub async fn get_depths_json(exchange: &str, symbol: &str, start_at: i64, end_at: i64) -> Response {
+    let url = "http://dc.skyfffire.com:8888/order_book";
+    let params = json!({
+        "exchange": exchange,
+        "symbol": symbol,
+        "start_time": start_at,
+        "end_time": end_at
+    });
+
+    // 创建 HTTP 客户端
+    let client = Client::new();
+
+    // 发送 GET 请求
+    let response = client.get(url)
+        .query(&params)
+        .send()
+        .await.unwrap();
+
+    // 错误处理
+    if response.status().is_success() {
+        let response_text = response.text().await.unwrap();
+        serde_json::from_str(response_text.as_str()).unwrap()
+    } else {
+        Response {
+            msg: Some("get_depths_json 请求失败,预计是指标层的网络请求错误。".to_string()),
+            query: params,
+            data: Default::default(),
+            code: 500,
+        }
+    }
+}
+
 pub async fn get_records_map(exchange: &str, start_at: i64, end_at: i64) -> Response {
     let url = "http://dc.skyfffire.com:8888/get_records_map";
     let params = json!({

+ 62 - 0
src/depth.rs

@@ -0,0 +1,62 @@
+use actix_web::HttpResponse;
+use rust_decimal::prelude::ToPrimitive;
+use serde_json::Value;
+use crate::db_connector::{get_depths_json};
+use crate::msv::{symbol_fix};
+use crate::params_utils::{get_str, parse_str_to_decimal};
+use crate::server::Response;
+
+pub async fn generate_depth(query_value: Value) -> HttpResponse {
+    // 参数处理
+    let exchange = match get_str(query_value.clone(), "exchange") {
+        Ok(str) => {
+            str
+        }
+        Err(response) => {
+            return response
+        }
+    };
+    let symbol = match get_str(query_value.clone(), "symbol") {
+        Ok(symbol) => {
+            symbol_fix(symbol.as_str())
+        }
+        Err(response) => {
+            return response
+        }
+    };
+    let start_time = match parse_str_to_decimal(query_value.clone(), "start_time_mills") {
+        Ok(t) => {
+            t.to_i64().unwrap()
+        }
+        Err(response) => {
+            return response
+        }
+    };
+    let end_time = match parse_str_to_decimal(query_value.clone(), "end_time_mills") {
+        Ok(t) => {
+            t.to_i64().unwrap()
+        }
+        Err(response) => {
+            return response
+        }
+    };
+
+    let db_response = get_depths_json(exchange.as_str(), symbol.as_str(), start_time, end_time).await;
+
+    // 对数据库返回的数据进行容错处理
+    if db_response.code == 200 {
+        // depth获取完毕
+        let response = Response {
+            query: query_value.clone(),
+            msg: Some("depth获取完毕".to_string()),
+            code: 200,
+            data: db_response.data
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let json_string = serde_json::to_string(&db_response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    }
+}

+ 1 - 0
src/main.rs

@@ -5,6 +5,7 @@ mod msv;
 mod trades;
 mod params_utils;
 mod symbol_filter;
+mod depth;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};

+ 79 - 70
src/msv.rs

@@ -5,9 +5,10 @@ use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
 use serde_json::{json, Value};
-use crate::db_connector::get_trades_json;
+use tracing::info;
+use crate::db_connector::{get_depths_json, get_trades_json};
 use crate::params_utils::{get_str, parse_str_to_decimal};
-use crate::server::{Response, Trade};
+use crate::server::{Response, SimpleDepth, Trade};
 
 pub fn symbol_fix(symbol: &str) -> String {
     let mut fixed = symbol.to_uppercase();
@@ -54,20 +55,17 @@ pub async fn generate_msv(query_value: Value) -> HttpResponse {
             return response
         }
     };
-    let trt_minutes_back = match parse_str_to_decimal(query_value.clone(), "trt_minutes_back") {
-        Ok(t) => {
-            t
-        }
-        Err(response) => {
-            return response
-        }
-    };
-    let trt_mills_back = trt_minutes_back * dec!(1000) * dec!(60);
 
     // 链接数据服务器查询数据
     let end_time = Utc::now().timestamp_millis();
     let start_time = end_time - minute_time_range * 60 * 1000;
-    let db_response = get_trades_json(
+    let trades_response = get_trades_json(
+        exchange.as_str(),
+        symbol.as_str(),
+        start_time,
+        end_time,
+    ).await;
+    let depth_response = get_depths_json(
         exchange.as_str(),
         symbol.as_str(),
         start_time,
@@ -75,11 +73,13 @@ pub async fn generate_msv(query_value: Value) -> HttpResponse {
     ).await;
 
     // 对数据库返回的数据进行容错处理
-    if db_response.code == 200 {
+    if trades_response.code == 200 && depth_response.code == 200{
         // 数据本地化处理
-        let trades = parse_json_to_trades(db_response.data);
+        let trades = parse_json_to_trades(trades_response.data);
+        // depth本地化处理
+        let simple_depths = parse_json_to_simple_depths(depth_response.data);
         // 指标生成
-        let indicator = generate_msv_by_trades(trades, mills_back, trt_mills_back, start_time, end_time);
+        let indicator = generate_msv_by_trades(trades, simple_depths, mills_back, start_time, end_time);
 
         // 返回数据
         let response = Response {
@@ -92,16 +92,14 @@ pub async fn generate_msv(query_value: Value) -> HttpResponse {
         let json_string = serde_json::to_string(&response).unwrap();
         HttpResponse::Ok().content_type("application/json").body(json_string)
     } else {
-        let json_string = serde_json::to_string(&db_response).unwrap();
+        let json_string = serde_json::to_string(&trades_response).unwrap();
         HttpResponse::Ok().content_type("application/json").body(json_string)
     }
 }
 
 // 将trades转换为具体指标
-pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, trt_mills_back: Decimal, start_time: i64, end_time: i64) -> Value {
+pub fn generate_msv_by_trades(mut trades: Vec<Trade>, simple_depths: Vec<SimpleDepth>, mills_back: Decimal, start_time: i64, end_time: i64) -> Value {
     let mut msv_data: Vec<Vec<Decimal>> = vec![];
-    let mut tr_data: Vec<Vec<Decimal>> = vec![];
-    let mut max_trt_count = Decimal::ZERO;
 
     const GAMMA: Decimal = dec!(0.5);
 
@@ -171,60 +169,22 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, trt_m
             } else {
                 msv_data.push(vec![trade.time, rate, dissociation]);
             }
-
-            // 生成速率曲线
-            // 计算区间的预定价格
-            // let mut trades_count = 0;
-            let mut trt_count = Decimal::ZERO;
-            let mut trt_range_index = index;
-            loop {
-                // 第0个就不用计算
-                if trt_range_index == 0 {
-                    break;
-                }
-
-                let flag_trade = trades.get(trt_range_index).unwrap();
-                let trt_range_time = trade.time - flag_trade.time;
-                // 判断回溯时间
-                if trt_range_time > trt_mills_back {
-                    break;
-                }
-
-                // 判断是否满足高速率条件
-                // trades_count += 1;
-                // if trades_count >= trt {
-                trt_count += Decimal::ONE;
-                // }
-
-                trt_range_index -= 1;
-            }
-
-            if trt_count > max_trt_count / dec!(0.732) {
-                tr_data.push(vec![trade.time, Decimal::ONE]);
-
-                if trt_count > max_trt_count {
-                    max_trt_count = trt_count;
-                }
-            } else {
-                tr_data.push(vec![trade.time, Decimal::ZERO]);
-            }
         }
     }
 
     // 按时间序列填充数据
     let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_tr_data: Vec<Vec<Decimal>> = vec![];
+    let mut final_depth_data: Vec<Vec<Decimal>> = vec![];
     let mut data_index = 0;
+    let mut depth_index = 0;
     let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
     let last_timestamp = Decimal::from_i64(end_time).unwrap();
-    // let step_timestamp = mills_back;
     let step_timestamp = dec!(1000);
     loop {
         let mut max_msv_data = Decimal::ZERO;
         let mut max_msv_diss_data = Decimal::ZERO;
-        let mut max_tr_data = Decimal::ZERO;
 
-        // 获取时间范围内的数据
+        // 获取时间范围内的波动率数据
         loop {
             // 下标合法性判断
             if data_index >= msv_data.len() {
@@ -239,7 +199,6 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, trt_m
             // -------------- 大小判断,取值
             let msv_d = msv_data[data_index][1];
             let msv_diss_data = msv_data[data_index][2];
-            let tr_d = tr_data[data_index][1];
             // msv波动数据
             if max_msv_data.abs() < msv_d.abs() {
                 max_msv_data = msv_d;
@@ -248,25 +207,39 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, trt_m
             if max_msv_diss_data < msv_diss_data {
                 max_msv_diss_data = msv_diss_data;
             }
-            // 成交笔数数据
-            if max_tr_data < tr_d {
-                max_tr_data = tr_d;
-            }
-
+            // 下标步近
             data_index = data_index + 1;
         }
 
+        // 获取时间范围内的深度数据
+        let mut total_size = Decimal::ZERO;
+        loop {
+            // 下标合法性判断
+            if depth_index >= simple_depths.len() {
+                break;
+            }
+            // 时间范围合法性判断,只统计那一秒以内的深度总交易量
+            if index_timestamp + step_timestamp < simple_depths[depth_index].time {
+                break;
+            }
+            // 这一秒的所有深度数据求和
+            total_size += simple_depths[depth_index].size;
+            // 下标步近
+            depth_index += 1;
+        }
+
         // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
-        if max_msv_data == Decimal::ZERO && max_tr_data == Decimal::ZERO {
+        if max_msv_data == Decimal::ZERO {
             final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
-            final_tr_data.push(vec![index_timestamp, Decimal::ZERO]);
 
         // 说明在这个时间范围内是有数据存在的
         } else {
             final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_diss_data]);
-            final_tr_data.push(vec![index_timestamp, max_tr_data]);
         }
+        // 简易的深度数据处理
+        final_depth_data.push(vec![index_timestamp, total_size]);
 
+        // ---------------- 最后的时间处理 -----------------
         // 对时间进行步近
         index_timestamp = index_timestamp + step_timestamp;
         // 时间越界
@@ -280,7 +253,7 @@ pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, trt_m
     let result_size = final_msv_data.len();
     json!({
         "msv": final_msv_data,
-        "tr": final_tr_data,
+        "tr": final_depth_data,
         "total_size": total_size,
         "result_size": result_size,
     })
@@ -302,3 +275,39 @@ pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
 
     rst
 }
+
+// 将json转换为简易深度数据
+pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec<SimpleDepth> {
+    let mut rst = vec![];
+
+    for depth_json in depths_json.as_array().unwrap() {
+        let time = Decimal::from_str(depth_json["t"].as_str().unwrap()).unwrap();
+        let mut size = Decimal::ZERO;
+
+        // 卖盘交易量统计
+        for ask_order_book in depth_json["a"].as_array().unwrap() {
+            let price = Decimal::from_str(ask_order_book.as_array().unwrap()[0].as_str().unwrap()).unwrap();
+            let amount = Decimal::from_str(ask_order_book.as_array().unwrap()[1].as_str().unwrap()).unwrap();
+
+            size = size + price * amount;
+        }
+
+        // 买盘交易量统计
+        for bid_order_book in depth_json["b"].as_array().unwrap() {
+            let price = Decimal::from_str(bid_order_book.as_array().unwrap()[0].as_str().unwrap()).unwrap();
+            let amount = Decimal::from_str(bid_order_book.as_array().unwrap()[1].as_str().unwrap()).unwrap();
+
+            size = size + price * amount;
+        }
+
+        size.rescale(2);
+        let simple_depth = SimpleDepth {
+            time,
+            size,
+        };
+
+        rst.insert(0, simple_depth)
+    }
+
+    rst
+}

+ 10 - 0
src/server.rs

@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tracing::{info};
 use crate::db_connector::get_exchanges_json;
+use crate::depth::generate_depth;
 use crate::msv::generate_msv;
 use crate::params_utils::{get_array, get_str, parse_str_to_decimal};
 use crate::symbol_filter::get_final_symbols;
@@ -29,6 +30,12 @@ pub struct Trade {
     pub price: Decimal
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct SimpleDepth {
+    pub time: Decimal,
+    pub size: Decimal
+}
+
 // 句柄 GET 请求
 #[post("/ia/get_symbols_by_filter")]
 async fn get_symbols_by_filter(query: web::Json<Value>) -> impl Responder {
@@ -104,6 +111,9 @@ async fn get_indicator(query: web::Json<Value>) -> impl Responder {
         "trades" => {
             generate_trades(query.into_inner().get("query").unwrap().clone()).await
         },
+        "depth" => {
+            generate_depth(query.into_inner().get("query").unwrap().clone()).await
+        },
         _ => {
             let query_json = query.into_inner().clone();
             let response = Response {