Quellcode durchsuchen

3.4.0_test版本,行情速率加入。

skyffire vor 1 Jahr
Ursprung
Commit
b864a30750

+ 21 - 2
standard/src/gate_swap_handle.rs

@@ -7,7 +7,7 @@ use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker, Trade};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -150,4 +150,23 @@ pub fn format_depth_items(value: &Value) -> Vec<MarketOrder> {
         })
     }
     return depth_items;
-}
+}
+
+
+
+pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+    let result = res_data.data.as_array().unwrap();
+    let mut trades = vec![];
+
+    for item in result {
+        trades.push(Trade {
+            id: item["id"].to_string(),
+            time: Decimal::from_i64(item["create_time_ms"].as_i64().unwrap()).unwrap(),
+            size: Decimal::from_i64(item["size"].as_i64().unwrap()).unwrap(),
+            price: Decimal::from_str(item["price"].as_str().unwrap().to_string().as_str()).unwrap(),
+            symbol: item["contract"].as_str().unwrap().to_string(),
+        })
+    }
+
+    return trades
+}

+ 13 - 2
standard/src/handle_info.rs

@@ -7,7 +7,7 @@ use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle, htx_swap_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle, htx_swap_handle, Trade};
 use crate::{Account, MarketOrder, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
@@ -181,7 +181,6 @@ impl HandleSwapInfo {
             }
         }
     }
-
     // 处理深度信息
     pub fn handle_special_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> SpecialDepth {
         let label = res_data.label.clone();
@@ -190,6 +189,18 @@ impl HandleSwapInfo {
         // 运算、组装
         make_special_depth(label, &mut format_depth.depth_asks, &mut format_depth.depth_bids, format_depth.t, format_depth.create_at)
     }
+    // 处理成交信息
+    pub fn trades_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Vec<Trade> {
+        match exchange {
+            ExchangeEnum::GateSwap => {
+                gate_swap_handle::format_trade_items(&res_data)
+            }
+            _ => {
+                error!("未找到该交易所!trades_handle: {:?}", exchange);
+                panic!("未找到该交易所!trades_handle: {:?}", exchange);
+            }
+        }
+    }
 }
 
 

+ 16 - 0
standard/src/lib.rs

@@ -4,6 +4,7 @@ use std::fmt::Formatter;
 use std::io::{Error};
 use async_trait::async_trait;
 use rust_decimal::Decimal;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tokio::time::Instant;
 use global::trace_stack::TraceStack;
@@ -128,6 +129,21 @@ impl Account {
     }
 }
 
+/// 交易结构体(订单流)
+/// - `id(String)`: id
+/// - `time(Decimal)`: 交易更新时间戳(ms)
+/// - `size(Decimal)`: 交易量,负数是卖方
+/// - `price(Decimal)`: 成交价格
+/// - `symbol(String)`: 成交符号
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Trade {
+    pub id: String,
+    pub time: Decimal,
+    pub size: Decimal,
+    pub price: Decimal,
+    pub symbol: String
+}
+
 /// Depth结构体(市场深度)
 /// - `time(i64)`: 深度更新时间戳(ms);
 /// - `asks(Vec<MarketOrder>)`: 卖方深度列表;

+ 76 - 16
strategy/src/core.rs

@@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use chrono::{Utc};
 use rust_decimal::Decimal;
-use rust_decimal::prelude::{ToPrimitive};
+use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
 use tokio::spawn;
 use tokio::sync::mpsc::{Sender};
@@ -20,7 +20,7 @@ use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
-use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
+use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker, Trade};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
 
