Ver Fonte

修复:在rust执行清退时中控无法请求到状态导致rust被杀死的缺陷。

skyffire há 1 ano atrás
pai
commit
38dab47194

+ 1 - 1
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "1.5.1"
+version = "1.5.2"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

+ 11 - 0
global/src/cci.rs

@@ -0,0 +1,11 @@
+use rust_decimal::Decimal;
+use serde_derive::Serialize;
+
+#[derive(Serialize, Clone)]
+pub struct CentralControlInfo {
+    pub now_balance: Decimal,                   // 钱包余额
+    pub unrealized_pn_l: Decimal,               // 未实现盈亏
+    pub pos: Decimal,                           // 持仓数量
+    pub entry_price: Decimal,                   // 开仓价格
+    pub now_price: Decimal,                     // 当前价格
+}

+ 1 - 0
global/src/lib.rs

@@ -4,3 +4,4 @@ pub mod params;
 pub mod trace_stack;
 pub mod export_utils;
 pub mod account_info;
+pub mod cci;

+ 14 - 2
src/main.rs

@@ -5,8 +5,10 @@ mod quant_libs;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
+use tokio::sync::Mutex;
 use tracing::{info, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
+use global::cci::CentralControlInfo;
 use global::log_utils::send_remote_err_log;
 use global::params::Params;
 
@@ -57,12 +59,22 @@ async fn main() {
         panic_running.store(false, Ordering::Relaxed);
     }));
 
+    // 中央控制器信息
+    let cci = CentralControlInfo {
+        now_balance: Default::default(),
+        unrealized_pn_l: Default::default(),
+        pos: Default::default(),
+        entry_price: Default::default(),
+        now_price: Default::default(),
+    };
+    let cci_arc = Arc::new(Mutex::new(cci));
+
     // ws退出程序
     let ws_running = Arc::new(AtomicBool::new(true));
     // quant初始化动作
-    let quant_arc = quant_libs::init(params.clone(), ws_running.clone(), running.clone()).await;
+    let quant_arc = quant_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
     // 初始化中控服务
-    server::run_server(params.port.clone(), running.clone(), quant_arc.clone());
+    server::run_server(params.port.clone(), running.clone(), cci_arc.clone());
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
 

+ 12 - 2
src/quant_libs.rs

@@ -9,12 +9,16 @@ use std::time::Duration;
 use chrono::Utc;
 use tokio::sync::{mpsc, Mutex};
 use tracing::{error, info};
+use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::trace_stack::TraceStack;
 use standard::Order;
 use strategy::model::OrderInfo;
 
