Browse Source

代码结构优化

skyfffire 2 years ago
parent
commit
7257096173
3 changed files with 128 additions and 104 deletions
  1. 1 0
      Cargo.toml
  2. 33 104
      src/main.rs
  3. 94 0
      src/quant_libs.rs

+ 1 - 0
Cargo.toml

@@ -17,6 +17,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 serde = { version = "1.0.188", features = ["derive"] }
 actix-rt = "2.5.0"
 actix-web = "4.0.0-beta.12"
+ctrlc = "3.2.5"
 
 [workspace]
 members=[

+ 33 - 104
src/main.rs

@@ -1,116 +1,45 @@
 mod server;
+mod control_c;
+mod quant_libs;
 
-use std::collections::BTreeMap;
-use std::io::Error;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use std::time::Duration;
-use tokio::sync::{mpsc, Mutex};
-use tokio::try_join;
-use tracing::{error, info};
-use exchanges::gate_swap_rest::GateSwapRest;
-use standard::exchange::ExchangeEnum::GateSwap;
-use standard::Order;
-use strategy::model::OrderInfo;
+use tracing::{info};
 use strategy::params::Params;
-use strategy::{exchange_disguise, quant};
-use strategy::quant::Quant;
+
+// 日志级别配置
+fn log_level_init(log_str: String) {
+    let tracing_log_level = tracing::Level::from_str(log_str.as_str()).unwrap();
+    info!("日志级别读取成功:{}。", tracing_log_level);
+    global::log_utils::final_init(tracing_log_level);
+}
 
 #[tokio::main]
 async fn main() {
     // 获取本地配置
     let params = Params::new("config.toml").unwrap();
-
+    info!("配置读取成功:{:?}。", params);
     // 日志级别配置
-    let tracing_log_level = tracing::Level::from_str(params.log_level.as_str()).unwrap();
-    global::log_utils::final_init(tracing_log_level);
-
-    info!(?params);
-    info!(?tracing_log_level);
-
-    let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
-    exchange_params.insert("access_key".to_string(), params.access_key.clone());
-    exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
-
-    let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
-    let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
-
-    let mut quant_obj = Quant::new(GateSwap, params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
-    let trade_name = quant_obj.trade_name.clone();
-    let mut quant_arc = Arc::new(Mutex::new(quant_obj));
-
-    info!("quant初始化……");
-    quant_arc.lock().await.before_trade().await;
-    let ref_name = quant_arc.lock().await.ref_name[0].clone();
-    // 参考交易所
-    exchange_disguise::run_reference_exchange(params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
-    // 交易交易所
-    exchange_disguise::run_transactional_exchange(params.exchange, quant_arc.clone(), trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
-    // 启动定期触发的系统逻辑
-    quant::on_timer(quant_arc.clone());
-    // 启动策略逻辑
-    quant::run_strategy(quant_arc.clone());
-
-    info!("quant初始化完成。");
-
-    let order_handler_quant_arc = quant_arc.clone();
-    let order_handler_thread = tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-
-            match order_receiver.recv().await {
-                Some(order) => {
-                    {
-                        let mut quant = order_handler_quant_arc.lock().await;
-
-                        let mut order_info = OrderInfo {
-                            symbol: "".to_string(),
-                            amount: order.amount.abs(),
-                            side: "".to_string(),
-                            price: order.price,
-                            client_id: order.custom_id,
-                            filled_price: order.avg_price,
-                            filled: order.deal_amount.abs(),
-                            order_id: order.id,
-                            local_time: 0,
-                            create_time: 0,
-                            status: order.status,
-                            fee: Default::default(),
-                        };
-
-                        quant.update_local_order(order_info.clone());
-                    }
-                },
-                None => {
-                    error!("Order channel has been closed!");
-                }
-            }
-        }
-    });
-
-    // let error_handler_quant_arc = quant_arc.clone();
-    let error_handler_thread = tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-
-            match error_receiver.recv().await {
-                Some(error) => {
-                    // let quant = error_handler_quant_arc.lock().await;
-
-                    error!("main: 订单出现错误{:?}", error);
-                },
-                None => {
-                    error!("Error channel has been closed!");
-                }
-            }
-        }
-    });
-
-    let server_thread = tokio::spawn(async move {
-        // let server = server::run_server(5566, quant_arc);
-        // info!("中控服务已运行。");
-        // server.await
-    });
-
-    try_join!(order_handler_thread, error_handler_thread, server_thread).unwrap();
+    log_level_init(params.log_level.clone());
+    // 退出检查程序
+    let running = control_c::exit_handler();
+    // quant初始化动作
+    let quant_arc = quant_libs::init(params).await;
+    // 初始化中控服务
+    server::run_server(6000, running.clone(), quant_arc.clone());
+
+    // 每一秒检查一次程序是否结束
+    while running.load(Ordering::Relaxed) {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+
+    info!("检测到退出信号!等待10s,等待其他线程后续处理完毕(再次按control c可以立马结束)……");
+    let mut i = 10;
+    while i > 0 {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        info!("{}", i);
+        i = i - 1;
+    }
+    info!("程序已退出!为以防万一,请再次检查仓位和订单!");
 }

