Browse Source

数据服务OK了。

skyffire 1 year ago
parent
commit
eca24eed7c
4 changed files with 123 additions and 3 deletions
  1. 12 0
      src/control_c.rs
  2. 4 3
      src/json_db_utils.rs
  3. 6 0
      src/main.rs
  4. 101 0
      src/server.rs

+ 12 - 0
src/control_c.rs

@@ -0,0 +1,12 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tracing::info;
+
+pub fn exit_handler(running: Arc<AtomicBool>) {
+    let r = running.clone();
+    ctrlc::set_handler(move || {
+        info!("检测到退出信号!");
+        r.store(false, Ordering::Relaxed);
+    })
+    .expect("Error setting Ctrl-C handler");
+}

+ 4 - 3
src/json_db_utils.rs

@@ -1,4 +1,5 @@
 use std::path::{Path, PathBuf};
+use serde_json::Value;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::{fs, spawn};
@@ -73,7 +74,7 @@ pub async fn read_special_trades_from_file<P: AsRef<Path>>(file_path: P) -> Vec<
 }
 
 // 将一个时间段范围内的所有SpecialTrade返回(以json形式)
-pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> String {
+pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
     let mut all_trades = Vec::new();
     let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "trades");
 
@@ -85,7 +86,7 @@ pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i6
         all_trades.append(&mut trades);
     }
 
-    serde_json::to_string(&all_trades).unwrap()
+    serde_json::to_value(&all_trades).unwrap()
 }
 
 #[tokio::test]
@@ -93,7 +94,7 @@ async fn read_test() {
     use global::log_utils::init_log_with_info;
     init_log_with_info();
 
-    let rst = collect_special_trades_json(1712824200000, 1712824380000, "gate_usdt_swap", "CFX_USDT").await;
+    let rst = collect_special_trades_json(1712894400000, 1712912400000, "gate_usdt_swap", "CFX_USDT").await;
     info!("{}", rst)
 }
 

+ 6 - 0
src/main.rs

@@ -1,5 +1,7 @@
 mod gate_usdt_swap_data_listener;
 mod json_db_utils;
+mod control_c;
+mod server;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -28,6 +30,10 @@ async fn main() {
         warn!("{}", msg);
         panic_running.store(false, Ordering::Relaxed);
     }));
+    // 初始化数据服务器
+    server::run_server(8888, running.clone());
+    // ctrl c退出检查程序
+    control_c::exit_handler(running.clone());
     // 每一秒检查一次程序是否结束
     while running.load(Ordering::Relaxed) {
         tokio::time::sleep(Duration::from_secs(1)).await;

+ 101 - 0
src/server.rs

@@ -0,0 +1,101 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use tracing::{info};
+use crate::json_db_utils::collect_special_trades_json;
+
+// 定义用于反序列化查询参数的结构体
+#[derive(Serialize, Deserialize, Clone)]
+pub struct TradesQuery {
+    symbol: Option<String>,
+    exchange: Option<String>,
+    start_time: Option<i64>,
+    end_time: Option<i64>,
+}
+
+impl TradesQuery {
+    pub fn validate(&self) -> bool {
+        if self.symbol.is_none() {
+            return false
+        }
+
+        if self.exchange.is_none() {
+            return false
+        }
+
+        if self.start_time.is_none()  {
+            return false
+        }
+
+        if self.end_time.is_none()  {
+            return false
+        }
+
+        true
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Response {
+    message: Option<String>,
+    query_string: Value,
+    data: Value,
+    code: i32,
+}
+
+// 句柄 GET 请求
+#[get("/trades")]
+async fn get_trades(query: web::Query<TradesQuery>) -> impl Responder {
+    if query.validate() {
+        let response_data = collect_special_trades_json(
+            query.start_time.clone().unwrap(),
+            query.end_time.clone().unwrap(),
+            query.exchange.clone().unwrap().as_str(),
+            query.symbol.clone().unwrap().as_str()
+        ).await;
+
+        let response = Response {
+            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
+            message: Some("查询成功".to_string()),
+            code: 200,
+            data: response_data,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let response = Response {
+            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
+            message: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            code: 400,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::BadRequest().content_type("application/json").body(json_string)
+    }
+}
+
+pub fn run_server(port: u32, running: Arc<AtomicBool>) {
+    let addr = format!("0.0.0.0:{}", port);
+    info!("数据服务绑定地址:{}", addr);
+
+    // 启动server
+    let server_fut = HttpServer::new(move || {
+        App::new()
+            .service(get_trades)
+    })
+    .bind(addr)
+    .expect("Bind port error")
+    .run();
+
+    info!("数据仓库服务已运行。");
+
+    let r = running.clone();
+    tokio::spawn(async move {
+        server_fut.await.expect("error running the server");
+        r.store(false, Ordering::Relaxed);
+    });
+}