Ver Fonte

打开bitget

DESKTOP-NE65RNK\Citrus_limon há 1 ano atrás
pai
commit
bcdde5e879

+ 1 - 1
exchanges/src/gate_swap_ws.rs

@@ -153,7 +153,7 @@ impl GateSwapWs {
                     "time": time,
                     "channel": "futures.order_book",
                     "event": "subscribe",
-                    "payload": [symbol, "5", "0"]
+                    "payload": [symbol, "20", "0"]
                 })
             }
             GateSwapSubscribeType::PuFuturesBookTicker => {

+ 11 - 4
src/gate_usdt_swap_data_listener.rs

@@ -10,15 +10,16 @@ use tracing::info;
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{Depth, OrderBook};
+use standard::{Depth, OrderBook, SimpleDepth};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{RecordMap, TradeMap, DepthMap, update_depth, update_record, update_trade};
+use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
 
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
 lazy_static! {
     static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
@@ -85,8 +86,14 @@ pub async fn data_listener(response: ResponseData) {
             };
 
             // 更新到本地数据库
-            let depth_map = DEPTH_MAP.lock().await;
-            update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
+            // ------ 简易深度数据
+            let simple_depth = SimpleDepth::new(&new_depth);
+            let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
+            update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
+
+            // ------ 标准深度数据
+            // let depth_map = DEPTH_MAP.lock().await;
+            // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
         }
         // 订单流数据
         "futures.trades" => {

+ 32 - 1
src/json_db_utils.rs

@@ -6,7 +6,7 @@ use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::{fs};
 use tracing::{error, info};
-use standard::{Record, SpecialDepth, SpecialTrade};
+use standard::{Record, SimpleDepth, SpecialDepth, SpecialTrade};
 
 pub async fn write_to_file(json_data: String, file_path: String) {
     // 尝试创建文件路径
@@ -155,6 +155,37 @@ pub async fn collect_depth_json(start_timestamp: i64, end_timestamp: i64, exchan
     serde_json::to_value(&depths).unwrap()
 }
 
+// 将一个时间段范围内的所有SimpleDepth返回(以json形式)
+pub async fn collect_simple_depth_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
+    let mut simple_depths = Vec::new();
+    let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "order_book_simple");
+
+    for filename in filenames {
+        let file_path = PathBuf::from(filename.as_str());
+
+        let file_content = fs::read_to_string(file_path).await;
+
+        // 检查文件内容是否成功读取
+        if let Ok(content) = file_content {
+            // 尝试反序列化文件内容
+            if let Ok(depth_list) = serde_json::from_str::<Vec<SimpleDepth>>(&content) {
+                // info!("{} 找到 1 条", filename);
+                for depth in depth_list.iter().rev() {
+                    // 不在时间范围内的就不要返回了
+                    let t = depth.time.to_i64().unwrap();
+                    if t < start_timestamp || t > end_timestamp {
+                        continue
+                    }
+
+                    simple_depths.push(depth.clone())
+                }
+            }
+        }
+    }
+
+    serde_json::to_value(&simple_depths).unwrap()
+}
+
 fn find_latest_directory(path: &Path) -> std::io::Result<Option<PathBuf>> {
     let mut latest: Option<(PathBuf, std::time::SystemTime)> = None;
 

+ 35 - 1
src/listener_tools.rs

@@ -3,10 +3,11 @@ use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use standard::{Depth, Record, SpecialDepth, SpecialTrade, Trade};
+use standard::{Depth, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade};
 use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
 pub type DepthMap = HashMap<String, Vec<SpecialDepth>>;
+pub type SimpleDepthMap = HashMap<String, Vec<SimpleDepth>>;
 pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
 pub type RecordMap = HashMap<String, Record>;
 
@@ -86,3 +87,36 @@ pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, Depth
         depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
     }
 }
+
+// 更新简易深度数据
+pub async fn update_simple_depth(new_depth: &SimpleDepth, mut simple_depth_map: MutexGuard<'_, SimpleDepthMap>, exchange: &str) {
+    if let Some(depths) = simple_depth_map.get_mut(new_depth.symbol.as_str()) {
+        if let Some(last_depth) = depths.last() {
+            let last_depth_minutes = last_depth.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
+            let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
+
+            // 如果分钟数不同,则清空列表并添加新的depth
+            if last_depth_minutes != new_depth_minutes {
+                let depths_json = serde_json::to_string(depths).unwrap();
+                let date_str = minute_to_date(last_depth_minutes);
+                let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book_simple", last_depth_minutes);
+
+                write_to_file(depths_json, path).await;
+
+                depths.clear();
+            }
+        }
+
+        // 去重
+        if let Some(last_depth) = depths.last() {
+            if last_depth.size != new_depth.size || last_depth.a1 != new_depth.a1 || last_depth.b1 != new_depth.b1 {
+                depths.push(new_depth.clone());
+            }
+        } else {
+            depths.push(new_depth.clone());
+        }
+    } else {
+        // 如果该symbol不存在,则创建新的Vec并添加depth
+        simple_depth_map.insert(new_depth.symbol.clone(), vec![new_depth.clone()]);
+    }
+}

+ 34 - 1
src/server.rs

@@ -4,7 +4,7 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
-use crate::json_db_utils::{collect_depth_json, collect_records_json, collect_special_trades_json, get_symbols_by_exchange};
+use crate::json_db_utils::{collect_depth_json, collect_records_json, collect_simple_depth_json, collect_special_trades_json, get_symbols_by_exchange};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -337,6 +337,38 @@ async fn get_order_book(query: web::Query<SimpleQuery>) -> impl Responder {
     }
 }
 
+#[get("/order_book_simple")]
+async fn get_order_book_simple(query: web::Query<SimpleQuery>) -> impl Responder {
+    if query.validate() {
+        let response_data = collect_simple_depth_json(
+            query.start_time.clone().unwrap(),
+            query.end_time.clone().unwrap(),
+            query.exchange.clone().unwrap().as_str(),
+            query.symbol.clone().unwrap().as_str()
+        ).await;
+
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询成功".to_string()),
+            code: 200,
+            data: response_data,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    }
+}
+
 pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     let addr = format!("0.0.0.0:{}", port);
     info!("数据服务绑定地址:{}", addr);
@@ -351,6 +383,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
             .service(get_records_map)
             .service(get_exchanges)
             .service(get_order_book)
+            .service(get_order_book_simple)
     })
     .bind(addr)
     .expect("Bind port error")

+ 34 - 0
standard/src/lib.rs

@@ -186,6 +186,40 @@ impl SpecialDepth {
     }
 }
 
+// 简易深度信息
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct SimpleDepth {
+    pub time: Decimal,
+    pub size: Decimal,
+    pub b1: Decimal,
+    pub a1: Decimal,
+    pub symbol: String,
+}
+
+impl SimpleDepth {
+    pub fn new(depth: &Depth) -> SimpleDepth {
+        let mut total_size = Decimal::ZERO;
+
+        for ask in &depth.asks {
+            total_size += ask.price * ask.amount;
+        }
+
+        for bid in &depth.bids {
+            total_size += bid.price * bid.amount;
+        }
+
+        total_size.rescale(2);
+
+        SimpleDepth {
+            time: depth.time,
+            size: total_size,
+            b1: depth.bids[0].price,
+            a1: depth.asks[0].price,
+            symbol: depth.symbol.clone(),
+        }
+    }
+}
+
 /// 特殊Ticker结构体(市场行情)
 /// - `sell(Decimal)`: 卖一价
 /// - `buy(Decimal)`: 买一价