Răsfoiți Sursa

项目初始化

skyffire 1 an în urmă
comite
17ec4469d8
9 a modificat fișierele cu 321 adăugiri și 0 ștergeri
  1. 12 0
      .gitignore
  2. 28 0
      Cargo.toml
  3. 5 0
      global/.gitignore
  4. 25 0
      global/Cargo.toml
  5. 1 0
      global/src/lib.rs
  6. 106 0
      global/src/log_utils.rs
  7. 12 0
      src/control_c.rs
  8. 37 0
      src/main.rs
  9. 95 0
      src/server.rs

+ 12 - 0
.gitignore

@@ -0,0 +1,12 @@
+/target
+/.idea
+
+Cargo.lock
+config.toml*
+*.log
+*.log.*
+/logs*
+/test_account.toml
+
+*.json
+/db

+ 28 - 0
Cargo.toml

@@ -0,0 +1,28 @@
+[package]
+name = "indicator_set"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+global = { path="./global" }
+
+tokio = { version = "1.31.0", features = ["full"] }
+
+chrono = "0.4.26"
+
+serde = { version = "1.0.188", features = ["derive"] }
+serde_json = "1.0.105"
+
+rust_decimal = { version = "1.32.0", features = ["maths"] }
+rust_decimal_macros = "1.32.0"
+
+tracing = "0.1"
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
+tracing-appender-timezone = { git = "https://github.com/skyfffire/tracing-appender-timezone.git" }
+
+ctrlc = "3.2.5"
+
+actix-rt = "2.5.0"
+actix-web = "4.0.0-beta.12"

+ 5 - 0
global/.gitignore

@@ -0,0 +1,5 @@
+/target
+/.idea
+/logs
+
+Cargo.lock

+ 25 - 0
global/Cargo.toml

@@ -0,0 +1,25 @@
+[package]
+name = "global"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+rust_decimal = "1.32.0"
+rust_decimal_macros = "1.32.0"
+tracing = "0.1"
+tracing-subscriber = { version = "0.3.17", features = [
+    "env-filter",
+    "time",
+    "local-time"
+] }
+time = { version = "0.3.7", features = ["macros"] }
+tracing-appender-timezone = { git = "https://github.com/skyfffire/tracing-appender-timezone.git" }
+serde = "1.0.183"
+serde_derive = "1.0"
+serde_json = "1.0.104"
+chrono = "0.4.26"
+tokio = { version = "1.31.0", features = ["full"] }
+uuid = { version = "1.5.0", features = ["v4"] }
+simple_excel_writer = "0.2.0"

+ 1 - 0
global/src/lib.rs

@@ -0,0 +1 @@
+pub mod log_utils;

+ 106 - 0
global/src/log_utils.rs

@@ -0,0 +1,106 @@
+use std::fmt::Debug;
+use std::io;
+use tracing::{Event, Subscriber, warn};
+use tracing_appender_timezone::non_blocking::WorkerGuard;
+use tracing_subscriber::{fmt, Layer};
+use tracing_subscriber::layer::{Context, SubscriberExt};
+use tracing::field::{Field, Visit};
+use tracing_appender_timezone::rolling::{RollingFileAppender, Rotation};
+
+
+struct ErrorMessageVisitor {
+    message: String
+}
+
+impl Visit for ErrorMessageVisitor {
+    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
+        if field.name() == "message" {
+            self.message = format!("{:?}", value);
+        }
+    }
+}
+
+
+// 错误报告发送到指定服务器
+struct ReportingLayer {
+    app_name: String,
+}
+impl<S> Layer<S> for ReportingLayer
+    where
+        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
+{
+    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+        if event.metadata().level() == &tracing::Level::ERROR {
+            let mut visitor = ErrorMessageVisitor {
+                message: String::new()
+            };
+            event.record(&mut visitor);
+
+            warn!("Unhandle error: {}", self.app_name)
+        }
+    }
+}
+
+pub fn init_log_with_debug() {
+    let _ = final_init(tracing::Level::DEBUG.as_str(), 0, "test".to_string());
+}
+
+pub fn init_log_with_trace() {
+    let _ = final_init(tracing::Level::TRACE.as_str(), 0, "test".to_string());
+}
+
+pub fn init_log_with_info() {
+    let _ = final_init(tracing::Level::INFO.as_str(), 0, "test".to_string());
+}
+
+pub fn final_init(level: &str, port: u32, app_name: String) -> WorkerGuard {
+    let mut path = String::new();
+    path.push_str("./logs");
+    path.push_str(port.to_string().as_str());
+
+    let file_appender = RollingFileAppender::builder()
+        .time_zone(8)
+        .rotation(Rotation::DAILY)
+        .filename_suffix("log")
+        .build(path)
+        .expect("initializing rolling file appender failed");
+    let (non_blocking, guard) = tracing_appender_timezone::non_blocking(file_appender);
+
+    use time::{macros::format_description, UtcOffset};
+    use tracing_subscriber::{fmt::time::OffsetTime};
+    let local_time = OffsetTime::new(
+        UtcOffset::from_hms(8, 0, 0).unwrap(),
+        format_description!("[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]"),
+    );
+
+    let fmt_layer = fmt::layer()
+        .with_timer(local_time.clone())
+        .with_line_number(true)
+        .with_target(true)
+        .with_level(true)
+        .with_writer(io::stdout)
+        .with_span_events(fmt::format::FmtSpan::FULL);
+
+    let file_layer = fmt::layer()
+        .with_timer(local_time.clone())
+        .with_line_number(true)
+        .with_target(true)
+        .with_ansi(false)
+        .with_level(true)
+        .with_writer(non_blocking.clone())
+        .with_span_events(fmt::format::FmtSpan::FULL);
+
+    let reporting_layer = ReportingLayer {
+        app_name
+    };
+
+    let layer = tracing_subscriber::Registry::default()
+        .with(fmt_layer)
+        .with(file_layer)
+        .with(reporting_layer)
+        .with(tracing_subscriber::EnvFilter::new(level));
+
+    tracing::subscriber::set_global_default(layer).unwrap();
+
+    return guard;
+}

