Browse Source

k线存放、读取支持了

skyffire 1 năm trước cách đây
mục cha
commit
e40b92317f

+ 11 - 7
src/bitget_usdt_swap_data_listener.rs

@@ -7,18 +7,20 @@ use tracing::info;
 use exchanges::bitget_swap_rest::BitgetSwapRest;
 use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{SpecialTrade};
+use standard::{Record, SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::update_trade;
+use crate::listener_tools::{update_record, update_trade};
 
 // type DepthMap = HashMap<String, Vec<SpecialDepth>>;
 pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
 const EXCHANGE_NAME: &str = "bitget_usdt_swap";
 
 lazy_static! {
     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
@@ -84,11 +86,13 @@ pub async fn data_listener(response: ResponseData) {
         },
         // k线数据
         "candle1m" => {
-            let _records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
-            //
-            // for record in records.iter() {
-            //     info!(?record);
-            // }
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+
+            for record in records.iter() {
+                let record_map= RECORD_MAP.lock().await;
+
+                update_record(record, record_map, EXCHANGE_NAME).await;
+            }
         },
         _ => {
             info!("48 未知的数据类型: {:?}", response)

+ 11 - 7
src/gate_usdt_swap_data_listener.rs

@@ -7,18 +7,20 @@ use tracing::info;
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{SpecialTrade};
+use standard::{Record, SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::update_trade;
+use crate::listener_tools::{update_record, update_trade};
 
 // type DepthMap = HashMap<String, Vec<SpecialDepth>>;
 pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
 lazy_static! {
     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
@@ -72,16 +74,18 @@ pub async fn data_listener(response: ResponseData) {
             for trade in trades.iter() {
                 let trades_map = TRADES_MAP.lock().await;
 
-                update_trade(&trade, trades_map, EXCHANGE_NAME).await
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
             }
         },
         // k线数据
         "futures.candlesticks" => {
-            let _records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
 
-            // for record in records.iter() {
-            //     info!(?record);
-            // }
+            for record in records.iter() {
+                let record_map= RECORD_MAP.lock().await;
+
+                update_record(record, record_map, EXCHANGE_NAME).await;
+            }
         },
         _ => {
             info!("48 未知的数据类型: {:?}", response)

+ 39 - 19
src/json_db_utils.rs

@@ -5,7 +5,7 @@ use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::{fs};
 use tracing::{error, info};
-use standard::SpecialTrade;
+use standard::{Record, SpecialTrade};
 
 pub async fn write_to_file(json_data: String, file_path: String) {
     // 尝试创建文件路径
@@ -72,23 +72,6 @@ pub fn minute_to_date(minute: i64) -> String {
     datetime_east_eight.format("%Y%m%d").to_string()
 }
 
-// 从本地json的db里面加载SpecialTrade
-pub async fn read_special_trades_from_file<P: AsRef<Path>>(file_path: P) -> Vec<SpecialTrade> {
-    let file_content = fs::read_to_string(file_path).await;
-
-    // 检查文件内容是否成功读取
-    if let Ok(content) = file_content {
-        // 尝试反序列化文件内容
-        if let Ok(trades) = serde_json::from_str::<Vec<SpecialTrade>>(&content) {
-            trades // 成功反序列化,返回结果
-        } else {
-            vec![] // 反序列化失败,返回空 Vec
-        }
-    } else {
-        vec![] // 读取文件失败,返回空 Vec
-    }
-}
-
 // 将一个时间段范围内的所有SpecialTrade返回(以json形式)
 pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
     let mut all_trades = Vec::new();
@@ -96,7 +79,20 @@ pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i6
 
     for filename in filenames {
         let file_path = PathBuf::from(filename.as_str());
-        let mut trades = read_special_trades_from_file(file_path).await;
+        let file_content = fs::read_to_string(file_path).await;
+
+        // 检查文件内容是否成功读取
+        let mut trades = if let Ok(content) = file_content {
+            // 尝试反序列化文件内容
+            if let Ok(trades) = serde_json::from_str::<Vec<SpecialTrade>>(&content) {
+                trades // 成功反序列化,返回结果
+            } else {
+                vec![] // 反序列化失败,返回空 Vec
+            }
+        } else {
+            vec![] // 读取文件失败,返回空 Vec
+        };
+
         trades.reverse();
         info!("{} 找到 {} 条", filename, trades.len());
         all_trades.append(&mut trades);
@@ -105,6 +101,30 @@ pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i6
     serde_json::to_value(&all_trades).unwrap()
 }
 
+// 将一个时间段范围内的所有Record返回(以json形式)
+pub async fn collect_records_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
+    let mut records = Vec::new();
+    let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "record");
+
+    for filename in filenames {
+        let file_path = PathBuf::from(filename.as_str());
+        let file_content = fs::read_to_string(file_path).await;
+
+        // 检查文件内容是否成功读取
+        if let Ok(content) = file_content {
+            // 尝试反序列化文件内容
+            if let Ok(record) = serde_json::from_str::<Record>(&content) {
+                info!("{} 找到 1 条", filename);
+                records.push(record.clone());
+            }
+        }
+    }
+
+    records.reverse();
+
+    serde_json::to_value(&records).unwrap()
+}
+
 #[tokio::test]
 async fn read_test() {
     use global::log_utils::init_log_with_info;

+ 26 - 7
src/listener_tools.rs

@@ -1,10 +1,11 @@
 use std::str::FromStr;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use standard::{SpecialTrade, Trade};
-use crate::gate_usdt_swap_data_listener::TradeMap;
+use standard::{Record, SpecialTrade, Trade};
+use crate::gate_usdt_swap_data_listener::{RecordMap, TradeMap};
 use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
+// 更新订单流数据
 pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
     if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
         if let Some(last_trade) = trades.last() {
@@ -12,7 +13,7 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
             let last_trade_minutes = i64::from_str(last_trade.inner()[1].as_str()).unwrap() / 60000;    // 将毫秒转换成分钟数
             let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000;                           // 同上
 
-            // 如果分钟数不同,则清空列表并添加新的depth
+            // 如果分钟数不同,则清空列表并添加新的trade
             if last_trade_minutes != new_trade_minutes {
                 let depths_json = serde_json::to_string(trades).unwrap();
                 let date_str = minute_to_date(last_trade_minutes);
@@ -27,14 +28,32 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
         }
         trades.push(SpecialTrade::new(&new_trade));
     } else {
-        // 如果该symbol不存在,则创建新的Vec并添加depth
+        // 如果该symbol不存在,则创建新的Vec并添加trade
         trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
     }
 }
 
-// pub async fn update_record(new_record: &Record, &records_map: MutexGuard<`_, RecordMap>, exchange: &str) {
-//
-// }
+// 更新k线
+pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) {
+    // 如果k线记录存在于map,则进行一系列操作,用于保存map
+    if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) {
+        let last_trade_minutes = record.time.to_i64().unwrap() / 60000;             // 将毫秒转换成分钟数
+        let new_trade_minutes = new_record.time.to_i64().unwrap() / 60000;          // 同上
+
+        // 如果分钟数不同,则清空列表并添加新的depth
+        if last_trade_minutes != new_trade_minutes {
+            let record_json = serde_json::to_string(record).unwrap();
+            let date_str = minute_to_date(last_trade_minutes);
+            let path = generate_file_path(exchange, date_str.as_str(), new_record.symbol.as_str(), "record", last_trade_minutes);
+
+            write_to_file(record_json, path).await;
+
+            records_map.clear();
+        }
+    }
+
+    records_map.insert(new_record.symbol.clone(), new_record.clone());
+}
 
 // // 更新深度数据
 // pub async fn update_depth(new_depth: &Depth) {

+ 35 - 1
src/server.rs

@@ -4,7 +4,7 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tracing::{info};
-use crate::json_db_utils::collect_special_trades_json;
+use crate::json_db_utils::{collect_records_json, collect_special_trades_json};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -78,6 +78,39 @@ async fn get_trades(query: web::Query<TradesQuery>) -> impl Responder {
     }
 }
 
+// 句柄 GET 请求
+#[get("/records")]
+async fn get_records(query: web::Query<TradesQuery>) -> impl Responder {
+    if query.validate() {
+        let response_data = collect_records_json(
+            query.start_time.clone().unwrap(),
+            query.end_time.clone().unwrap(),
+            query.exchange.clone().unwrap().as_str(),
+            query.symbol.clone().unwrap().as_str()
+        ).await;
+
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询成功".to_string()),
+            code: 200,
+            data: response_data,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    }
+}
+
 pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     let addr = format!("0.0.0.0:{}", port);
     info!("数据服务绑定地址:{}", addr);
@@ -86,6 +119,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     let server_fut = HttpServer::new(move || {
         App::new()
             .service(get_trades)
+            .service(get_records)
     })
     .bind(addr)
     .expect("Bind port error")

+ 1 - 1
standard/src/lib.rs

@@ -198,7 +198,7 @@ impl OrderBook {
 /// - `close(Decimal)`: 收盘价;
 /// - `volume(Decimal)`: 交易量;
 /// - `symbol(String)`: 交易对;
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Record {
     pub time: Decimal,
     pub open: Decimal,