浏览代码

修正一些订阅问题,策略可以进入到策略层。

skyfffire 2 年之前
父节点
当前提交
62bbb6d30b
共有 6 个文件被更改,包括 98 次插入103 次删除
  1. 2 1
      config.toml.sample
  2. 6 5
      src/main.rs
  3. 0 49
      strategy/src/attach_.rs
  4. 1 2
      strategy/src/lib.rs
  5. 1 1
      strategy/src/model.rs
  6. 88 45
      strategy/src/quant.rs

+ 2 - 1
config.toml.sample

@@ -8,7 +8,8 @@ pair = "eth_usdt"
 open = 0.001
 close = 0.0002
 lever_rate = 1.0
-interval = 0.1
+# 延迟时间,改成ms级别
+interval = 100
 ref_exchange = ["binance_usdt_swap"]
 ref_pair = ["eth_usdt"]
 used_pct = 0.9

+ 6 - 5
src/main.rs

@@ -38,13 +38,14 @@ async fn main() {
     info!("quant初始化……");
     quant_arc.lock().await.before_trade().await;
     let ref_name = quant_arc.lock().await.ref_name[0].clone();
+    // 参考交易所
     quant::run_refer(quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
-
-    exchange_params.clear();
-    exchange_params.insert("access_key".to_string(), params.access_key);
-    exchange_params.insert("secret_key".to_string(), params.secret_key);
     // 交易交易所
-    quant::run_transaction(quant_arc.clone(), trade_name, vec![params.pair], exchange_params.clone()).await;
+    quant::run_transaction(quant_arc.clone(), trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
+    // 启动定期触发的系统逻辑
+    quant::on_timer(quant_arc.clone());
+    // 启动策略逻辑
+    quant::run_strategy(quant_arc.clone());
 
     info!("quant初始化完成。");
 

+ 0 - 49
strategy/src/attach_.rs

@@ -1,49 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-use chrono::Utc;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::Mutex;
-use tokio::task::JoinHandle;
-use tracing::{info};
-use crate::quant::Quant;
-
-// 定期触发的系统逻辑
-pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
-    let quant_arc_clone = quant_arc.clone();
-
-    return tokio::spawn(async move {
-        tokio::time::sleep(Duration::from_secs(20)).await;
-
-        loop {
-            tokio::time::sleep(Duration::from_secs(10)).await;
-
-            let mut quant = quant_arc_clone.lock().await;
-            {
-                // 检查风控
-                quant.check_risk();
-
-                // 线程停止信号
-                if quant.mode_signal == 1 {
-                    return
-                }
-
-                // 计算预估成交额
-                let total_trade_value = quant.local_buy_value + quant.local_sell_value;
-                let time_diff = Decimal::from(Utc::now().timestamp_millis() - quant.start_time);
-                let trade_vol_24h = ((total_trade_value / time_diff) * dec!(86400));
-                quant.strategy.trade_vol_24h_w = (trade_vol_24h / dec!(10000));
-                quant.strategy.trade_vol_24h_w.rescale(2);
-
-                // 打印各类信息
-                quant.strategy._print_summary();
-                // TODO quant没有rest
-                // info!("Rest报单平均延迟{}ms", quant.rest.avg_delay);
-                // info!("Rest报单最高延迟{}ms", quant.rest.max_delay);
-                for (name, interval) in &quant.market_update_interval {
-                    info!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
-                }
-            }
-        }
-    });
-}

+ 1 - 2
strategy/src/lib.rs

@@ -3,5 +3,4 @@ pub mod quant;
 mod model;
 mod strategy;
 mod predictor;
-mod utils;
-mod attach_;
+mod utils;

+ 1 - 1
strategy/src/model.rs

@@ -100,7 +100,7 @@ pub struct OriginalTradeGa {
     pub price: Decimal
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct OriginalTicker {
     // 更新ID
     pub u: i64,

+ 88 - 45
strategy/src/quant.rs

@@ -6,13 +6,14 @@ use std::sync::{Arc};
 use std::time::Duration;
 use chrono::{Timelike, Utc};
 use rust_decimal::Decimal;
+use rust_decimal::prelude::ToPrimitive;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 use tokio::time::sleep;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
@@ -235,7 +236,7 @@ impl Quant {
                 price_alpha.push(dec!(1.0));
             }
         }
-        println!("价格系数:{:?}", price_alpha);
+        info!("价格系数:{:?}", price_alpha);
         quant_obj.predictor = Predictor::new(quant_obj.ref_name.len())
             .alpha(price_alpha)
             .gamma(params.gamma);
@@ -276,7 +277,7 @@ impl Quant {
                         }
                     },
                     Err(e) => {
-                        println!("订单回执消费失败!{}", e);
+                        info!("订单回执消费失败!{}", e);
                         return;
                     }
                 }
@@ -325,7 +326,7 @@ impl Quant {
             if self.local_orders_backup.contains_key(&data.client_id) {
                 // 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
                 if self.handled_orders_cid.contains(&data.client_id) {
-                    println!("订单已经参与过仓位计算 拒绝重复进行计算, 订单号:{}", data.client_id);
+                    info!("订单已经参与过仓位计算 拒绝重复进行计算, 订单号:{}", data.client_id);
                 } else {
                     // 添加进已处理队列
                     self.handled_orders_cid.push(data.client_id.clone());
@@ -399,7 +400,7 @@ impl Quant {
                                 self.local_cash += filled * filled_price - fee;
                                 self.local_coin -= filled;
                             } else {
-                                println!("错误的仓位方向{}", side);
+                                info!("错误的仓位方向{}", side);
                             }
                         } else { // 合约订单流仓位计算
                             if side == "kd" { // buy 开多
@@ -442,14 +443,14 @@ impl Quant {
                                     self.local_position_by_orders.short_avg = Decimal::ZERO;
                                 }
                             } else {
-                                println!("错误的仓位方向{}", side);
+                                info!("错误的仓位方向{}", side);
                             }
                             // 统计合约交易手续费 正fee为扣手续费 负fee为返佣
                             if data.fee > Decimal::ZERO {
                                 self.local_profit -= data.fee;
                             }
                         }
-                        println!("更新推算仓位 {:?}", self.local_position_by_orders);
+                        info!("更新推算仓位 {:?}", self.local_position_by_orders);
                         // 本地计算利润
                         self._print_local_trades_summary();
                     }
@@ -467,25 +468,25 @@ impl Quant {
                             self._update_local_orders(&order);
                             //交易所处理订单信号
                             let platform_rest_fb = self.platform_rest.clone_box();
-                            println!("订单指令:{:?}", order);
+                            info!("订单指令:{:?}", order);
                             tokio::spawn(async move{
-                                println!("订单指令:{:?}", order);
+                                info!("订单指令:{:?}", order);
                                 platform_rest_fb.command_order(order).await;
                             });
                         }
                     }
                 }
             } else {
-                println!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
+                info!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
             }
             if self.local_orders.contains_key(&data.client_id) {
-                println!("删除本地订单, client_id:{}", data.client_id);
+                info!("删除本地订单, client_id:{}", data.client_id);
                 self.local_orders.remove(&data.client_id);
             } else {
-                println!("该订单不在本地挂单表中, client_id:{}", data.client_id);
+                info!("该订单不在本地挂单表中, client_id:{}", data.client_id);
             }
         } else {
-            println!("未知的订单事件类型:{:?}", data);
+            info!("未知的订单事件类型:{:?}", data);
         }
     }
 