-pub async fn init(params: Params, ws_running: Arc<AtomicBool>, running: Arc<AtomicBool>) -> Arc<Mutex<Quant>> {
+pub async fn init(params: Params,
+                  ws_running: Arc<AtomicBool>,
+                  running: Arc<AtomicBool>,
+                  cci_arc: Arc<Mutex<CentralControlInfo>>) -> Arc<Mutex<Quant>> {
     // 封装
     let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
     exchange_params.insert("access_key".to_string(), params.access_key.clone());
@@ -24,7 +28,13 @@ pub async fn init(params: Params, ws_running: Arc<AtomicBool>, running: Arc<Atom
     let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
     let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
 
-    let mut quant_obj = Quant::new(params.exchange.clone(), params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone(), running.clone()).await;
+    let mut quant_obj = Quant::new(params.exchange.clone(),
+                                   params.clone(),
+                                   exchange_params.clone(),
+                                   order_sender.clone(),
+                                   error_sender.clone(),
+                                   running.clone(),
+                                   cci_arc.clone()).await;
     let ref_name = quant_obj.ref_name[0].clone();
     let trade_name = quant_obj.trade_name.clone();
 

+ 9 - 57
src/server.rs

@@ -1,72 +1,24 @@
-use std::ops::Div;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
-use serde::{Serialize};
-use strategy::quant::Quant;
 use actix_web::{web, App, HttpResponse, HttpServer, Responder, post, get};
-use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use tracing::{info};
-
-
-// #[derive(Deserialize, Debug)]
-// struct InputData {
-//     stop: i64,
-// }
-
-#[derive(Serialize, Clone)]
-struct AccountInfo {
-    now_balance: Decimal,                   // 钱包余额
-    unrealized_pn_l: Decimal,               // 未实现盈亏
-    pos: Decimal,                           // 持仓数量
-    entry_price: Decimal,                   // 开仓价格
-    now_price: Decimal,                     // 当前价格
-}
+use global::cci::CentralControlInfo;
 
 #[derive(Clone)]
 struct Arcs {
     running: Arc<AtomicBool>,
-    quant_arc: Arc<Mutex<Quant>>
+    cci_arc: Arc<Mutex<CentralControlInfo>>
 }
 
 // 句柄 GET 请求
 #[get("/account")]
 async fn get_account(arcs: web::Data<Arcs>) -> impl Responder {
-    let quant = arcs.quant_arc.lock().await;
-
-    // --------------------------------数据逻辑处理--------------------------------
-    let mut pos = quant.local_position_by_orders.long_pos - quant.local_position_by_orders.short_pos;
-    if !quant.exchange.contains("spot") {
-        pos = quant.local_position.long_pos - quant.local_position.short_pos;
-    }
-    pos.rescale(8);
+    // --------------------------------数据解锁处理--------------------------------
+    let cci = arcs.cci_arc.lock().await;
 
-    let mut entry_price;
-    if pos.gt(&Decimal::ZERO) {
-        entry_price = quant.local_position_by_orders.long_avg;
-    } else {
-        entry_price = quant.local_position_by_orders.short_avg;
-    }
-    entry_price.rescale(8);
-
-    let mut now_balance = quant.strategy.equity.div(quant.used_pct);
-    now_balance.rescale(4);
-
-    let mut unrealized_pn_l = quant.local_profit;
-    unrealized_pn_l.rescale(4);
-
-    let mut now_price = quant.strategy.mp;
-    now_price.rescale(8);
-
-    // --------------------------------发送到远端--------------------------------
-    let info = AccountInfo{
-        now_balance,
-        unrealized_pn_l,
-        pos,
-        entry_price,
-        now_price
-    };
-    let json_string = serde_json::to_string(&info).unwrap();
+    // --------------------------------回报--------------------------------
+    let json_string = serde_json::to_string(&(cci.clone())).unwrap();
     HttpResponse::Ok().content_type("application/json").body(json_string)
 }
 
@@ -74,16 +26,16 @@ async fn get_account(arcs: web::Data<Arcs>) -> impl Responder {
 #[post("/exit")]
 async fn on_change(arcs: web::Data<Arcs>) -> impl Responder {
     arcs.running.store(false, Ordering::Relaxed);
-    HttpResponse::Ok().body(format!("程序已收到退出信号,将在5秒内退出。"))
+    HttpResponse::Ok().body("程序已收到退出信号,将在清退仓位后退出。".to_string())
 }
 
-pub fn run_server(port: u32, running: Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>) {
+pub fn run_server(port: u32, running: Arc<AtomicBool>, cci_arc: Arc<Mutex<CentralControlInfo>>) {
     let addr = format!("0.0.0.0:{}", port);
     info!("中控绑定地址:{}", addr);
 
     let arcs = Arcs {
         running: running.clone(),
-        quant_arc: quant_arc.clone(),
+        cci_arc: cci_arc.clone(),
     };
 
     let server_fut = HttpServer::new(move || {

+ 2 - 2
strategy/src/bybit_usdt_swap.rs

@@ -129,7 +129,7 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
         let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_equity(account);
+            quant.update_equity(account).await;
         }
     } else if data.channel == "order" {
         trace_stack.on_before_format();
@@ -167,7 +167,7 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
         let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_position(positions);
+            quant.update_position(positions).await;
         }
     }
 }

+ 1 - 1
strategy/src/exchange_disguise.rs

@@ -95,7 +95,7 @@ pub async fn on_special_depth(bot_arc: Arc<Mutex<Quant>>,
         *update_flag_u = special_depth.t;
         let mut quant = bot_arc.lock().await;
         quant._update_ticker(special_depth.ticker, label.clone());
-        quant._update_depth(special_depth.depth.clone(), label.clone(), trace_stack);
+        quant._update_depth(special_depth.depth.clone(), label.clone(), trace_stack).await;
         quant.local_depths.insert(special_depth.name, special_depth.depth);
     }
 }

+ 2 - 2
strategy/src/gate_swap.rs

@@ -119,7 +119,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_equity(account);
+            quant.update_equity(account).await;
         }
     } else if data.channel == "futures.orders" {
         trace_stack.on_before_format();
@@ -154,7 +154,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_position(positions);
+            quant.update_position(positions).await;
         }
     } else if data.channel == "futures.trades" {
         let mut quant = bot_arc_clone.lock().await;

+ 1 - 1
strategy/src/kucoin_swap.rs

@@ -176,7 +176,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_position(positions);
+            quant.update_position(positions).await;
         }
     } else if data.channel == "match" {
         let mut quant = bot_arc_clone.lock().await;

+ 2 - 2
strategy/src/okx_usdt_swap.rs

@@ -149,13 +149,13 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
         let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_position(positions);
+            quant.update_position(positions).await;
         }
     } else if data.channel == "account" {
         let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
         {
             let mut quant = bot_arc_clone.lock().await;
-            quant.update_equity(account);
+            quant.update_equity(account).await;
         }
     }
 }

+ 68 - 5
strategy/src/quant.rs