+ 12 - 0
src/control_c.rs

@@ -0,0 +1,12 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tracing::info;
+
+pub fn exit_handler(running: Arc<AtomicBool>) {
+    let r = running.clone();
+    ctrlc::set_handler(move || {
+        info!("检测到退出信号!");
+        r.store(false, Ordering::Relaxed);
+    })
+    .expect("Error setting Ctrl-C handler");
+}

+ 37 - 0
src/main.rs

@@ -0,0 +1,37 @@
+mod control_c;
+mod server;
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tracing::{info, warn};
+use tracing_appender_timezone::non_blocking::WorkerGuard;
+
+// 日志级别配置
+fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
+    info!("日志级别读取成功:{}。", log_str);
+    global::log_utils::final_init(log_str.as_str(), port, app_name)
+}
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() {
+    // 日志级别配置
+    let _ = log_level_init("info".to_string(), 8888, "data-center".to_string());
+    // 掌控全局的关闭
+    let running = Arc::new(AtomicBool::new(true));
+    // panic错误捕获,panic级别的错误直接退出
+    let panic_running = running.clone();
+    std::panic::set_hook(Box::new(move |panic_info| {
+        let msg = format!("type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location());
+        warn!("{}", msg);
+        panic_running.store(false, Ordering::Relaxed);
+    }));
+    // 初始化数据服务器
+    server::run_server(8888, running.clone());
+    // ctrl c退出检查程序
+    control_c::exit_handler(running.clone());
+    // 每一秒检查一次程序是否结束
+    while running.load(Ordering::Relaxed) {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+}

+ 95 - 0
src/server.rs

@@ -0,0 +1,95 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use tracing::{info};
+
+// 定义用于反序列化查询参数的结构体
+#[derive(Serialize, Deserialize, Clone)]
+pub struct TradesQuery {
+    symbol: Option<String>,
+    exchange: Option<String>,
+    start_time: Option<i64>,
+    end_time: Option<i64>,
+}
+
+impl TradesQuery {
+    pub fn validate(&self) -> bool {
+        if self.symbol.is_none() {
+            return false
+        }
+
+        if self.exchange.is_none() {
+            return false
+        }
+
+        if self.start_time.is_none()  {
+            return false
+        }
+
+        if self.end_time.is_none()  {
+            return false
+        }
+
+        true
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Response {
+    message: Option<String>,
+    query_string: Value,
+    data: Value,
+    code: i32,
+}
+
+// 句柄 GET 请求
+#[get("/trades")]
+async fn get_trades(query: web::Query<TradesQuery>) -> impl Responder {
+    if query.validate() {
+        let response_data = Value::Null;
+
+        let response = Response {
+            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
+            message: Some("查询成功".to_string()),
+            code: 200,
+            data: response_data,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let response = Response {
+            query_string: serde_json::to_value(&query.into_inner()).unwrap(),
+            message: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            code: 400,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::BadRequest().content_type("application/json").body(json_string)
+    }
+}
+
+pub fn run_server(port: u32, running: Arc<AtomicBool>) {
+    let addr = format!("0.0.0.0:{}", port);
+    info!("指标服务绑定地址:{}", addr);
+
+    // 启动server
+    let server_fut = HttpServer::new(move || {
+        App::new()
+            .service(get_trades)
+    })
+    .bind(addr)
+    .expect("Bind port error")
+    .run();
+
+    info!("指标集服务已运行。");
+
+    let r = running.clone();
+    tokio::spawn(async move {
+        server_fut.await.expect("error running the server");
+        r.store(false, Ordering::Relaxed);
+    });
+}