@@ -501,7 +502,7 @@ impl Quant {
             let realized = local_sell_value - local_buy_value;
             let local_profit = (unrealized + realized).round_dp(5);
             self.strategy.local_profit = local_profit;
-            println!("买量 {},卖量 {},买额{},卖额{}", local_buy_amount, local_sell_amount, local_buy_value, local_sell_value);
+            info!("买量 {},卖量 {},买额{},卖额{}", local_buy_amount, local_sell_amount, local_buy_value, local_sell_value);
         }
     }
 
@@ -510,31 +511,32 @@ impl Quant {
         // 检查 ticker 行情
         for i in &self.ref_name {
             if self.tickers.is_empty() || !self.tickers.contains_key(i) {
-                println!("参考盘口ticker未准备好");
+                info!("513参考盘口ticker未准备好");
                 return;
             } else {
                 if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
-                    println!("参考盘口ticker未准备好");
+                    info!("517参考盘口ticker未准备好");
                     return;
                 }
             }
         }
         if self.tickers.contains_key(&self.trade_name) {
             if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
-                println!("参考盘口ticker未准备好");
+                let trade_ticker = self.tickers.get(&self.trade_name).unwrap();
+                info!("524参考盘口ticker未准备好");
                 return;
             }
         } else {
-            println!("交易盘口ticker未准备好");
+            info!("528交易盘口ticker未准备好");
             return;
         }
         // 检查 market 行情
         let all_market: Vec<Decimal> = self.get_all_market_data();
         if all_market.len() != LENGTH * (1usize + self.ref_num as usize) {
-            println!("聚合行情未准备好");
+            info!("聚合行情未准备好");
             return;
         } else {
-            println!("聚合行情准备就绪");
+            info!("聚合行情准备就绪");
             self.trade_msg.market = all_market;
             self.predictor.market_info_handler(&self.trade_msg.market);
         }
