Jelajahi Sumber

所有止损逻辑都添加完了,等待测试。

skyffire 1 tahun lalu
induk
melakukan
c4babc7508
5 mengubah file dengan 133 tambahan dan 18 penghapusan
  1. 22 1
      src/core_libs.rs
  2. 30 2
      standard/src/gate_swap.rs
  3. 13 4
      standard/src/lib.rs
  4. 17 1
      strategy/src/core.rs
  5. 51 10
      strategy/src/strategy.rs

+ 22 - 1
src/core_libs.rs

@@ -27,7 +27,7 @@ pub async fn init(params: Params,
     exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
 
     let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
-    let (price_order_sender, _price_order_receiver) = mpsc::channel::<PriceOrder>(100);
+    let (price_order_sender, mut price_order_receiver) = mpsc::channel::<PriceOrder>(100);
     let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
 
     let mut core_obj = Core::new(params.exchange.clone(),
@@ -103,6 +103,26 @@ pub async fn init(params: Params,
                 },
                 None => {
                     error!("Order channel has been closed!");
+                    break
+                }
+            }
+        }
+    });
+
+    let price_order_handler_core_arc = core_arc.clone();
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+
+            match price_order_receiver.recv().await {
+                Some(price_order) => {
+                    let mut core = price_order_handler_core_arc.lock().await;
+
+                    core.update_local_price_order(price_order).await;
+                },
+                None => {
+                    error!("Price order channel has been closed!");
+                    break
                 }
             }
         }
@@ -121,6 +141,7 @@ pub async fn init(params: Params,
                 },
                 None => {
                     error!("Error channel has been closed!");
+                    break
                 }
             }
         }

+ 30 - 2
standard/src/gate_swap.rs

@@ -11,7 +11,7 @@ use rust_decimal_macros::dec;
 use serde_json::{json};
 use tokio::spawn;
 use tokio::time::Instant;
-use tracing::{error, info, trace};
+use tracing::{error, trace};
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, PriceOrder};
 use exchanges::gate_swap_rest::GateSwapRest;
 use global::trace_stack::TraceStack;
