浏览代码

quant 逻辑完成

JiahengHe 2 年之前
父节点
当前提交
90f846a065
共有 1 个文件被更改,包括 263 次插入229 次删除
  1. 263 229
      strategy/src/quant.rs

+ 263 - 229
strategy/src/quant.rs

@@ -1,18 +1,18 @@
 use std::collections::{BTreeMap, HashMap};
-use std::ops::Div;
+use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::thread;
 use std::thread::{sleep};
 use std::time::Duration;
-use chrono::{Timelike, Utc};
+use chrono::{Utc};
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
-use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
 use exchanges::binance_usdt_swap_ws::BinanceUsdtSwapWs;
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
-use standard::{Market, OrderCommand, Platform, Position, PositionModeEnum, SpecialDepth, SpecialTicker, Ticker};
+use standard::{Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialDepth, SpecialTicker, Ticker};
 use standard::exchange::Exchange;
 use standard::exchange::ExchangeEnum::GateSwap;
 use standard::binance_handle::handle_special_depth;
@@ -23,7 +23,7 @@ use crate::predictor::Predictor;
 use crate::strategy::Strategy;
 use crate::utils::{clip};
 
-#[derive(Debug)]
+
 pub struct Quant {
     pub  params: Params,
     // 启动时间
@@ -90,7 +90,8 @@ pub struct Quant {
     pub trade_name: String,
     pub ready: i8,
     pub predictor: Predictor,
-    pub market: Market
+    pub market: Market,
+    pub platform_rest:Box<dyn Platform+Send+Sync>
 }
 
 struct MarketData{
@@ -99,88 +100,89 @@ struct MarketData{
 }
 
 impl Quant {
-    pub fn new(params: Params) -> Quant{
+    pub async fn new(params: Params, exchange_params: BTreeMap<String, String>,  order_sender: Sender<Order>, error_sender: Sender<Error>) -> Quant {
         let symbol = params.pair.clone();
         let pairs: Vec<&str> = params.pair.split('_').collect();
         let mut quant_obj = Quant {
-           params: params.clone(),
-           start_time: 0,
-           symbol: symbol.clone(),
-           base: pairs[0].to_string(),
-           quote: pairs[1].to_string(),
-           hold_coin: clip(params.hold_coin, dec!(0.0), dec!(10000.0)),
+            params: params.clone(),
+            start_time: 0,
+            symbol: symbol.clone(),
+            base: pairs[0].to_string(),
+            quote: pairs[1].to_string(),
+            hold_coin: clip(params.hold_coin, dec!(0.0), dec!(10000.0)),
             strategy: Strategy::new(&params, true),
             local_orders: Default::default(),
-           local_orders_backup: Default::default(),
-           local_orders_backup_cid: Default::default(),
-           handled_orders_cid: Default::default(),
-           local_profit: Default::default(),
-           local_cash: Default::default(),
-           local_coin: Default::default(),
-           local_position: LocalPosition{
-               long_pos: Default::default(),
-               short_pos: Default::default(),
-               long_avg: Default::default(),
-               short_avg: Default::default(),
-           },
-           local_position_by_orders: LocalPosition{
-               long_pos: Default::default(),
-               short_pos: Default::default(),
-               long_avg: Default::default(),
-               short_avg: Default::default(),
-           },
-           local_buy_amount: Default::default(),
-           local_sell_amount: Default::default(),
-           local_buy_value: Default::default(),
-           local_sell_value: Default::default(),
-           local_cancel_log: Default::default(),
-           interval: params.interval,
-           exchange: params.exchange,
-           trade_msg: TraderMsg::new(),
-           exit_msg: "正常退出".to_string(),
-           position_check_series: Default::default(),
-           stop_loss: params.stop_loss,
-           used_pct: params.used_pct,
-           mode_signal: 0,
-           trade_order_update_time: Utc::now().timestamp_millis(),
-           on_tick_event_time: Utc::now().timestamp_millis(),
-           tickers: Default::default(),
-           depths: Default::default(),
-           market_update_time: Default::default(),
-           market_update_interval: Default::default(),
-           ref_num: params.ref_exchange.len() as i8,
-           ref_name: Default::default(),
-           trade_name: "".to_string(),
-           ready: 0,
-           predictor: Predictor{
-               loop_count: 0,
-               market_info_list: vec![],
-               mid_price_list: vec![],
-               ref_mid_price_per_exchange_per_frame: vec![],
-               ref_exchange_length: 0,
-               data_length_max: 0,
-               alpha: vec![],
-               gamma: Default::default(),
-               avg_spread_list: vec![],
-           },
-           market: Market{
-               symbol,
-               base_asset: "".to_string(),
-               quote_asset: "".to_string(),
-               tick_size: Default::default(),
-               amount_size: Default::default(),
-               price_precision: Default::default(),
-               amount_precision: Default::default(),
-               min_qty: Default::default(),
-               max_qty: Default::default(),
-               min_notional: Default::default(),
-               max_notional: Default::default(),
-               ct_val: Default::default(),
-           }
+            local_orders_backup: Default::default(),
+            local_orders_backup_cid: Default::default(),
+            handled_orders_cid: Default::default(),
+            local_profit: Default::default(),
+            local_cash: Default::default(),
+            local_coin: Default::default(),
+            local_position: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_position_by_orders: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_buy_amount: Default::default(),
+            local_sell_amount: Default::default(),
+            local_buy_value: Default::default(),
+            local_sell_value: Default::default(),
+            local_cancel_log: Default::default(),
+            interval: params.interval,
+            exchange: params.exchange,
+            trade_msg: TraderMsg::new(),
+            exit_msg: "正常退出".to_string(),
+            position_check_series: Default::default(),
+            stop_loss: params.stop_loss,
+            used_pct: params.used_pct,
+            mode_signal: 0,
+            trade_order_update_time: Utc::now().timestamp_millis(),
+            on_tick_event_time: Utc::now().timestamp_millis(),
+            tickers: Default::default(),
+            depths: Default::default(),
+            market_update_time: Default::default(),
+            market_update_interval: Default::default(),
+            ref_num: params.ref_exchange.len() as i8,
+            ref_name: Default::default(),
+            trade_name: "".to_string(),
+            ready: 0,
+            predictor: Predictor {
+                loop_count: 0,
+                market_info_list: vec![],
+                mid_price_list: vec![],
+                ref_mid_price_per_exchange_per_frame: vec![],
+                ref_exchange_length: 0,
+                data_length_max: 0,
+                alpha: vec![],
+                gamma: Default::default(),
+                avg_spread_list: vec![],
+            },
+            market: Market {
+                symbol: symbol.clone(),
+                base_asset: "".to_string(),
+                quote_asset: "".to_string(),
+                tick_size: Default::default(),
+                amount_size: Default::default(),
+                price_precision: Default::default(),
+                amount_precision: Default::default(),
+                min_qty: Default::default(),
+                max_qty: Default::default(),
+                min_notional: Default::default(),
+                max_notional: Default::default(),
+                ct_val: Default::default(),
+            },
+            platform_rest: Exchange::new(GateSwap, symbol, false, exchange_params, order_sender, error_sender).await
         };
-        for i in 0..=params.ref_exchange.len()-1 {
+        for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
-            let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@" , params.ref_pair[i], "@ref");
+            let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref");
             let ref_name_element = tickers_key.clone();
             let depths_key: String = tickers_key.clone();
             let market_update_time_key = tickers_key.clone();
@@ -196,7 +198,7 @@ impl Quant {
             quant_obj.market_update_time.insert(market_update_time_key, Default::default());
             quant_obj.market_update_interval.insert(market_update_interval_key, Default::default());
         }
-        let name = format!("{}{}{}",quant_obj.exchange.clone(), "@", quant_obj.symbol);
+        let name = format!("{}{}{}", quant_obj.exchange.clone(), "@", quant_obj.symbol);
         let market_update_time_key = name.clone();
         let market_update_interval_key = name.clone();
         let tickers_key = name.clone();
@@ -213,9 +215,9 @@ impl Quant {
         // broker.newWs
         let mut price_alpha: Vec<Decimal> = Vec::new();
         for ref_pair_str in params.ref_pair {
-            if params.pair.contains("1000") && !ref_pair_str.contains("1000"){
+            if params.pair.contains("1000") && !ref_pair_str.contains("1000") {
                 price_alpha.push(dec!(1000.0));
-            } else if !params.pair.contains("1000") && ref_pair_str.contains("1000"){
+            } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") {
                 price_alpha.push(dec!(0.001))
             } else {
                 price_alpha.push(dec!(1.0));
@@ -229,42 +231,82 @@ impl Quant {
         return quant_obj;
     }
 
-    pub fn update_local_order(&mut self, data: OrderInfo){
+    pub async fn handle_signals(quant_arc: Arc<Mutex<Quant>>, mut rx: Receiver<Order>) {
+        tokio::spawn(async move{
+            loop {
+                match rx.try_recv() {
+                    Ok(val)=>{
+                        // 只处理这两种订单回执
+                        if ["NEW", "REMOVE"].contains(&val.status.as_str()){
+                            let mut local_order_info = OrderInfo{
+                                symbol: "".to_string(),
+                                amount: Default::default(),
+                                side: "".to_string(),
+                                price: Default::default(),
+                                client_id: "".to_string(),
+                                filled_price: Default::default(),
+                                filled: Default::default(),
+                                order_id: "".to_string(),
+                                local_time: 0,
+                                create_time: 0,
+                                status: "".to_string(),
+                                fee: Decimal::ZERO,
+                            };
+                            if val.status== "NEW" {
+                                local_order_info.client_id = val.custom_id;
+                                local_order_info.order_id = val.id;
+                            } else if val.status == "REMOVE" {
+                                local_order_info.client_id = val.custom_id;
+                            }
+                            let mut bot = quant_arc.lock().await;
+                            // 写入本地订单缓存
+                            bot.update_local_order(local_order_info);
+                        }
+                    },
+                    Err(e) => {
+                        println!("订单回执消费失败!{}", e);
+                    }
+                }
+            }
+        });
+    }
+
+    pub fn update_local_order(&mut self, data: OrderInfo) {
         /*
-         更新订单
-            首先直接复写本地订单
-            1、如果是开仓单
-                如果新增: 增加本地订单
-                如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
-                如果成交: 删除本地订单 发送平仓订单 修改本地仓位
-            2、如果是平仓单
-                如果新增: 增加本地订单
-                如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
-                如果成交: 删除本地订单 修改本地仓位
-            NEW 可以从 ws / rest 来
-            REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
-            为了防止下单失败依然有订单成交 本地需要做一个缓存
-        */
+     更新订单
+        首先直接复写本地订单
+        1、如果是开仓单
+            如果新增: 增加本地订单
+            如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
+            如果成交: 删除本地订单 发送平仓订单 修改本地仓位
+        2、如果是平仓单
+            如果新增: 增加本地订单
+            如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
+            如果成交: 删除本地订单 修改本地仓位
+        NEW 可以从 ws / rest 来
+        REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
+        为了防止下单失败依然有订单成交 本地需要做一个缓存
+    */
         // 触发订单更新
         self.trade_order_update_time = Utc::now().timestamp_millis();
         // 新增订单推送 仅需要cid oid信息
-        if data.status == "NEW"{
+        if data.status == "NEW" {
             // 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
-            if self.local_orders.contains_key(&data.client_id){
+            if self.local_orders.contains_key(&data.client_id) {
                 let mut order_info = self.local_orders.get(&data.client_id).unwrap().clone();
                 order_info.order_id = data.order_id;
                 order_info.local_time = Utc::now().timestamp_millis();
                 self.local_orders.insert(data.client_id.clone(), order_info);
             }
-        } else if data.status == "REMOVE"{
+        } else if data.status == "REMOVE" {
             // 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
-            if self.local_cancel_log.contains_key(&data.client_id){
+            if self.local_cancel_log.contains_key(&data.client_id) {
                 self.local_cancel_log.remove(&data.client_id);
             }
             // 在cid缓存队列中 说明是本策略的订单
-            if self.local_orders_backup.contains_key(&data.client_id){
+            if self.local_orders_backup.contains_key(&data.client_id) {
                 // 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
-                if self.handled_orders_cid.contains(&data.client_id){
+                if self.handled_orders_cid.contains(&data.client_id) {
                     println!("订单已经参与过仓位计算 拒绝重复进行计算, 订单号:{}", data.client_id);
                 } else {
                     // 添加进已处理队列
@@ -280,12 +322,12 @@ impl Quant {
                     }
                     // 只有开仓成交才触发onPosition
                     // 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
-                    if filled > Decimal::ZERO{
-                        if self.exchange.contains("spot"){ // 如果是现货交易 还需要修改equity
+                    if filled > Decimal::ZERO {
+                        if self.exchange.contains("spot") { // 如果是现货交易 还需要修改equity
                             // 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
                             let fee = data.fee;
-                            if side == "kd"{ // buy  开多
-                                self.local_buy_amount += filled-fee;
+                            if side == "kd" { // buy  开多
+                                self.local_buy_amount += filled - fee;
                                 self.local_buy_value += (filled - fee) * filled_price;
                                 let new_long_pos = self.local_position_by_orders.long_pos.clone();
                                 if new_long_pos == Decimal::ZERO {
@@ -298,12 +340,12 @@ impl Quant {
                                 }
                                 self.local_cash -= filled * filled_price;
                                 self.local_coin = filled - fee;
-                            } else if side == "pd"{ // sell 平多
+                            } else if side == "pd" { // sell 平多
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
                                 self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
                                 let new_long_pos = self.local_position_by_orders.long_pos - filled;
-                                if new_long_pos == Decimal::ZERO{
+                                if new_long_pos == Decimal::ZERO {
                                     self.local_position_by_orders.long_avg = Decimal::ZERO;
                                     self.local_position_by_orders.long_pos = Decimal::ZERO;
                                 } else {
@@ -311,12 +353,12 @@ impl Quant {
                                 }
                                 self.local_cash += filled * filled_price - fee;
                                 self.local_coin -= filled;
-                            } else if side == "pk"{ // buy 平空
+                            } else if side == "pk" { // buy 平空
                                 self.local_buy_amount += filled - fee;
                                 self.local_buy_value += (filled - fee) * filled_price;
                                 self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
                                 let new_short_pos = self.local_position_by_orders.short_pos - filled;
-                                if new_short_pos == Decimal::ZERO{
+                                if new_short_pos == Decimal::ZERO {
                                     self.local_position_by_orders.short_avg = Decimal::ZERO;
                                     self.local_position_by_orders.short_pos = Decimal::ZERO;
                                 } else {
@@ -324,11 +366,11 @@ impl Quant {
                                 }
                                 self.local_cash -= filled * filled_price;
                                 self.local_coin += filled - fee;
-                            } else if side == "kk"{ // sell 开空
+                            } else if side == "kk" { // sell 开空
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
                                 let new_short_pos = self.local_position_by_orders.short_pos - filled;
-                                if new_short_pos == Decimal::ZERO{
+                                if new_short_pos == Decimal::ZERO {
                                     self.local_position_by_orders.short_avg = Decimal::ZERO;
                                     self.local_position_by_orders.short_pos = Decimal::ZERO;
                                 } else {
@@ -394,7 +436,7 @@ impl Quant {
                         self._print_local_trades_summary();
                     }
                     // 每次有订单变动就触发一次策略
-                    if self.mode_signal == 0 && self.ready == 1{
+                    if self.mode_signal == 0 && self.ready == 1 {
                         // 更新交易数据
                         self.update_trade_msg();
                         // 触发策略挂单逻辑
@@ -405,6 +447,7 @@ impl Quant {
                         if order.is_not_empty() {
                             print!("触发onOrder");
                             self._update_local_orders(&order);
+
                             //TODO: 交易所处理订单信号 self.loop.create_task(self.rest.handle_signals(orders))
                             println!("订单指令:{:?}", order);
                         }
@@ -413,7 +456,7 @@ impl Quant {
             } else {
                 println!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
             }
-            if self.local_orders.contains_key(&data.client_id){
+            if self.local_orders.contains_key(&data.client_id) {
                 println!("删除本地订单, client_id:{}", data.client_id);
                 self.local_orders.remove(&data.client_id);
             } else {
@@ -424,38 +467,38 @@ impl Quant {
         }
     }
 
-    pub fn _print_local_trades_summary(&mut self){
+    pub fn _print_local_trades_summary(&mut self) {
         // 计算本地累计利润
         let local_buy_amount = self.local_buy_amount.round_dp(5);
         let local_buy_value = self.local_buy_value.round_dp(5);
         let local_sell_amount = self.local_sell_amount.round_dp(5);
         let local_sell_value = self.local_sell_value.round_dp(5);
 
-        if self.strategy.mp > Decimal::ZERO{
+        if self.strategy.mp > Decimal::ZERO {
             let unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp;
             let realized = local_sell_value - local_buy_value;
-            let local_profit = (unrealized+realized).round_dp(5);
+            let local_profit = (unrealized + realized).round_dp(5);
             self.strategy.local_profit = local_profit;
             println!("买量 {},卖量 {},买额{},卖额{}", local_buy_amount, local_sell_amount, local_buy_value, local_sell_value);
         }
     }
 
     // 检测初始数据是否齐全
-    pub fn check_ready(&mut self){
+    pub fn check_ready(&mut self) {
         // 检查 ticker 行情
-        for i in &self.ref_name{
+        for i in &self.ref_name {
             if self.tickers.is_empty() || !self.tickers.contains_key(i) {
                 println!("参考盘口ticker未准备好");
                 return;
             } else {
-                if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0){
+                if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
                     println!("参考盘口ticker未准备好");
                     return;
                 }
             }
         }
-        if self.tickers.contains_key(&self.trade_name){
-            if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0)  {
+        if self.tickers.contains_key(&self.trade_name) {
+            if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
                 println!("参考盘口ticker未准备好");
                 return;
             }
@@ -464,8 +507,8 @@ impl Quant {
             return;
         }
         // 检查 market 行情
-        let all_market:Vec<Decimal> = self.get_all_market_data();
-        if all_market.len() != LENGTH*(1usize+self.ref_num as usize){
+        let all_market: Vec<Decimal> = self.get_all_market_data();
+        if all_market.len() != LENGTH * (1usize + self.ref_num as usize) {
             println!("聚合行情未准备好");
             return;
         } else {
@@ -476,7 +519,7 @@ impl Quant {
         self.ready = 1
     }
 
-    pub fn _update_depth(&mut self, data: SpecialDepth){
+    pub fn _update_depth(&mut self, data: SpecialDepth) {
         // 要从回调传入的深度信息中获取data.name  TODO: 这里暂时只支持一个参考交易所,支持多个须在入参中添加name字段及值
         let name = self.ref_name[0].clone();
         let market_update_interval_key = name.clone();
@@ -486,13 +529,13 @@ impl Quant {
         let depths3_key = name.clone();
 
         let now_time = Utc::now().timestamp_millis();
-        if self.market_update_time.contains_key(&name) && *self.market_update_time.get(&name).unwrap() != 0i64{
+        if self.market_update_time.contains_key(&name) && *self.market_update_time.get(&name).unwrap() != 0i64 {
             let interval = Decimal::from(now_time - self.market_update_time.get(&name).unwrap());
-            if *self.market_update_interval.get(&name).unwrap() == dec!(0){
+            if *self.market_update_interval.get(&name).unwrap() == dec!(0) {
                 self.market_update_interval.insert(market_update_interval_key, interval);
-            }else{
+            } else {
                 let value = self.market_update_interval.get(&name).unwrap();
-                self.market_update_interval.insert(market_update_interval_key, value*dec!(0.999) + interval*dec!(0.001));
+                self.market_update_interval.insert(market_update_interval_key, value * dec!(0.999) + interval * dec!(0.001));
             }
         }
         self.market_update_time.insert(market_update_time_key, now_time);
@@ -502,7 +545,7 @@ impl Quant {
         }
         // 判断是否需要触发ondepth
         // 是否是交易盘口
-        if name == self.trade_name{
+        if name == self.trade_name {
             // 更新depths
             self.depths.insert(depths2_key, data.depth.clone());
             // 允许交易
@@ -525,7 +568,7 @@ impl Quant {
             // 更新depths
             self.depths.insert(depths3_key, data.depth);
             // 允许交易
-            if self.mode_signal == 0 && self.ready == 1 && flag == 1{
+            if self.mode_signal == 0 && self.ready == 1 && flag == 1 {
                 // 更新交易数据
                 self.update_trade_msg();
                 // 触发事件撤单逻辑
@@ -533,21 +576,23 @@ impl Quant {
                 self.strategy.local_time = Utc::now().timestamp_millis();
                 // 产生交易信号
                 let orders = self.strategy.on_time(&self.trade_msg);
-                if order_not_empty(&orders) == 1 {
+                if orders.is_not_empty() {
                     println!("触发onTick");
-                    self._update_local_orders(&orders)
-                    //TODO: 异步处理信号
-                    // self.loop.create_task(self.rest.handle_signals(orders))
+                    self._update_local_orders(&orders);
+                    //异步处理信号
+                    // tokio::spawn(async move{
+                    //     self.platform_rest.command_order(orders).await;
+                    // });
                 }
             }
         }
     }
 
-    pub fn update_position(&mut self, data: Vec<Position>){
-        if data.is_empty(){
+    pub fn update_position(&mut self, data: Vec<Position>) {
+        if data.is_empty() {
             return;
         }
-        let mut  position = LocalPosition::new();
+        let mut position = LocalPosition::new();
         for pos in data {
             if pos.position_mode == PositionModeEnum::Long {
                 position.long_pos = pos.amount;
@@ -564,19 +609,19 @@ impl Quant {
         }
     }
 
-    pub fn _update_ticker(&mut self, data: SpecialTicker){
+    pub fn _update_ticker(&mut self, data: SpecialTicker) {
         // update ticker infomation
         // 要从回调传入的深度信息中获取data.name  TODO: 这里暂时只支持一个参考交易所,支持多个须在入参中添加name字段及值
         let name = self.ref_name[0].clone();
         self.tickers.insert(name, data);
     }
 
-    pub fn on_agg_market(&mut self){
+    pub fn on_agg_market(&mut self) {
         /** 处理聚合行情
-         1. 获取聚合行情
-         2. 更新预测器
-         3. 触发tick回测
-        **/
+                1. 获取聚合行情
+                2. 更新预测器
+                3. 触发tick回测
+               **/
         // 更新聚合市场数据
         let agg_market = self.get_all_market_data();
         // 更新聚合市场信息
@@ -585,7 +630,7 @@ impl Quant {
         self.predictor.market_info_handler(&self.trade_msg.market);
     }
 
-    pub fn update_trade_msg(&mut self){
+    pub fn update_trade_msg(&mut self) {
         // 更新保证金
         self.trade_msg.cash = self.local_cash.round_dp(10);
         self.trade_msg.coin = self.local_coin.round_dp(10);
@@ -595,11 +640,11 @@ impl Quant {
         let orders = self.local_orders.clone();
         self.trade_msg.orders = orders;
         // 更新 ref
-        let mut ref_tickers: BTreeMap<String,Ticker> = BTreeMap::new();
+        let mut ref_tickers: BTreeMap<String, Ticker> = BTreeMap::new();
         for i in &self.ref_name {
             let bp = self.tickers.get(i).unwrap().buy.clone();
             let ap = self.tickers.get(i).unwrap().sell.clone();
-            ref_tickers.insert( i.clone(),Ticker{
+            ref_tickers.insert(i.clone(), Ticker {
                 time: 0,
                 high: Default::default(),
                 low: Default::default(),
@@ -614,10 +659,10 @@ impl Quant {
     }
 
     // 本地记录所有报单信息
-    pub fn _update_local_orders(&mut self, orders: &OrderCommand){
+    pub fn _update_local_orders(&mut self, orders: &OrderCommand) {
         if !orders.limits_open.is_empty() {
             for j in orders.limits_open.keys() {
-                let order_info = OrderInfo{
+                let order_info = OrderInfo {
                     symbol: self.symbol.clone(),
                     amount: Decimal::from_str(orders.limits_open.get(j).unwrap()[0].as_str()).unwrap(),
                     side: orders.limits_open.get(j).unwrap()[1].clone(),
@@ -642,7 +687,7 @@ impl Quant {
         if !orders.cancel.is_empty() {
             for cancel_key in orders.cancel.keys() {
                 let cid = orders.cancel.get(cancel_key).unwrap()[0].clone();
-                if self.local_cancel_log.contains_key(&cid){
+                if self.local_cancel_log.contains_key(&cid) {
                     let num = self.local_cancel_log.get(&cid).unwrap() + 1;
                     self.local_cancel_log.insert(cid, num);
                 } else {
@@ -655,7 +700,7 @@ impl Quant {
         if self.local_orders_backup_cid.len() > max_len {
             let cid = self.local_orders_backup_cid[0].clone();
             // 判断是否超过1个小时 如果超过则移除历史记录
-            if self.local_orders_backup.contains_key(&cid){
+            if self.local_orders_backup.contains_key(&cid) {
                 let local_time = self.local_orders_backup.get(&cid).unwrap().local_time;
                 if Utc::now().timestamp_millis() - local_time > 3600000 {
                     self.local_orders_backup.remove(&cid);
@@ -669,14 +714,14 @@ impl Quant {
     }
 
     // 获取深度信息
-    pub fn get_all_market_data(&self) -> Vec<Decimal>{
+    pub fn get_all_market_data(&self) -> Vec<Decimal> {
         // 只能定时触发 组合市场信息=交易盘口+参考盘口
-        let mut market:Vec<Decimal> = Vec::new();
+        let mut market: Vec<Decimal> = Vec::new();
         // TODO: 获取交易盘口市场信息,self.ws._get_data()["data"]
-        let mut data:Vec<Decimal> = Vec::new();
+        let mut data: Vec<Decimal> = Vec::new();
         market.append(&mut data);
 
-        for i in &self.ref_name{
+        for i in &self.ref_name {
             // TODO: 获取参考盘口市场信息,self.ws_ref[i]._get_data()["data"]
             data = Vec::new();
             market.append(&mut data);
@@ -684,9 +729,9 @@ impl Quant {
         return market;
     }
 
-    pub async fn get_exchange_info(&mut self, platform: &Box<dyn Platform + Send + Sync>){
-        match platform.get_market().await {
-            Ok(val)=> {
+    pub async fn get_exchange_info(&mut self,) {
+        match self.platform_rest.get_market().await {
+            Ok(val) => {
                 self.market = val
             },
             Err(e) => {
@@ -695,15 +740,15 @@ impl Quant {
         }
     }
 
-    pub async fn update_equity(&mut self, platform: &Box<dyn Platform + Send + Sync>){
-        match platform.get_account().await {
-            Ok(val)=> {
+    pub async fn update_equity(&mut self) {
+        match self.platform_rest.get_account().await {
+            Ok(val) => {
                 /*
-                   更新保证金信息
-                   合约一直更新
-                   现货只有当出现异常时更新
-               */
-                if self.exchange.contains("spot"){
+               更新保证金信息
+               合约一直更新
+               现货只有当出现异常时更新
+           */
+                if self.exchange.contains("spot") {
                     return;
                 }
                 self.local_cash = val.balance * self.used_pct
@@ -714,35 +759,28 @@ impl Quant {
         }
     }
 
-    pub async fn handle_callback(){
-
-    }
-
-    pub async fn before_trade(&mut self, exchange_params: BTreeMap<String, String>) -> bool{
-        // 交易交易所rest
-        let platform:Box<dyn Platform+Send+Sync> = Exchange::new(GateSwap, self.symbol.clone(), false, exchange_params);
-
-       sleep(Duration::from_secs(1));
+    pub async fn before_trade(&mut self) -> bool {
+        sleep(Duration::from_secs(1));
 
         // 获取市场信息
-        self.get_exchange_info(&platform).await;
+        self.get_exchange_info().await;
         // 获取价格信息
-        let ticker = platform.get_ticker().await.expect("获取价格信息异常!");
+        let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
         let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
         // 获取账户信息
-        self.update_equity(&platform).await;
+        self.update_equity().await;
         // 初始资金
         let start_cash = self.local_cash.clone();
         let start_coin = self.local_cash.clone();
-        if start_cash.is_zero() && start_coin.is_zero(){
-            self.exit_msg = format!("{}{}{}{}","初始为零 cash: ", start_cash, " coin: ", start_coin);
+        if start_cash.is_zero() && start_coin.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "初始为零 cash: ", start_cash, " coin: ", start_coin);
             // 停止程序
             return false;
         }
         println!("初始cash: {start_cash} 初始coin: {start_coin}");
         // 初始化策略基础信息
-        if mp <= dec!(0){
-            self.exit_msg = format!("{}{}","初始价格获取错误: ", mp);
+        if mp <= dec!(0) {
+            self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
             // 停止程序
             return false;
         } else {
@@ -757,16 +795,16 @@ impl Quant {
         self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp;
         // 获取数量精度
         self.strategy.step_size = self.market.amount_size.clone();
-        if self.strategy.step_size < Decimal::ONE{
+        if self.strategy.step_size < Decimal::ONE {
             self.strategy.step_size = self.strategy.step_size.trunc();
         }
         // 获取价格精度
         self.strategy.tick_size = self.market.tick_size.clone();
-        if self.strategy.tick_size < Decimal::ONE{
+        if self.strategy.tick_size < Decimal::ONE {
             self.strategy.tick_size = self.strategy.tick_size.trunc();
         }
-        if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero(){
-            self.exit_msg = format!("{}{}{}{}","交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
+        if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
             // 停止程序
             return false;
         } else {
@@ -777,20 +815,20 @@ impl Quant {
         // 计算下单数量
         let mut long_one_hand_value: Decimal = start_cash * self.params.lever_rate / grid;
         let mut short_one_hand_value: Decimal = Decimal::ZERO;
-        let long_one_hand_amount: Decimal =(long_one_hand_value/mp/&self.strategy.step_size).floor()*self.strategy.step_size;
+        let long_one_hand_amount: Decimal = (long_one_hand_value / mp / &self.strategy.step_size).floor() * self.strategy.step_size;
         let mut short_one_hand_amount: Decimal = Decimal::ZERO;
 
-        if self.exchange.contains("spot"){
+        if self.exchange.contains("spot") {
             short_one_hand_value = start_coin * mp * self.params.lever_rate / grid;
-            short_one_hand_amount = (short_one_hand_value/mp/self.strategy.step_size).floor()*self.strategy.step_size;
+            short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
         } else {
             short_one_hand_value = start_cash * self.params.lever_rate / grid;
-            short_one_hand_amount = (short_one_hand_value/mp/self.strategy.step_size).floor()*self.strategy.step_size;
+            short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
         }
         println!("最低单手交易下单量为 buy: {}, sell: {}", long_one_hand_amount, short_one_hand_amount);
         let hand_min_limit = Decimal::new(20, 0);
         if (long_one_hand_amount.is_zero() && short_one_hand_amount.is_zero()) ||
-            (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit){
+            (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit) {
             self.exit_msg = format!("{}{}{}{}", "初始下单量太少 buy: ", long_one_hand_amount, " sell: ", short_one_hand_amount);
             // 停止程序
             return false;
@@ -809,33 +847,16 @@ impl Quant {
         */
     }
 }
-
-// 检查orders信号  对应python中 _not_empty
-fn order_not_empty(orders: &OrderCommand) -> i8{
-    if !orders.cancel.is_empty() || !orders.check.is_empty(){
-        return 1;
-    }
-    if !orders.limits_open.is_empty() && orders.limits_open.len() > 0usize {
-        return 1;
-    }
-    if !orders.limits_close.is_empty() && orders.limits_close.len() > 0usize {
-        return 1;
-    }
-    return 0;
-}
-
-pub async fn run_transaction(quant: Arc<Mutex<Quant>>, exchange_params: BTreeMap<String, String>){
-
-}
+pub async fn run_transaction(quant: Arc<Mutex<Quant>>, name: String, symbol: Vec<String>, exchange_params: BTreeMap<String, String>) {}
 
 // 启动参考交易所
-pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name :String, symbol: Vec<String>, exchange_param: BTreeMap<String, String>){
+pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<String>, exchange_param: BTreeMap<String, String>) {
     let (tx, mut rx) = channel(100);
-    thread::spawn(move || {
+    tokio::spawn( async move {
         let ba_exc = BinanceUsdtSwapWs::new_label(name, false, exchange_param, tx);
-        ba_exc.custom_subscribe(symbol, 1, 0);
+        ba_exc.custom_subscribe(symbol, 1, 0).await;
     });
-    thread::spawn(move || {
+    tokio::spawn(async move {
         // trade
         let mut decimal = 99u32;
         let mut max_buy = Decimal::ZERO;
@@ -843,12 +864,11 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name :String, symbol: Vec<S
         // ticker
         let mut update_flag_u = 0i64;
         let bot_arc_clone = Arc::clone(&quant_arc);
-        async move {
-            loop {
-                match rx.try_recv() {
-                    Ok(data) => {
-
-                        if data.channel == "aggTrade" {
+        loop {
+            match rx.try_recv() {
+                Ok(data) => {
+                    if data.code == "200".to_string() {
+                        if  data.channel == "aggTrade" {
                             let trade: OriginalTrade = serde_json::from_str(data.data.as_str()).unwrap();
                             if decimal == 99 {
                                 decimal = trade.p.scale();
@@ -878,10 +898,10 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name :String, symbol: Vec<S
                                 quant._update_depth(depth);
                             }
                         }
-                    },
-                    Err(_) => {
-                        println!("Channel has been closed!");
                     }
+                },
+                Err(_) => {
+                    println!("Channel has been closed!");
                 }
             }
         }
@@ -890,20 +910,34 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name :String, symbol: Vec<S
 
 
 
-
 #[cfg(test)]
 mod tests {
-    use chrono::Utc;
+    use std::collections::BTreeMap;
+    use std::io::Error;
     use rust_decimal::Decimal;
     use rust_decimal_macros::dec;
+    use tokio::sync::mpsc::{channel, Receiver, Sender};
+    use standard::Order;
+
     use crate::params::Params;
     use crate::quant::Quant;
 
     #[tokio::test]
     async fn test_new_exchange() {
+        let (order_tx, order_rx):(Sender<Order>, Receiver<Order>) = channel(1024);
+        let (err_tx, err_rx):(Sender<Error>, Receiver<Error>) = channel(1024);
+
         let _params = Params::new("config.toml").unwrap();
-        let quant: Quant = Quant::new(_params);
-        println!("属性:{:?}", quant);
+
+        let mut params_exc: BTreeMap<String, String> = BTreeMap::new();
+        let access_key = "";
+        let secret_key = "";
+
+        let mut quant: Quant = Quant::new(_params, params_exc,order_tx, err_tx).await;
+        let is_ok = quant.before_trade().await;
+
+        println!("结果: {}", is_ok)
+
     }
 
     #[tokio::test]