@@ -121,6 +121,8 @@ pub struct Core {
     pub low_speed_rate: Decimal,                            // 低速界限,0.5代表最大值的50%
     pub low_speed_trade_time_vec: Vec<Decimal>,             // 低速的交易时间数组,长度就是时间范围内有多少条交易
     pub low_speed_max_trades_count: usize,                  // 从开机到现在的一定时间范围内的最大交易次数
+    pub is_low_speed: bool,                                 // 当前是否是低速模式
+    pub low_speed_hard_limit: usize,                        // 这个值以下的都是低速模式
 }
 
 impl Core {
@@ -259,9 +261,11 @@ impl Core {
             predict: Default::default(),
             // 低速率改进的相关参数
             low_speed_time_range: Decimal::ONE,
-            low_speed_rate: dec!(0.168),
+            low_speed_rate: dec!(0.732),
             low_speed_trade_time_vec: vec![],
             low_speed_max_trades_count: 0,
+            is_low_speed: true,
+            low_speed_hard_limit: 60,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -317,6 +321,75 @@ impl Core {
         }
     }
 
+    // 移除过期的成交集
+    fn remove_old_elements(&mut self) {
+        // 获取当前时间的毫秒级时间戳
+        let current_time = Utc::now().timestamp_millis();
+
+        // 定义一分钟的毫秒数
+        let one_minute_in_millis = (self.low_speed_time_range.to_f64().unwrap() * 60f64 * 1000f64).to_i64().unwrap();
+
+        // 使用 while 循环检查并移除旧元素
+        while !self.low_speed_trade_time_vec.is_empty() {
+            // 获取第一个元素的时间戳
+            let first_time = self.low_speed_trade_time_vec[0].to_i64().unwrap();
+
+            // 计算时间差
+            let time_difference = current_time - first_time;
+
+            // 如果时间差超过一分钟,移除第一个元素
+            if time_difference > one_minute_in_millis {
+                self.low_speed_trade_time_vec.remove(0);
+            } else {
+                // 如果第一个元素没有超过一分钟,停止检查
+                break;
+            }
+        }
+    }
+
+    // 更新当前成交集
+    pub async fn update_trades(&mut self, trades: &Vec<Trade>) {
+        // 1. 放入当前的vec里面
+        for trade in trades {
+            self.low_speed_trade_time_vec.push(trade.time);
+        }
+
+        // 2. 移除过期的成交集
+        self.remove_old_elements();
+
+        // 3. 最大值确认
+        let trades_count_in_time_range = self.low_speed_trade_time_vec.len();
+        if trades_count_in_time_range > self.low_speed_max_trades_count {
+            self.low_speed_max_trades_count = trades_count_in_time_range;
+        }
+
+        // 4. 定义当前的速度
+        let prev_is_low_speed = self.is_low_speed;
+        self.is_low_speed = if trades_count_in_time_range < self.low_speed_hard_limit {
+            true
+        } else {
+            let d_count = Decimal::from_usize(trades_count_in_time_range).unwrap();
+            let d_max_count = Decimal::from_usize(self.low_speed_max_trades_count).unwrap();
+            let rate = d_count / d_max_count;
+
+            if rate < self.low_speed_rate {
+                true
+            } else {
+                false
+            }
+        };
+        if prev_is_low_speed != self.is_low_speed {
+            if self.is_low_speed {
+                info!("----------------------------行情速率变化:高->低");
+            } else {
+                info!("----------------------------行情速率变化:低->高");
+            }
+        }
+
+        // 5. 同步到策略模块
+        self.strategy.is_low_speed = self.is_low_speed;
+    }
+
     // #[instrument(skip(self, data, trace_stack), level="TRACE")]
     pub async fn update_local_order(&mut self, data: OrderInfo, trace_stack: TraceStack) {
         if data.filled != Decimal::ZERO {
@@ -916,19 +989,6 @@ impl Core {
         }
     }
 
-    pub async fn update_position_rest_swap(&mut self) {
-        let position = self.platform_rest.get_position().await;
-        match position {
-            Ok(val) => {
-                // info!("bybit_swap:定时获取的仓位信息");
-                self.update_position(val).await;
-            }
-            Err(err) => {
-                error!("bybit_swap:定时获取仓位信息错误!\nget_position:res_data={:?}", err);
-            }
-        }
-    }
-
     // #[instrument(skip(self), level="TRACE")]
     pub async fn update_equity_rest_spot(&mut self) {
         match self.platform_rest.get_spot_account().await {

+ 4 - 17
strategy/src/gate_swap.rs

@@ -148,23 +148,10 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             core.update_position(positions).await;
         }
         "futures.trades" => {
-            // let mut core = core_arc_clone.lock().await;
-            // let str = data.label.clone();
-            // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
-            //     *max_buy = Decimal::ZERO;
-            //     *min_sell = Decimal::ZERO;
-            //     core.is_update.remove(str.as_str());
-            // }
-            // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-            // for trade in trades {
-            //     if trade.price > *max_buy || *max_buy == Decimal::ZERO{
-            //         *max_buy = trade.price
-            //     }
-            //     if trade.price < *min_sell || *min_sell == Decimal::ZERO{
-            //         *min_sell = trade.price
-            //     }
-            // }
-            // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+            let trades = standard::handle_info::HandleSwapInfo::trades_handle(GateSwap, &response);
+
+            let mut core = core_arc_clone.lock().await;
+            core.update_trades(&trades).await;
         }
         _ => {
             error!("未知推送类型");

+ 20 - 6
strategy/src/strategy.rs

@@ -96,7 +96,9 @@ pub struct Strategy {
     pub close_dist: Vec<Decimal>,                                   // 平仓相关价格
 
     pub trade_close_dist: Decimal,                                  //
-    pub trade_open_dist: Decimal,                                   //
+    pub trade_open_dist: Decimal,                                   // 高速挂单距离
+    pub is_low_speed: bool,                                         // 是否是低速模式
+    pub trade_low_speed_open_dist: Decimal,                         // 低速时挂单距离
 
     pub ref_index: usize,                                           //
     pub predict: Decimal,                                           //
@@ -192,6 +194,8 @@ impl Strategy {
             close_dist: vec![],
             trade_close_dist: params.close,
             trade_open_dist: params.open,
+            trade_low_speed_open_dist: params.low_speed_open,
+            is_low_speed: true,
             ref_index: 0,
             predict: Default::default(),
             predict_alpha: Default::default(),
@@ -237,6 +241,14 @@ impl Strategy {
         return strategy;
     }
 
+    pub fn get_open_dist(&mut self) -> Decimal {
+        if self.is_low_speed {
+            self.trade_low_speed_open_dist
+        } else {
+            self.trade_open_dist
+        }
+    }
+
     // 更新当前strategy的各类信息
     // #[instrument(skip(self, trader_msg), level="TRACE")]
     pub fn _update_data(&mut self,
@@ -314,7 +326,8 @@ impl Strategy {
 
         // spread
         let temp_predict = predict * self.predict_alpha;
-        self.predict = utils::clip(temp_predict, -self.trade_open_dist, self.trade_open_dist);
+        let open_dist = self.get_open_dist();
+        self.predict = utils::clip(temp_predict, -open_dist, open_dist);
         // debug!(?self.predict);
 
         // 计算当前账户cash和coin
@@ -402,7 +415,8 @@ impl Strategy {
         self.daily_return = self.profit / run_time_day;
         self.daily_return.rescale(2);
         self.short_pos_bias.rescale(2);
-        self.trade_open_dist.rescale(6);
+        let mut open_dist = self.get_open_dist();
+        open_dist.rescale(6);
         self.trade_close_dist.rescale(6);
         self.predict.rescale(5);
         // 挂单列表长度
@@ -417,8 +431,8 @@ impl Strategy {
         msg.push_str(format!("[推算利润 {:?}, 盈亏 {:?}%, 做多杠杆 {:?}%, 做多浮盈 {:?}%, 做空杠杆 {:?}%, 做空浮盈 {:?}%], ",
                              self.local_profit, self.profit, long_pos_leverage, self.long_pos_bias, short_pos_leverage, self.short_pos_bias).as_str());
         msg.push_str(format!("[请求 {:?}, 上限{:?}次/10秒], ", self._req_num_per_window, self.limit_order_requests_num).as_str());
-        msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 方向 {:?}, 参考 {:?}, 模式 {:?}], ",
-                             self.trade_open_dist, self.trade_close_dist, self.side, self.ref_name[self.ref_index], self.maker_mode).as_str());
+        msg.push_str(format!("[参数, 开仓 {:?}, 平仓 {:?}, 方向 {:?}, 参考 {:?}, 模式 {:?}, 低速度: {:?}], ",
+                             self.open_dist, open_dist, self.side, self.ref_name[self.ref_index], self.maker_mode, self.is_low_speed).as_str());
         msg.push_str(format!("[挂单列表,共{:?}单, ", o_num).as_str());
         for (_, order) in &self.local_orders {
             let mut order_value = order.amount * self.mp;
@@ -472,7 +486,7 @@ impl Strategy {
     // 生成各类挂单价格,原文是gen_dist
     // #[instrument(skip(self), level="TRACE")]
     pub fn generate_dist(&mut self) {
-        let open = self.trade_open_dist;
+        let open = self.get_open_dist();
         let close = self.trade_close_dist;
         let pos_rate = vec![self.long_hold_rate, self.short_hold_rate];
         let ref_bp = self.ref_bp;