@@ -598,14 +600,14 @@ impl Quant {
                 // 产生交易信号
                 let orders = self.strategy.on_time(&self.trade_msg);
                 if orders.is_not_empty() {
-                    println!("触发onTick");
+                    info!("触发onTick");
                     self._update_local_orders(&orders);
 
                     //异步交易所处理订单信号
                     let platform_rest_fb = self.platform_rest.clone_box();
-                    println!("订单指令:{:?}", orders);
+                    info!("订单指令:{:?}", orders);
                     tokio::spawn(async move{
-                        println!("订单指令:{:?}", orders);
+                        info!("订单指令:{:?}", orders);
                         platform_rest_fb.command_order(orders).await;
                     });
                 }
@@ -630,7 +632,7 @@ impl Quant {
         // 更新仓位信息
         if position != self.local_position {
             self.local_position = position;
-            println!("更新本地仓位:{:?}", self.local_position);
+            info!("更新本地仓位:{:?}", self.local_position);
         }
     }
 
@@ -763,7 +765,7 @@ impl Quant {
                 self.market = val
             },
             Err(e) => {
-                println!("获取市场信息错误: {:?}", e);
+                info!("获取市场信息错误: {:?}", e);
             }
         }
     }
@@ -793,7 +795,7 @@ impl Quant {
                 self.local_cash = val.balance * self.used_pct
             },
             Err(e) => {
-                println!("获取账户信息错误: {:?}", e);
+                info!("获取账户信息错误: {:?}", e);
             }
         }
     }
@@ -979,14 +981,14 @@ impl Quant {
             // 停止程序
             return false;
         }
-        println!("初始cash: {start_cash} 初始coin: {start_coin}");
+        info!("初始cash: {start_cash} 初始coin: {start_coin}");
         // 初始化策略基础信息
         if mp <= dec!(0) {
             self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
             // 停止程序
             return false;
         } else {
-            println!("初始价格为 {}", mp);
+            info!("初始价格为 {}", mp);
         }
         self.strategy.mp = mp.clone();
         self.strategy.start_cash = start_cash.clone();
@@ -1010,8 +1012,8 @@ impl Quant {
             // 停止程序
             return false;
         } else {
-            println!("数量精度 {}", self.strategy.step_size);
-            println!("价格精度 {}", self.strategy.tick_size);
+            info!("数量精度 {}", self.strategy.step_size);
+            info!("价格精度 {}", self.strategy.tick_size);
         }
         let grid = Decimal::from(self.params.grid.clone());
         // 计算下单数量
@@ -1027,7 +1029,7 @@ impl Quant {
             short_one_hand_value = start_cash * self.params.lever_rate / grid;
             short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
         }
-        println!("最低单手交易下单量为 buy: {}, sell: {}", long_one_hand_amount, short_one_hand_amount);
+        info!("最低单手交易下单量为 buy: {}, sell: {}", long_one_hand_amount, short_one_hand_amount);
         let hand_min_limit = Decimal::new(20, 0);
         if (long_one_hand_amount.is_zero() && short_one_hand_amount.is_zero()) ||
             (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit) {
@@ -1056,16 +1058,15 @@ fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
     serde_json::from_str(json)
 }
 
-pub async fn run_stratey(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
-    let quant_arc_clone = quant_arc.clone();
+pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
     return tokio::spawn(async move {
         //定期触发策略
         info!("定时触发器启动");
         info!("前期准备完成");
         sleep(Duration::from_secs(10)).await;
         loop {
-            let start_time = Utc::now().second();
-            let mut quant = quant_arc_clone.lock().await;
+            let start_time = Utc::now().timestamp_millis();
+            let mut quant = quant_arc.lock().await;
             if quant.ready == 1 {
                 // 更新交易信息集合
                 quant.update_trade_msg();
@@ -1107,8 +1108,8 @@ pub async fn run_stratey(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
                 quant.check_ready();
             }
             // 计算耗时并进行休眠
-            let pass_time = Utc::now().second() - start_time;
-            sleep(Duration::from_secs(pass_time as u64)).await;
+            let pass_time = Utc::now().timestamp_millis() - start_time;
+            sleep(Duration::from_millis(quant.interval.to_u64().unwrap() - pass_time.to_u64().unwrap())).await;
         }
     });
 }
@@ -1146,7 +1147,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
     tokio::spawn( async move {
         let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
                                                GateWsType::PublicAndPrivate("usdt".to_string()), tx);
-        gate_exc.set_subscribe(vec![GateSubscribeType::PuFuturesTrades]);
+        gate_exc.set_subscribe(vec![GateSubscribeType::PuFuturesTrades, GateSubscribeType::PuFuturesOrderBook]);
         gate_exc.custom_subscribe(symbols_one.clone()).await;
     });
 
