Просмотр исходного кода

完成万里长征第一步:生成数据联通。

skyffire 1 год назад
Родитель
Сommit
c621fe60d8
4 измененных файлов с 120 добавлено и 33 удалено
  1. 3 1
      Cargo.toml
  2. 55 0
      src/db_connector.rs
  3. 3 2
      src/main.rs
  4. 59 30
      src/server.rs

+ 3 - 1
Cargo.toml

@@ -25,4 +25,6 @@ tracing-appender-timezone = { git = "https://github.com/skyfffire/tracing-append
 ctrlc = "3.2.5"
 
 actix-rt = "2.5.0"
-actix-web = "4.0.0-beta.12"
+actix-web = "4.0.0-beta.12"
+
+reqwest = { version = "0.11", features = ["json"] }

+ 55 - 0
src/db_connector.rs

@@ -0,0 +1,55 @@
+use reqwest::Client;
+use rust_decimal::Decimal;
+use serde::{Deserialize, Serialize};
+use serde_json::{json};
+use crate::server::Response;
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Trade {
+    pub id: String,
+    pub time: Decimal,
+    pub size: Decimal,
+    pub price: Decimal
+}
+
+pub async fn get_trades_json(exchange: &str, symbol: &str, start_at: i64, end_at: i64) -> Response {
+    let url = "http://dc.skyfffire.com:8888/trades";
+    let params = json!({
+        "exchange": exchange,
+        "symbol": symbol,
+        "start_time": start_at,
+        "end_time": end_at
+    });
+
+    // 创建 HTTP 客户端
+    let client = Client::new();
+
+    // 发送 GET 请求
+    let response = client.get(url)
+        .query(&params)
+        .send()
+        .await.unwrap();
+
+    // 错误处理
+    if response.status().is_success() {
+        let response_text = response.text().await.unwrap();
+        serde_json::from_str(response_text.as_str()).unwrap()
+    } else {
+        Response {
+            message: Some("请求失败,预计是指标层的网络请求错误。".to_string()),
+            query_string: Default::default(),
+            data: Default::default(),
+            code: -200,
+        }
+    }
+}
+
+#[tokio::test]
+async fn get_trades_test() {
+    use global::log_utils::init_log_with_info;
+    use tracing::info;
+    init_log_with_info();
+
+    let rst = get_trades_json("bitget_usdt_swap", "BTC_USDT", 1713210360000, 1713210960000).await;
+    info!(?rst)
+}

+ 3 - 2
src/main.rs

@@ -1,5 +1,6 @@
 mod control_c;
 mod server;
+pub mod db_connector;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -16,7 +17,7 @@ fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
 #[tokio::main(flavor = "multi_thread")]
 async fn main() {
     // 日志级别配置
-    let _ = log_level_init("info".to_string(), 8888, "data-center".to_string());
+    let _ = log_level_init("info".to_string(), 18888, "data-center".to_string());
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
     // panic错误捕获,panic级别的错误直接退出
@@ -27,7 +28,7 @@ async fn main() {
         panic_running.store(false, Ordering::Relaxed);
     }));
     // 初始化数据服务器
-    server::run_server(8888, running.clone());
+    server::run_server(18888, running.clone());
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
     // 每一秒检查一次程序是否结束

+ 59 - 30
src/server.rs

@@ -1,20 +1,21 @@
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
+use chrono::Utc;
+use rust_decimal::Decimal;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tracing::{info};
+use crate::db_connector::get_trades_json;
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
-pub struct TradesQuery {
+pub struct IndicatorQuery {
     symbol: Option<String>,
-    exchange: Option<String>,
-    start_time: Option<i64>,
-    end_time: Option<i64>,
+    exchange: Option<String>
 }
 
-impl TradesQuery {
+impl IndicatorQuery {
     pub fn validate(&self) -> bool {
         if self.symbol.is_none() {
             return false
@@ -24,45 +25,72 @@ impl TradesQuery {
             return false
         }
 
-        if self.start_time.is_none()  {
-            return false
-        }
-
-        if self.end_time.is_none()  {
-            return false
-        }
-
         true
     }
 }
 
 #[derive(Serialize, Deserialize)]
 pub struct Response {
-    message: Option<String>,
-    query_string: Value,
-    data: Value,
-    code: i32,
+    pub message: Option<String>,
+    pub query_string: Value,
+    pub data: Value,
+    pub code: i32,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Trade {
+    pub id: String,
+    pub time: Decimal,
+    pub size: Decimal,
+    pub price: Decimal
 }
 
 // 句柄 GET 请求
-#[get("/trades")]
-async fn get_trades(query: web::Query<TradesQuery>) -> impl Responder {
+#[get("/get_symbols")]
+async fn get_symbols_by_filter() -> impl Responder {
+    let response = Response {
+        query_string: Value::Null,
+        message: Some("get_symbols_by_filter 这个接口还没做".to_string()),
+        code: 400,
+        data: Value::Null,
+    };
+
+    let json_string = serde_json::to_string(&response).unwrap();
+    HttpResponse::BadRequest().content_type("application/json").body(json_string)
+}
+
+#[get("/get_indicator")]
+async fn get_indicator(query: web::Query<IndicatorQuery>) -> impl Responder {
     if query.validate() {
-        let response_data = Value::Null;
+        // 链接数据服务器查询数据
+        let end_time = Utc::now().timestamp_millis();
+        let start_time = end_time - 4 * 60 * 60 * 1000;
+        let db_response = get_trades_json(
+            query.exchange.clone().unwrap().as_str(),
+            query.symbol.clone().unwrap().as_str(),
+            start_time,
+            end_time,
+        ).await;
 
-        let response = Response {
-            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
-            message: Some("查询成功".to_string()),
-            code: 200,
-            data: response_data,
-        };
+        // 对数据库返回的数据进行容错处理
+        if db_response.code == 200 {
+            let response = Response {
+                query_string: serde_json::to_value(&query.into_inner()).unwrap(),
+                message: Some("指标生成完毕".to_string()),
+                code: 200,
+                data: db_response.data,
+            };
 
-        let json_string = serde_json::to_string(&response).unwrap();
-        HttpResponse::Ok().content_type("application/json").body(json_string)
+            let json_string = serde_json::to_string(&response).unwrap();
+            HttpResponse::Ok().content_type("application/json").body(json_string)
+        } else {
+            let json_string = serde_json::to_string(&db_response).unwrap();
+            HttpResponse::Ok().content_type("application/json").body(json_string)
+        }
     } else {
         let response = Response {
             query_string: serde_json::to_value(&query.into_inner()).unwrap(),
-            message: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            message: Some("[symbol以及exchange必传]".to_string()),
             code: 400,
             data: Value::Null,
         };
@@ -79,7 +107,8 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
     // 启动server
     let server_fut = HttpServer::new(move || {
         App::new()
-            .service(get_trades)
+            .service(get_symbols_by_filter)
+            .service(get_indicator)
     })
     .bind(addr)
     .expect("Bind port error")