JiahengHe 2 лет назад
Родитель
Сommit
bb70b9ea48
4 измененных файлов с 203 добавлено и 34 удалено
  1. 6 1
      exchanges/src/gate_swap_ws.rs
  2. 9 1
      src/main.rs
  3. 9 1
      strategy/src/model.rs
  4. 179 31
      strategy/src/quant.rs

+ 6 - 1
exchanges/src/gate_swap_ws.rs

@@ -300,7 +300,12 @@ impl GateSwapWs {
                     if res_data.code == "-200" {
                         trace!("订阅成功:{:?}", res_data.data);
                     } else {
-                        self.sender.send(res_data).await.unwrap();
+                        // self.sender.send(res_data).await.unwrap();
+                        let sender = self.sender.clone();
+                        tokio::spawn(async move {
+                            sender.send(res_data).await.unwrap();
+                        });
+                        tokio::spawn(async move {});
 
                         //主动ping 服务器
                         let new_time = chrono::Utc::now().timestamp_millis();

+ 9 - 1
src/main.rs

@@ -32,12 +32,20 @@ async fn main() {
     let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
 
     let mut quant_obj = Quant::new(params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
+    let trade_name = quant_obj.trade_name.clone();
     let mut quant_arc = Arc::new(Mutex::new(quant_obj));
 
     info!("quant初始化……");
     quant_arc.lock().await.before_trade().await;
     let ref_name = quant_arc.lock().await.ref_name[0].clone();
-    quant::run_refer(quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+    // quant::run_refer(quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+
+    exchange_params.clear();
+    exchange_params.insert("access_key".to_string(), "4181c882718a95e72122ac1d52c88533".to_string());
+    exchange_params.insert("secret_key".to_string(), "de82d1507b843ff08d81a0e9b878b721359f274937216b307834b570b676fa3c".to_string());
+    // 交易交易所
+    quant::run_transaction(quant_arc.clone(), trade_name, vec![params.pair], exchange_params.clone()).await;
+
     info!("quant初始化完成。");
 
     let order_handler_quant_arc = quant_arc.clone();

+ 9 - 1
strategy/src/model.rs

@@ -85,13 +85,21 @@ pub struct OrderInfo{
 }
 
 #[derive(Serialize, Deserialize)]
-pub struct OriginalTrade {
+pub struct OriginalTradeBa {
     // 成交价格
     pub p: Decimal,
     // 买方是否是做市方。如true,则此次成交是一个主动卖出单,否则是一个主动买入单。
     pub m: bool
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct OriginalTradeGa {
+    // 成交价格
+    pub size: Decimal,
+    // 买方是否是做市方。如true,则此次成交是一个主动卖出单,否则是一个主动买入单。
+    pub price: Decimal
+}
+
 #[derive(Serialize, Deserialize)]
 pub struct OriginalTicker {
     // 更新ID

+ 179 - 31
strategy/src/quant.rs

@@ -7,18 +7,19 @@ use std::time::Duration;
 use chrono::{Utc};
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
+use serde_json::Value;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
 use tracing::{error, info};
 use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
-use exchanges::binance_usdt_swap_ws::BinanceUsdtSwapWs;
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
-use standard::{Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialDepth, SpecialTicker, Ticker};
+use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialDepth, SpecialTicker, Ticker};
 use standard::exchange::Exchange;
 use standard::exchange::ExchangeEnum::GateSwap;
-use standard::binance_handle::handle_special_depth;
 
-use crate::model::{LocalPosition, OrderInfo, OriginalTicker, OriginalTrade, TraderMsg};
+use crate::model::{LocalPosition, OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa, TraderMsg};
 use crate::params::Params;
 use crate::predictor::Predictor;
 use crate::strategy::Strategy;
@@ -96,7 +97,8 @@ pub struct Quant {
     // 市场最优买卖价
     pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
     // 最近一次的depth信息
-    pub local_depths: HashMap<String, Vec<Decimal>>
+    pub local_depths: HashMap<String, Vec<Decimal>>,
+    pub is_update: HashMap<String, bool>
 }
 
 struct MarketData{
@@ -185,7 +187,8 @@ impl Quant {
             },
             platform_rest: Exchange::new(GateSwap, symbol, false, exchange_params, order_sender, error_sender).await,
             max_buy_min_sell_cache: Default::default(),
-            local_depths: Default::default()
+            local_depths: Default::default(),
+            is_update: Default::default()
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -278,6 +281,11 @@ impl Quant {
             }
         });
     }
+    pub fn update_order(&mut self, data: Vec<OrderInfo>){
+        for order in data {
+            self.update_local_order(order);
+        }
+    }
 
     pub fn update_local_order(&mut self, data: OrderInfo) {
         /*
@@ -566,7 +574,6 @@ impl Quant {
         } else if name == self.ref_name[0] { // 判断是否为当前跟踪的盘口
             // 判断是否需要触发ontick 对行情进行过滤
             // 过滤条件 价格变化很大 时间间隔很长
-            info!(?self.depths);
             let mut flag = 0;
             let bid_price_rate = (depth[BID_PRICE_INDEX] - self.depths.get(&name).unwrap()[BID_PRICE_INDEX]).abs() / depth[BID_PRICE_INDEX];
             let ask_price_rate = (depth[ASK_PRICE_INDEX] - self.depths.get(&name).unwrap()[ASK_PRICE_INDEX]).abs() / depth[ASK_PRICE_INDEX];
@@ -727,19 +734,22 @@ impl Quant {
     }
 
     // 获取深度信息
-    pub fn get_all_market_data(&self) -> Vec<Decimal> {
+    pub fn get_all_market_data(&mut self) -> Vec<Decimal> {
         // 只能定时触发 组合市场信息=交易盘口+参考盘口
         let mut market: Vec<Decimal> = Vec::new();
-        // TODO: 获取交易盘口市场信息
+        // 获取交易盘口市场信息
         let mut data: Vec<Decimal> = self.local_depths.get(&self.trade_name).unwrap().clone();
+        self.is_update.insert(self.symbol.clone(), true);
         let mut max_min_price = self.max_buy_min_sell_cache.get(&self.trade_name).unwrap().clone();
         market.append(&mut data);
         market.append(&mut max_min_price);
 
         for i in &self.ref_name {
-            // TODO: 获取参考盘口市场信息
+            // 获取参考盘口市场信息
             data = self.local_depths.get(i).unwrap().clone();
+            self.is_update.insert(i.clone(), true);
             max_min_price =  self.max_buy_min_sell_cache.get(i).unwrap().clone();
+            data.append(&mut max_min_price);
             market.append(&mut data);
         }
         return market;
@@ -755,8 +765,19 @@ impl Quant {
             }
         }
     }
+    pub fn update_equity(&mut self, data: Account) {
+        /*
+           更新保证金信息
+           合约一直更新
+           现货只有当出现异常时更新
+       */
+        if self.exchange.contains("spot") {
+            return;
+        }
+        self.local_cash = data.balance * self.used_pct
+    }
 
-    pub async fn update_equity(&mut self) {
+    pub async fn update_equity_rest(&mut self) {
         match self.platform_rest.get_account().await {
             Ok(val) => {
                 /*
@@ -784,7 +805,7 @@ impl Quant {
         let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
         let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
         // 获取账户信息
-        self.update_equity().await;
+        self.update_equity_rest().await;
         // 初始资金
         let start_cash = self.local_cash.clone();
         let start_coin = self.local_cash.clone();
@@ -863,22 +884,145 @@ impl Quant {
         */
     }
 }
-pub async fn run_transaction(quant: Arc<Mutex<Quant>>, name: String, symbol: Vec<String>, exchange_params: BTreeMap<String, String>) {
 
+fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
+    serde_json::from_str(json)
+}
+pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+    let (tx, mut rx) = channel(100);
+    let gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    // 获取user_id
+    let res_data = gate_exc.wallet_fee().await;
+    info!(?res_data);
+    if res_data.code != "200"{
+        error!("获取gate交易所参数 user_id 失败, 交易交易所启动失败!");
+        return;
+    }
+    let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+    let user_id = wallet_obj["user_id"].to_string();
+    let symbols_one = symbols.clone();
+    // 获取乘数(计价货币兑换为结算货币的乘数)
+    let response = gate_exc.get_market_details("usdt".to_string()).await;
+    if(response.code != "200"){
+        error!("获取gate交易所参数 multiplier 失败, 交易交易所启动失败!");
+        return;
+    }
+    let contract_obj :Vec<Value> = parse_json_array(&response.data).unwrap();
+    let mut multiplier =  Decimal::ZERO;
+    for val in contract_obj {
+        if symbols[0].to_uppercase() == val["name"].as_str().unwrap(){
+            let num = val["quanto_multiplier"].as_str().unwrap();
+            multiplier = Decimal::from_str(num).unwrap();
+        }
+    }
+    if multiplier == Decimal::ZERO {
+        error!("获取gate交易所参数 multiplier 为0!");
+        return;
+    }
+    tokio::spawn( async move {
+        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
+                                               GateWsType::PublicAndPrivate("usdt".to_string()), tx);
+        gate_exc.set_subscribe(vec![GateSubscribeType::PuFuturesTrades]);
+        gate_exc.custom_subscribe(symbols_one.clone()).await;
+    });
+    tokio::spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        loop {
+            match rx.recv().await {
+                Some(data) => {
+                    if data.code == "200" {
+                        if data.channel == "futures.order_book" {
+                            let depth = standard::gate_handle::handle_special_depth(data);
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant._update_depth(depth.depth.clone(), depth.name.clone());
+                                quant.local_depths.insert(depth.name, depth.depth);
+                            }
+                        } else if data.channel == "futures.balances" {
+                            let account = standard::gate_handle::handle_account_info(data, run_symbol.clone());
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant.update_equity(account);
+                            }
+                        } else if data.channel == "futures.orders" {
+                            let orders = standard::gate_handle::handle_order(data, multiplier.clone());
+                            let mut order_infos:Vec<OrderInfo> = Vec::new();
+                            for order in orders.order{
+                                let mut order_info = OrderInfo{
+                                    symbol: "".to_string(),
+                                    amount: Default::default(),
+                                    side: "".to_string(),
+                                    price: Default::default(),
+                                    client_id: order.custom_id,
+                                    filled_price: Default::default(),
+                                    filled: Default::default(),
+                                    order_id: order.id,
+                                    local_time: 0,
+                                    create_time: 0,
+                                    status: order.status,
+                                    fee: Default::default(),
+                                };
+                                if "REMOVE" == order_info.status {
+                                    order_info.amount = order.amount;
+                                    order_info.price = order.price;
+                                    order_info.filled = order.deal_amount;
+                                    order_info.filled_price = order.avg_price;
+                                }
+                                order_infos.push(order_info);
+                            }
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant.update_order(order_infos);
+                            }
+                        } else if data.channel == "futures.positions" {
+                            let positions = standard::gate_handle::handle_position(data, multiplier.clone());
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant.update_position(positions);
+                            }
+                        } else if data.channel == "futures.trades" {
+                            let mut quant = bot_arc_clone.lock().await;
+                            let str = data.lable.clone();
+                            if quant.is_update.contains_key(&data.lable) && *quant.is_update.get(str.as_str()).unwrap(){
+                                max_buy = Decimal::ZERO;
+                                min_sell = Decimal::ZERO;
+                                quant.is_update.remove(str.as_str());
+                            }
+                            let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+
+                            if trade.price > max_buy || max_buy == Decimal::ZERO{
+                                max_buy = trade.price
+                            }
+                            if trade.price < min_sell || min_sell == Decimal::ZERO{
+                                min_sell = trade.price
+                            }
+                            quant.max_buy_min_sell_cache.insert(data.lable, vec![max_buy, min_sell]);
+                        }
+                    }
+                },
+                None => {
+                    error!("交易交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
 }
 
 // 启动参考交易所
 pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<String>, exchange_param: BTreeMap<String, String>) {
     let (tx, mut rx) = channel(100);
-    let ref_names = quant_arc.lock().await.params.ref_pair.clone();
     tokio::spawn( async move {
         let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_param, BinanceWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![ BinanceSubscribeType::PuAggTrade]);
-        ba_exc.custom_subscribe(ref_names).await;
+        ba_exc.set_subscribe(vec![ BinanceSubscribeType::PuBookTicker]);
+        ba_exc.custom_subscribe(symbol.clone()).await;
     });
     tokio::spawn(async move {
         // trade
-        let mut decimal = 99u32;
         let mut max_buy = Decimal::ZERO;
         let mut min_sell = Decimal::ZERO;
         // ticker
@@ -887,13 +1031,15 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
         loop {
             match rx.recv().await {
                 Some(data) => {
-                    // info!(?data);
-
                     if data.code == "200".to_string() {
                         if data.channel == "aggTrade" {
-                            let trade: OriginalTrade = serde_json::from_str(data.data.as_str()).unwrap();
-                            if decimal == 99 {
-                                decimal = trade.p.scale();
+                            let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+                            let str = data.lable.clone();
+                            let mut quant = bot_arc_clone.lock().await;
+                            if quant.is_update.contains_key(&data.lable) && *quant.is_update.get(str.as_str()).unwrap(){
+                                max_buy = Decimal::ZERO;
+                                min_sell = Decimal::ZERO;
+                                quant.is_update.remove(str.as_str());
                             }
                             if trade.p > max_buy || max_buy == Decimal::ZERO{
                                 max_buy = trade.p
@@ -902,27 +1048,29 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                                 min_sell = trade.p
                             }
                             {
-                                let mut quant = bot_arc_clone.lock().await;
                                 quant.max_buy_min_sell_cache.insert(data.lable, vec![max_buy, min_sell]);
-                                info!(?quant.max_buy_min_sell_cache);
                             }
                         } else if data.channel == "bookTicker" {
                             let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-                            let special_depth = handle_special_depth(data);
                             if ticker.u > update_flag_u {
                                 {
                                     let mut quant = bot_arc_clone.lock().await;
-                                    quant._update_ticker(special_depth.ticker.clone(), special_depth.name.clone());
-                                    quant._update_depth(special_depth.depth.clone(), special_depth.name.clone());
-                                    quant.local_depths.insert(special_depth.name.clone(), special_depth.depth);
+                                    quant._update_ticker(SpecialTicker{
+                                        sell: ticker.a.clone(),
+                                        buy: ticker.b.clone(),
+                                        mid_price: Default::default(),
+                                    }, data.lable.clone());
+                                    let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
+                                    quant._update_depth(depth.clone(), data.lable.clone());
+                                    quant.local_depths.insert(data.lable.clone(), depth);
                                 }
                             } else {
                                 update_flag_u = ticker.u;
                             }
                         } else if data.channel == "depth" {
-                            let depth = handle_special_depth(data);
+                            let depth = standard::binance_handle::handle_special_depth(data);
                             {
-                                let mut quant = quant_arc.lock().await;
+                                let mut quant = bot_arc_clone.lock().await;
                                 quant._update_depth(depth.depth.clone(), depth.name.clone());
                                 quant.local_depths.insert(depth.name, depth.depth);
                             }
@@ -930,7 +1078,7 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                     }
                 },
                 None => {
-                    error!("quant通道错误");
+                    error!("参考交易所通道错误");
                     break;
                 }
             }