@@ -539,6 +539,7 @@ impl Platform for GateSwap {
             Ok(PriceOrder {
                 price_order_id: response_data.data["id"].to_string(),
                 order_id: "".to_string(),
+                status: "NEW".to_string(),
             })
         } else {
             Err(Error::new(ErrorKind::Other, response_data.to_string()))
@@ -555,6 +556,7 @@ impl Platform for GateSwap {
             Ok(PriceOrder {
                 price_order_id: order_id.to_string(),
                 order_id: "".to_string(),
+                status: "REMOVE".to_string(),
             })
         } else {
             Err(Error::new(ErrorKind::Other, response_data.to_string()))
@@ -639,7 +641,7 @@ impl Platform for GateSwap {
                         // 下止损单成功之后发到逻辑层处理。
                         match price_order_result {
                             Ok(mut price_order) => {
-                                price_order.order_id = order.id.clone();
+                                price_order.order_id = cid;
 
                                 self_clone.price_order_sender.send(price_order).await.unwrap();
                             }
@@ -699,6 +701,32 @@ impl Platform for GateSwap {
         // 等待所有任务完成
         let _: Result<Vec<_>, _> = futures.try_collect().await;
 
+        // 撤销止损订单
+        let mut cancel_op_handlers = vec![];
+        for item in order_command.cancel_price_order.keys() {
+            let op_id = order_command.cancel_price_order[item].price_order_id.clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let cancel_rst = self_clone.cancel_stop_loss_order(&op_id).await;
+
+                match cancel_rst {
+                    Ok(price_order) => {
+                        self_clone.price_order_sender.send(price_order).await.unwrap();
+                    }
+                    Err(err) => {
+                        error!("gate swap, 取消止损订单失败!{:?}", err);
+                    }
+                }
+            });
+
+            cancel_op_handlers.push(handle);
+        }
+
+        let futures = FuturesUnordered::from_iter(cancel_op_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
         // 检查订单指令
         let mut check_handlers = vec![];
         for item in order_command.check.keys() {

+ 13 - 4
standard/src/lib.rs

@@ -52,13 +52,15 @@ pub enum PositionModeEnum {
 /// - `limits_close(HashMap<String, Vec<String>>)`: 可用交易币数量 `{"order_name": [c_id, o_id]}`;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct OrderCommand {
-    // 取消订单指令,数据结构例子:
+    // 取消订单指令
     pub cancel: HashMap<String, Vec<String>>,
-    // 检验指令,数据结构例子:(暂没找到例子)
+    // 取消止损订单指令
+    pub cancel_price_order: HashMap<String, PriceOrder>,
+    // 检验指令
     pub check: HashMap<String, Vec<String>>,
-    // 限开指令,数据结构例子:(暂没找到例子)
+    // 限开指令
     pub limits_open: HashMap<String, Vec<String>>,
-    // 限收指令,数据结构例子:(暂没找到例子)
+    // 限收指令
     pub limits_close: HashMap<String, Vec<String>>,
 }
 
@@ -66,6 +68,7 @@ impl OrderCommand {
     pub fn new() -> OrderCommand {
         OrderCommand {
             cancel: Default::default(),
+            cancel_price_order: Default::default(),
             check: Default::default(),
             limits_open: Default::default(),
             limits_close: Default::default(),
@@ -300,10 +303,16 @@ impl Order {
     }
 }
 
+
+/// 止损单结构体
+/// - `price_order_id(String)`: 止损单id
+/// - `order_id(String)`: 对应的订单id,取的client_id
+/// - `status(String)`:  订单状态,可取:[REMOVE、NEW]
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PriceOrder {
     pub price_order_id: String,
     pub order_id: String,
+    pub status: String,
 }
 
 /// Ticker结构体(市场行情)

+ 17 - 1
strategy/src/core.rs

@@ -301,6 +301,21 @@ impl Core {
         }
     }
 
+    // 更新本地止损订单
+    pub async fn update_local_price_order(&mut self, price_order: PriceOrder) {
+        match price_order.status.as_str() {
+            "NEW" => {
+                self.local_price_orders.insert(price_order.order_id.clone(), price_order);
+            },
+            "REMOVE" => {
+                self.local_price_orders.remove(&price_order.order_id);
+            }
+            _ => {
+                error!("错误的price order status:{:?}", price_order);
+            }
+        }
+    }
+
     // #[instrument(skip(self, data, trace_stack), level="TRACE")]
     pub async fn update_local_order(&mut self, data: OrderInfo, trace_stack: TraceStack) {
         // if data.filled != Decimal::ZERO {
@@ -337,7 +352,6 @@ impl Core {
         // 新增订单推送 仅需要cid oid信息
         if data.status == "NEW" {
             // 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
-
             if self.local_orders.contains_key(&data.client_id) {
                 let mut order_info = self.local_orders.get(&data.client_id).unwrap().clone();
                 order_info.order_id = data.order_id;
@@ -499,6 +513,7 @@ impl Core {
                         self.strategy.local_time = Utc::now().timestamp_millis();
                         // trace_stack.on_before_strategy();
                         let mut order = self.strategy.on_tick(&self.local_orders,
+                                                              &self.local_price_orders,
                                                               &self.local_position_by_orders,
                                                               &self.agg_market,
                                                               &self.local_cash,
@@ -654,6 +669,7 @@ impl Core {
 
                 // 产生交易信号
                 let mut orders = self.strategy.on_tick(&self.local_orders,
+                                                       &self.local_price_orders,
                                                        &self.local_position_by_orders,
                                                        &self.agg_market,
                                                        &self.local_cash,

+ 51 - 10
strategy/src/strategy.rs

@@ -13,7 +13,7 @@ use reqwest::{Client};
 use tokio::spawn;
 use tokio::time::Instant;
 use global::params::Params;
-use standard::{OrderCommand};
+use standard::{OrderCommand, PriceOrder};
 
 #[derive(Debug)]
 pub struct Strategy {
@@ -565,6 +565,7 @@ impl Strategy {
             command.check.clear();
             command.limits_open.clear();
             command.limits_close.clear();
+            command.cancel_price_order.clear();
             msg = format!("请求频率溢出,程序禁止任何操作!({}/{})", self.request_count, self.limit_requests_num);
         } else if self.request_order_count >= self.limit_order_requests_num { // 100%超过下单频率,则不再进行平仓挂单
             command.limits_close.clear();
@@ -1225,6 +1226,44 @@ impl Strategy {
         }
     }
 
+    // 取消止损订单
+    pub fn _cancel_price_order(&mut self,
+                               command: &mut OrderCommand,
+                               local_orders: &HashMap<String, OrderInfo>,
+                               local_price_orders: &HashMap<String, PriceOrder>) {
+        // 1. 正在取消的订单对应的止损单要取消
+        for key in  command.cancel.keys() {
+            let client_id = command.cancel[key][0].clone();
+
+            for id in local_price_orders.keys() {
+                // 不存在的话,进行下一个止损单的判定
+                if !client_id.eq(id) {
+                    continue
+                }
+
+                // 如果在取消的订单存在对应的止损单
+                command.cancel_price_order.insert(id.clone(), local_price_orders[id].clone());
+            }
+        }
+
+        // 2. 止损单对应的订单不存在的也要取消
+        for id_x in local_price_orders.keys() {
+            let mut is_exists = false;
+
+            // 判断本地订单里面是不是有这个止损单对应的单
+            for id_y in local_orders.keys() {
+                if id_x.eq(id_y) {
+                    is_exists = true;
+                }
+            }
+
+            // 如果本地订单不存在这个止损单,说明此止损单不合法
+            if !is_exists {
+                command.cancel_price_order.insert(id_x.clone(), local_price_orders[id_x].clone());
+            }
+        }
+    }
+
     // 定时打印
     pub fn on_time_print(&mut self) {
         if self.local_time - self._print_time < self._print_interval {
@@ -1249,6 +1288,7 @@ impl Strategy {
     // 在满足条件后,返回非空command,否则返回一个空的command。原文的onTime。
     pub fn on_tick(&mut self,
                    local_orders: &HashMap<String, OrderInfo>,
+                   local_price_orders: &HashMap<String, PriceOrder>,
                    local_position: &LocalPosition,
                    agg_market: &Vec<Decimal>,
                    local_cash: &Decimal,
@@ -1281,15 +1321,16 @@ impl Strategy {
         self.generate_dist();
 
         // 下单指令处理逻辑
-        self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_close(&mut command, local_orders);               // 平仓单命令处理
-        self._post_open(&mut command, local_orders);                // 限价单命令处理
-
-        self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
-        self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
-        self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter
-        self._refresh_request_limit();                              // 刷新频率限制
-        self._update_request_num(&mut command);                     // 统计刷新频率
+        self._cancel_open(&mut command, local_orders);                              // 撤单命令处理
+        self._post_close(&mut command, local_orders);                               // 平仓单命令处理
+        self._post_open(&mut command, local_orders);                                // 限价单命令处理
+
+        self._check_local_orders(&mut command, local_orders);                       // 固定时间检查超时订单
+        self._update_in_cancel(&mut command, local_orders);                         // 更新撤单队列,是一个filter
+        self._cancel_price_order(&mut command, local_orders, local_price_orders);   // 取消止损单命令处理
+        self._check_request_limit(&mut command);                                    // 限制频率,移除不合规则之订单,是一个filter
+        self._refresh_request_limit();                                              // 刷新频率限制
+        self._update_request_num(&mut command);                                     // 统计刷新频率
 
         if command.limits_open.len() != 0 || command.limits_close.len() != 0 {
             let name = self.params.account_name.clone();