Преглед изворни кода

网络延迟(100ms以上) 限制开单开发完成,待线上测试

JiahengHe пре 1 година
родитељ
комит
97f726f23e

+ 2 - 2
global/src/clear_position_result.rs

@@ -2,7 +2,7 @@ use serde_derive::{Deserialize, Serialize};
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct ClearPositionResult {
-    pub check_id: String,
+    pub r_id: String,
     pub clear_order_num: String,
     pub clear_order_str: String,
     pub clear_position_num: String,
@@ -14,7 +14,7 @@ pub struct ClearPositionResult {
 impl ClearPositionResult {
     pub fn new() -> ClearPositionResult {
         ClearPositionResult{
-            check_id: "".to_string(),
+            r_id: "".to_string(),
             clear_order_num: "0".to_string(),
             clear_order_str: "".to_string(),
             clear_position_num: "0".to_string(),

+ 2 - 2
global/src/params.rs

@@ -53,7 +53,7 @@ pub struct Params {
     // 运行模式 0.正常策略运行, 1.清理挂单及仓位
     pub run_mode: i8,
     // 机器人id
-    pub check_id: String,
+    pub r_id: String,
 }
 
 impl Params {
@@ -101,7 +101,7 @@ impl Params {
             log_level: "info".to_string(),
             port: call_port,
             run_mode: 0,
-            check_id: "-1".to_string()
+            r_id: "-1".to_string(),
         };
         Ok(params)
     }

+ 8 - 8
src/main.rs

@@ -47,7 +47,7 @@ fn read_params_json() -> Params {
     let mut call_port = 5555;
     // 运行模式 0.正常策略运行, 1.清理挂单及仓位
     let mut run_mode = 0;
-    let mut check_id = "-1".to_string();
+    let mut r_id = "-1".to_string();
 
     let args: Vec<String> = std::env::args().collect();
 
@@ -83,10 +83,10 @@ fn read_params_json() -> Params {
         }
 
         //上报ID
-        if arg.starts_with("--check_id=") {
+        if arg.starts_with("--r_id=") {
             let parts: Vec<&str> = arg.split('=').collect();
             if parts.len() == 2 {
-                check_id = parts[1].to_string();
+                r_id = parts[1].to_string();
             } else {
                 error!("启动失败,回执单id参数格式设置错误 --check_id=xxx!");
                 panic!("启动失败,回执单id参数格式设置错误 --check_id=xxx!");
@@ -98,12 +98,12 @@ fn read_params_json() -> Params {
     let mut params = Params::new_json(path, call_port).unwrap();
     if run_mode == 1 {
         params.run_mode = 1;
-        if check_id == "-1" {
-            error!("启动失败,缺少机器人id参数!");
-            panic!("启动失败,缺少机器人id参数!");
+        if r_id == "-1" {
+            error!("启动失败,缺少回执单id参数!");
+            panic!("启动失败,缺少回执单id参数!");
         }
     }
-    params.check_id = check_id;
+    params.r_id = r_id;
     return params;
 }
 
@@ -147,7 +147,7 @@ async fn main() {
         // core初始化动作
         let mut core_arc = clear_core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
         info!("开始执行清仓程序");
-        core_arc.exit(params.check_id).await;
+        core_arc.exit(params.r_id).await;
         info!("清仓程序执行完毕");
         // 强制退出
         std::process::exit(0);

+ 1 - 0
src/server.rs

@@ -6,6 +6,7 @@ use tokio::sync::Mutex;
 use tracing::{info};
 use global::cci::CentralControlInfo;
 
+// arcs
 #[derive(Clone)]
 struct Arcs {
     running: Arc<AtomicBool>,

+ 1 - 1
standard/src/coinex_swap.rs

@@ -510,7 +510,7 @@ impl Platform for CoinexSwap {
                     avg_price: Default::default(),
                     status: "NULL".to_string(),
                     order_type: "".to_string(),
-                    trace_stack:  TraceStack::new(0, Instant::now()).on_special("485 coinex_swap".to_string())
+                    trace_stack:  TraceStack::new(0, Instant::now()).on_special("513 coinex_swap".to_string())
                 };
             } else {
                 result = format_order_item(res_data_json, ct_val, "canceled");

+ 1 - 1
standard/src/handle_info.rs

@@ -258,7 +258,7 @@ pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, dept
     }
 }
 
-pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthParam {
+pub fn  format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthParam {
     let depth_asks: Vec<MarketOrder>;
     let depth_bids: Vec<MarketOrder>;
     let t: Decimal;

+ 12 - 4
strategy/src/binance_usdt_swap.rs

@@ -85,16 +85,24 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             // 将ticker数据转换为模拟深度
             let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         "depth" => {
             trace_stack.set_source("binance_usdt_swap.depth".to_string());
             // 将depth数据转换为模拟深度
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         _ => {
             error!("未知推送类型");

+ 6 - 2
strategy/src/bitget_usdt_swap.rs

@@ -169,8 +169,12 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
             trace_stack.set_source("bitget_usdt_swap.books1".to_string());
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         },
         "pong" => {},
         _ => {

+ 6 - 2
strategy/src/bybit_usdt_swap.rs

@@ -156,8 +156,12 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut De
             // 处理ticker信息
             let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BybitSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         "orderbook" => {
             // let mut is_update = false;

+ 1 - 1
strategy/src/clear_core.rs

@@ -537,7 +537,7 @@ impl ClearCore {
 
         let mut result = self.check_position().await;
         // 设置机器人id
-        result.check_id = check_id;
+        result.r_id = check_id;
         info!("清仓程序结果 {:?}", result);
         // 判断是否有清仓,是否有异常
         if result.clear_position_num != "0" || result.clear_order_num != "0" || result.clear_other_err{

+ 6 - 2
strategy/src/coinex_usdt_swap.rs

@@ -88,8 +88,12 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             trace_stack.set_source("coinex_usdt_swap.order_book".to_string());
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(CoinexSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         "balance.update" => {
             let account = standard::handle_info::HandleSwapInfo::handle_account_info(CoinexSwap, &response, run_symbol);

+ 40 - 2
strategy/src/core.rs

@@ -1,6 +1,6 @@
 use tokio::time::Instant;
 use std::cmp::max;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
@@ -115,6 +115,11 @@ pub struct Core {
     pub agg_market: Vec<Decimal>,
     pub ref_price: Vec<Vec<Decimal>>,
     pub predict: Decimal,
+
+    // 交易交易所行情网络延迟
+    pub time_distance: VecDeque<i64>,
+    // 是否下开仓单
+    pub is_open: bool
 }
 
 impl Core {
@@ -251,6 +256,8 @@ impl Core {
             agg_market: vec![],
             ref_price: vec![],
             predict: Default::default(),
+            time_distance: VecDeque::with_capacity(10),
+            is_open: true,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -510,6 +517,7 @@ impl Core {
                                                               &self.local_coin,
                                                               &self.ref_price,
                                                               &self.predict,
+                                                              &self.is_open,
                                                               &trace_stack.ins);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
@@ -613,11 +621,40 @@ impl Core {
             info!("{}", msg);
         }
     }
+    // 检查网络速度风控
+    pub fn check_avg_network_delay(&mut self){
+        if self.time_distance.is_empty() {
+            return;
+        }
+
+        let sum: i64 = self.time_distance.iter().sum();
+        let count = self.time_distance.len();
+
+        // 将总和转换为 f64 并计算平均值
+        let avg = sum / count as i64;
+        // 延迟超过100毫秒 不下开仓单
+        if avg > 100 {
+            self.is_open = false;
+            info!("不允许开单");
+            info!("time_distance: {:?}", self.time_distance);
+            info!("avg: {:?}", avg);
+        } else {
+            self.is_open = true;
+        }
+    }
 
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
-    pub async fn on_depth_update(&mut self, depth: &Vec<Decimal>, name_ref: &String, trace_stack: &mut TraceStack) {
+    pub async fn on_depth_update(&mut self, depth: &Vec<Decimal>, name_ref: &String, time_distance: i64, trace_stack: &mut TraceStack) {
         // 要从回调传入的深度信息中获取data.name
         let now_time = Utc::now().timestamp_millis();
+        if time_distance != -1 {
+            // 超过长度移除最早的元素
+            if self.time_distance.len() == 10 {
+                self.time_distance.pop_front();
+            }
+            self.time_distance.push_back(time_distance);
+            self.check_avg_network_delay();
+        }
         if self.market_update_time.contains_key(name_ref) && *self.market_update_time.get(name_ref).unwrap() != 0i64 {
             let interval = Decimal::from(now_time - self.market_update_time.get(name_ref).unwrap());
             if *self.market_update_interval.get(name_ref).unwrap() == dec!(0) {
@@ -665,6 +702,7 @@ impl Core {
                                                        &self.local_coin,
                                                        &self.ref_price,
                                                        &self.predict,
+                                                       &self.is_open,
                                                        &trace_stack.ins);
                 trace_stack.on_after_strategy();
 

+ 3 - 2
strategy/src/exchange_disguise.rs

@@ -115,7 +115,8 @@ pub async fn on_special_depth(core_arc: Arc<Mutex<Core>>,
                               update_flag_u: &mut Decimal,
                               label: &String,
                               trace_stack: &mut TraceStack,
-                              special_depth: &SpecialDepth) {
+                              special_depth: &SpecialDepth,
+                              time_distance: i64) {
     if special_depth.t > *update_flag_u {
         let mut core = core_arc.lock().await;
         trace_stack.on_after_unlock_core();
@@ -124,7 +125,7 @@ pub async fn on_special_depth(core_arc: Arc<Mutex<Core>>,
         core.depths.insert(label.clone(), special_depth.depth.clone());
 
         // 触发depth更新
-        core.on_depth_update(&(special_depth.depth), &label, trace_stack).await;
+        core.on_depth_update(&(special_depth.depth), &label, time_distance, trace_stack).await;
 
         core.local_depths.insert(special_depth.name.clone(), special_depth.depth.clone());
 

+ 13 - 4
strategy/src/gate_swap.rs

@@ -105,16 +105,25 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             trace_stack.set_source("gate_usdt_swap.order_book".to_string());
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         "futures.book_ticker" => {
             trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
             // 将ticker数据转换为模拟深度
             let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+                info!("response.time {} special_depth.create_at {}", response.time, special_depth.create_at);
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
         }
         "futures.balances" => {
             let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol);

+ 6 - 2
strategy/src/htx_usdt_swap.rs

@@ -152,8 +152,12 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
         trace_stack.set_source("htx_usdt_swap.depth".to_string());
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(HtxSwap, &response);
         trace_stack.on_after_format();
-
-        on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+        // 网络时间差
+        let mut distance = -1;
+        if !response.label.contains("ref") {
+            distance = response.time - special_depth.create_at/1000;
+        }
+        on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
     } else {
         error!("未知推送类型");
         error!(?response);

+ 12 - 4
strategy/src/kucoin_swap.rs

@@ -116,16 +116,24 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             trace_stack.set_source("kucoin_usdt_swap.level2".to_string());
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap, &response);
             trace_stack.on_after_format();
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await
         }
         "tickerV2" => {
             trace_stack.set_source("kucoin_swap.tickerV2".to_string());
 
             let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(KucoinSwap, &response);
             trace_stack.on_before_network(special_depth.create_at.clone());
-
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await
+            // 网络时间差
+            let mut distance = -1;
+            if !response.label.contains("ref") {
+                distance = response.time - special_depth.create_at/1000;
+            }
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await
         }
         "symbolOrderChange" => {
             trace_stack.set_source("kucoin_swap.symbolOrderChange".to_string());

+ 24 - 7
strategy/src/strategy.rs

@@ -105,8 +105,9 @@ pub struct Strategy {
     pub trade_vol_24h_w: Decimal,                                   // 24小时成交额(单位:万)
     pub grid: Decimal,                                              // 网格数量
 
-    // 订单流相关
-    pub side: String,                                               // 当前主动性方向
+    // 速度限制,至少0.5秒才取消订单
+    pub prev_place_order_timestamp: i64,                            // 上次挂单的时间
+    pub min_cancel_interval_mills: i64,                             // 至少要挂这么久才允许撤销
 }
 
 impl Strategy {
@@ -198,7 +199,8 @@ impl Strategy {
             post_side: 0,
             trade_vol_24h_w: Default::default(),
             grid: Decimal::from(params.grid),
-            side: "normal".to_string(),
+            prev_place_order_timestamp: 0,
+            min_cancel_interval_mills: 500,
         };
 
         // 交易名字
@@ -417,8 +419,8 @@ impl Strategy {
         msg.push_str(format!("[推算利润 {:?}, 盈亏 {:?}%, 做多杠杆 {:?}%, 做多浮盈 {:?}%, 做空杠杆 {:?}%, 做空浮盈 {:?}%], ",
                              self.local_profit, self.profit, long_pos_leverage, self.long_pos_bias, short_pos_leverage, self.short_pos_bias).as_str());
         msg.push_str(format!("[请求 {:?}, 上限{:?}次/10秒], ", self._req_num_per_window, self.limit_order_requests_num).as_str());
-        msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 方向 {:?}, 参考 {:?}, 模式 {:?}], ",
-                             self.trade_open_dist, self.trade_close_dist, self.side, self.ref_name[self.ref_index], self.maker_mode).as_str());
+        msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 参考 {:?}, 模式 {:?}], ",
+                             self.trade_open_dist, self.trade_close_dist, self.ref_name[self.ref_index], self.maker_mode).as_str());
         msg.push_str(format!("[挂单列表,共{:?}单, ", o_num).as_str());
         for (_, order) in &self.local_orders {
             let mut order_value = order.amount * self.mp;
@@ -510,7 +512,7 @@ impl Strategy {
         // debug!(?mode, ?buy_start, ?sell_start, ?mp);
 
         // 开仓相关
-        avoid = min(dec!(0.001), open * dec!(0.05));
+        avoid = min(dec!(0.002), open * dec!(0.1));
         // 持仓偏移
         let buy_shift = Decimal::ONE + pos_rate[0] * grid;
         let sell_shift = Decimal::ONE + pos_rate[1] * grid;
@@ -1022,6 +1024,11 @@ impl Strategy {
     // 生成取消订单的指令
     // #[instrument(skip(self, command), level="TRACE")]
     pub fn _cancel_open(&self, command: &mut OrderCommand, local_orders: &HashMap<String, OrderInfo>) {
+        // 强制性时间间隔
+        if self.prev_place_order_timestamp + self.min_cancel_interval_mills > Utc::now().timestamp_millis() {
+            return;
+        }
+
         // debug!(?command);
         // 挂单范围
         let long_upper = self.open_dist[0];
@@ -1244,6 +1251,7 @@ impl Strategy {
                    local_coin: &Decimal,
                    ref_price: &Vec<Vec<Decimal>>,
                    predict: &Decimal,
+                   is_open: &bool,
                    _ins: &Instant) -> OrderCommand {
         self.on_time_print();
 
@@ -1272,7 +1280,11 @@ impl Strategy {
         // 下单指令处理逻辑
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
         self._post_close(&mut command, local_orders);               // 平仓单命令处理
-        self._post_open(&mut command, local_orders);                // 限价单命令处理
+        if *is_open {
+            self._post_open(&mut command, local_orders);                // 限价单命令处理
+        } else {
+            info!("没开单")
+        }
 
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
@@ -1280,6 +1292,11 @@ impl Strategy {
         self._refresh_request_limit();                              // 刷新频率限制
         self._update_request_num(&mut command);                     // 统计刷新频率
 
+        // 如果提交了订单,则更新最后提交时间
+        if command.limits_open.len() != 0 {
+            self.prev_place_order_timestamp = Utc::now().timestamp_millis();
+        }
+
         // if command.limits_open.len() != 0 || command.limits_close.len() != 0 {
         //     let name = self.params.account_name.clone();
         //     // 参考卖价