@@ -1166,6 +1167,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
                             {
                                 let mut quant = bot_arc_clone.lock().await;
                                 quant._update_depth(depth.depth.clone(), depth.name.clone());
+                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
                                 quant.local_depths.insert(depth.name, depth.depth);
                             }
                         } else if data.channel == "futures.balances" {
@@ -1245,7 +1247,7 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
     let (tx, mut rx) = channel(100);
     tokio::spawn( async move {
         let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_param, BinanceWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![ BinanceSubscribeType::PuBookTicker]);
+        ba_exc.set_subscribe(vec![BinanceSubscribeType::PuBookTicker, BinanceSubscribeType::PuAggTrade]);
         ba_exc.custom_subscribe(symbol.clone()).await;
     });
     tokio::spawn(async move {
@@ -1313,6 +1315,46 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
     });
 }
 
+// 定期触发的系统逻辑
+pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
+    let quant_arc_clone = quant_arc.clone();
+
+    return tokio::spawn(async move {
+        tokio::time::sleep(Duration::from_secs(20)).await;
+
+        loop {
+            tokio::time::sleep(Duration::from_secs(10)).await;
+
+            let mut quant = quant_arc_clone.lock().await;
+            {
+                // 检查风控
+                quant.check_risk();
+
+                // 线程停止信号
+                if quant.mode_signal == 1 {
+                    return
+                }
+
+                // 计算预估成交额
+                let total_trade_value = quant.local_buy_value + quant.local_sell_value;
+                let time_diff = Decimal::from(Utc::now().timestamp_millis() - quant.start_time);
+                let trade_vol_24h = ((total_trade_value / time_diff) * dec!(86400));
+                quant.strategy.trade_vol_24h_w = (trade_vol_24h / dec!(10000));
+                quant.strategy.trade_vol_24h_w.rescale(2);
+
+                // 打印各类信息
+                quant.strategy._print_summary();
+                // TODO quant没有rest
+                // info!("Rest报单平均延迟{}ms", quant.rest.avg_delay);
+                // info!("Rest报单最高延迟{}ms", quant.rest.max_delay);
+                for (name, interval) in &quant.market_update_interval {
+                    info!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
+                }
+            }
+        }
+    });
+}
+
 
 
 #[cfg(test)]
@@ -1322,6 +1364,7 @@ mod tests {
     use rust_decimal::Decimal;
     use rust_decimal_macros::dec;
     use tokio::sync::mpsc::{channel, Receiver, Sender};
+    use tracing::info;
     use standard::Order;
 
     use crate::params::Params;
@@ -1341,12 +1384,14 @@ mod tests {
         // let mut quant: Quant = Quant::new(_params, params_exc,order_tx, err_tx).await;
         // let is_ok = quant.before_trade().await;
         //
-        // println!("结果: {}", is_ok)
+        // info!("结果: {}", is_ok)
 
     }
 
     #[tokio::test]
     async fn test_time(){
+        global::log_utils::init_log_with_trace();
+
         let start_cash:Decimal = dec!(1.11);
         let start_coin:Decimal = dec!(0.12);
         let lever_rate:Decimal = dec!(10);
@@ -1367,10 +1412,8 @@ mod tests {
         //     short_one_hand_amount = (short_one_hand_value/mp/step_size).trunc()*step_size;
         // }
 
-        println!("long_one_hand_value:{:?}, short_one_hand_value: {:?}", long_one_hand_value, short_one_hand_value);
-        println!("long_one_hand_amount:{:?}, short_one_hand_amount: {:?}", long_one_hand_amount, short_one_hand_amount);
-        println!("{:?},{:?},{:?},{:?},{:?}", short_one_hand_value, mp, step_size, (short_one_hand_value/mp/step_size), ((short_one_hand_value/mp)/step_size).floor())
+        info!("long_one_hand_value:{:?}, short_one_hand_value: {:?}", long_one_hand_value, short_one_hand_value);
+        info!("long_one_hand_amount:{:?}, short_one_hand_amount: {:?}", long_one_hand_amount, short_one_hand_amount);
+        info!("{:?},{:?},{:?},{:?},{:?}", short_one_hand_value, mp, step_size, (short_one_hand_value/mp/step_size), ((short_one_hand_value/mp)/step_size).floor())
     }
-
-
 }