@@ -15,6 +15,7 @@ use tokio::sync::{Mutex};
 use tokio::task::JoinHandle;
 use tokio::time::sleep;
 use tracing::{error, info, warn, instrument};
+use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
@@ -114,10 +115,19 @@ pub struct Quant {
     pub recall_max_count: usize,                            // 最大回溯条数
     pub short_volume_rate: Decimal,                         // 主动性跌比率(0.01代表1%)
     pub long_volume_rate: Decimal,                          // 主动性涨比率(0.01代表1%)
+
+    // 中控
+    pub cci_arc: Arc<Mutex<CentralControlInfo>>,            // 中控信息汇集
 }
 
 impl Quant {
-    pub async fn new(exchange: String, params: Params, exchange_params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>, running: Arc<AtomicBool>) -> Quant {
+    pub async fn new(exchange: String,
+                     params: Params,
+                     exchange_params: BTreeMap<String, String>,
+                     order_sender: Sender<Order>,
+                     error_sender: Sender<Error>,
+                     running: Arc<AtomicBool>,
+                     cci_arc: Arc<Mutex<CentralControlInfo>>) -> Quant {
         let symbol = params.pair.clone();
         let pairs: Vec<&str> = params.pair.split('_').collect();
         let mut quant_obj = Quant {
@@ -238,6 +248,7 @@ impl Quant {
             recall_max_count: 5000.to_usize().unwrap(),
             short_volume_rate: dec!(0.618),
             long_volume_rate: dec!(0.618),
+            cci_arc,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -597,7 +608,7 @@ impl Quant {
     }
 
     #[instrument(skip(self, depth, name, trace_stack), level="TRACE")]
-    pub fn _update_depth(&mut self, depth: Vec<Decimal>, name: String, mut trace_stack: TraceStack) {
+    pub async fn _update_depth(&mut self, depth: Vec<Decimal>, name: String, mut trace_stack: TraceStack) {
         // info!(?depth, ?name);
         trace_stack.on_depth();
 
@@ -673,13 +684,36 @@ impl Quant {
                         ts.on_before_send();
                         platform_rest_fb.command_order(orders, ts.clone()).await;
                     });
+
+                    // 更新中控账户相关信息
+                    {
+                        let mut now_balance = self.strategy.equity / self.used_pct;
+                        now_balance.rescale(4);
+
+                        let mut cci = self.cci_arc.lock().await;
+                        cci.now_balance = now_balance;
+                    }
+
                 }
             }
         }
+
+        {
+
+            let mut unrealized_pn_l = self.local_profit;
+            unrealized_pn_l.rescale(4);
+
+            let mut now_price = self.strategy.mp;
+            now_price.rescale(8);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.unrealized_pn_l = unrealized_pn_l;
+            cci.now_price = now_price;
+        }
     }
 
     #[instrument(skip(self, data), level="TRACE")]
-    pub fn update_position(&mut self, data: Vec<Position>) {
+    pub async fn update_position(&mut self, data: Vec<Position>) {
         if data.is_empty() {
             return;
         }
@@ -699,6 +733,27 @@ impl Quant {
             info!("更新本地仓位:{:?}", position);
             self.local_position = position;
         }
+
+        // 更新中控持仓相关的信息
+        {
+            let mut pos = self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos;
+            if !self.exchange.contains("spot") {
+                pos = self.local_position.long_pos - self.local_position.short_pos;
+            }
+            pos.rescale(8);
+
+            let mut entry_price;
+            if pos.gt(&Decimal::ZERO) {
+                entry_price = self.local_position_by_orders.long_avg;
+            } else {
+                entry_price = self.local_position_by_orders.short_avg;
+            }
+            entry_price.rescale(8);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.pos = pos;
+            cci.entry_price = entry_price;
+        }
     }
 
     #[instrument(skip(self, data, name), level="TRACE")]
@@ -842,7 +897,7 @@ impl Quant {
     }
 
     #[instrument(skip(self, data), level="TRACE")]
-    pub fn update_equity(&mut self, data: Account) {
+    pub async fn update_equity(&mut self, data: Account) {
         /*
            更新保证金信息
            合约一直更新
@@ -851,7 +906,7 @@ impl Quant {
         if self.exchange.contains("spot") {
             return;
         }
-        self.local_cash = data.balance * self.used_pct
+        self.local_cash = data.balance * self.used_pct;
     }
 
     #[instrument(skip(self), level="TRACE")]
@@ -1396,6 +1451,14 @@ impl Quant {
         } else {
             self.update_equity_rest_swap().await;
         }
+        // 更新中控账户相关信息
+        {
+            let mut now_balance = self.local_cash / self.used_pct;
+            now_balance.rescale(4);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.now_balance = now_balance;
+        }
         // 初始资金
         let start_cash = self.local_cash.clone();
         let start_coin = self.local_coin.clone();