+ 94 - 0
src/quant_libs.rs

@@ -0,0 +1,94 @@
+use strategy::params::Params;
+use strategy::quant::Quant;
+use std::collections::BTreeMap;
+use std::io::Error;
+use strategy::{exchange_disguise, quant};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::{mpsc, Mutex};
+use tracing::{error, info};
+use standard::exchange::ExchangeEnum::GateSwap;
+use standard::Order;
+use strategy::model::OrderInfo;
+
+pub async fn init(params: Params) -> Arc<Mutex<Quant>> {
+    // 封装
+    let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
+    exchange_params.insert("access_key".to_string(), params.access_key.clone());
+    exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
+
+    let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
+    let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
+
+    let mut quant_obj = Quant::new(GateSwap, params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
+    let ref_name = quant_obj.ref_name[0].clone();
+    let trade_name = quant_obj.trade_name.clone();
+
+    info!("quant初始化……");
+    quant_obj.before_trade().await;
+    let mut quant_arc = Arc::new(Mutex::new(quant_obj));
+    // 参考交易所
+    exchange_disguise::run_reference_exchange(params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+    // 交易交易所
+    exchange_disguise::run_transactional_exchange(params.exchange, quant_arc.clone(),  trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
+    // 启动定期触发的系统逻辑
+    quant::on_timer(quant_arc.clone());
+    // 启动策略逻辑
+    quant::run_strategy(quant_arc.clone());
+    info!("quant初始化完成。");
+
+    let order_handler_quant_arc = quant_arc.clone();
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+
+            match order_receiver.recv().await {
+                Some(order) => {
+                    {
+                        let mut quant = order_handler_quant_arc.lock().await;
+
+                        let mut order_info = OrderInfo {
+                            symbol: "".to_string(),
+                            amount: order.amount.abs(),
+                            side: "".to_string(),
+                            price: order.price,
+                            client_id: order.custom_id,
+                            filled_price: order.avg_price,
+                            filled: order.deal_amount.abs(),
+                            order_id: order.id,
+                            local_time: 0,
+                            create_time: 0,
+                            status: order.status,
+                            fee: Default::default(),
+                        };
+
+                        quant.update_local_order(order_info.clone());
+                    }
+                },
+                None => {
+                    error!("Order channel has been closed!");
+                }
+            }
+        }
+    });
+
+    // let error_handler_quant_arc = quant_arc.clone();
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+
+            match error_receiver.recv().await {
+                Some(error) => {
+                    // let quant = error_handler_quant_arc.lock().await;
+
+                    error!("main: 订单出现错误{:?}", error);
+                },
+                None => {
+                    error!("Error channel has been closed!");
+                }
+            }
+        }
+    });
+
+    return quant_arc;
+}