Ver Fonte

合并dev修改

gepangpang há 1 ano atrás
pai
commit
27a7b17829

+ 11 - 1
global/src/trace_stack.rs

@@ -5,6 +5,7 @@ use rust_decimal::prelude::ToPrimitive;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct TraceStack {
+    pub before_network: i64,                // 官方数据生成时间
     pub after_network: i64,                 // 到达网络层时间
     pub before_quant: i64,                  // quant层执行开始时间(也是通道+锁走完后的时间)
     pub after_quant: i64,                   // quant层执行结束时间
@@ -24,7 +25,11 @@ pub struct TraceStack {
 }
 
 impl TraceStack {
-    pub fn on_network(&mut self, after_network: i64) {
+    pub fn on_before_network(&mut self, before_network_millis: i64) {
+        self.before_network = before_network_millis;
+    }
+
+    pub fn on_after_network(&mut self, after_network: i64) {
         self.after_network = after_network;
     }
 
@@ -92,6 +97,10 @@ impl fmt::Display for TraceStack {
             msg.push_str(format!("订单来源:{} ", self.source).as_str());
         }
 
+        if self.before_network != 0 && self.after_network != 0 {
+            msg.push_str(format!("数据生成+到达rust耗时{}毫秒  ", (self.after_network - self.before_network).to_f64().unwrap() / 1000.0).as_str());
+        }
+
         if self.before_quant != 0 && self.after_network != 0 {
             msg.push_str(format!("通道+锁耗时{}微秒  ", self.before_quant - self.after_network).as_str());
         }
@@ -123,6 +132,7 @@ impl fmt::Display for TraceStack {
 impl Default for TraceStack {
     fn default() -> Self {
         TraceStack {
+            before_network: 0,
             after_network: 0,
             before_format: 0,
             after_format: 0,

+ 9 - 8
src/main.rs

@@ -45,14 +45,6 @@ async fn main() {
     info!("配置读取成功:{:?}。", params);
     // 主进程控制
     let running = Arc::new(AtomicBool::new(true));
-    // ws退出程序
-    let ws_running = Arc::new(AtomicBool::new(true));
-    // quant初始化动作
-    let quant_arc = quant_libs::init(params.clone(), ws_running.clone(), running.clone()).await;
-    // 初始化中控服务
-    server::run_server(params.port.clone(), running.clone(), quant_arc.clone());
-    // ctrl c退出检查程序
-    control_c::exit_handler(running.clone());
 
     // panic错误捕获,panic级别的错误直接退出
     let account_name_clone = params.account_name.clone();
@@ -65,6 +57,15 @@ async fn main() {
         panic_running.store(false, Ordering::Relaxed);
     }));
 
+    // ws退出程序
+    let ws_running = Arc::new(AtomicBool::new(true));
+    // quant初始化动作
+    let quant_arc = quant_libs::init(params.clone(), ws_running.clone(), running.clone()).await;
+    // 初始化中控服务
+    server::run_server(params.port.clone(), running.clone(), quant_arc.clone());
+    // ctrl c退出检查程序
+    control_c::exit_handler(running.clone());
+
     // 每一秒检查一次程序是否结束
     while running.load(Ordering::Relaxed) {
         tokio::time::sleep(Duration::from_secs(1)).await;

+ 1 - 1
src/quant_libs.rs

@@ -51,7 +51,7 @@ pub async fn init(params: Params, ws_running: Arc<AtomicBool>, running: Arc<Atom
                 Some(order) => {
                     // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑
                     let mut trace_stack = TraceStack::default();
-                    trace_stack.on_network(Utc::now().timestamp_micros());
+                    trace_stack.on_after_network(Utc::now().timestamp_micros());
                     trace_stack.on_before_quant();
 
                     if order.status != "NULL" {

+ 3 - 1
standard/src/binance_handle.rs

@@ -21,14 +21,16 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let aq = Decimal::from_str(data["A"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
     let t = Decimal::from_str(&data["u"].to_string()).unwrap();
+    let create_at = data["E"].as_i64().unwrap() * 1000;
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }
 

+ 2 - 1
standard/src/binance_spot_handle.rs

@@ -22,13 +22,14 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let mp = (bp + ap) * dec!(0.5);
     let t = Decimal::from_str(&data["u"].to_string()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at: Default::default() };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at: Default::default(),
     }
 }
 

+ 3 - 1
standard/src/bitget_spot_handle.rs

@@ -121,13 +121,15 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let aq = Decimal::from_str(data["askSz"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
     let t = Decimal::from_str(data["ts"].as_str().unwrap()).unwrap();
+    let create_at = data["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }

+ 3 - 1
standard/src/gate_handle.rs

@@ -137,19 +137,21 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let depth_asks = format_depth_items(data["asks"].clone());
     let depth_bids = format_depth_items(data["bids"].clone());
     let t = Decimal::from_str(&data["t"].to_string()).unwrap();
+    let create_at = data["t"].as_i64().unwrap() * 1000;
 
     let ap = depth_asks[0].price;
     let bp = depth_bids[0].price;
     let aq = depth_asks[0].amount;
     let bq = depth_bids[0].amount;
     let mp = (bp + ap) * dec!(0.5);
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }
 

+ 11 - 1
standard/src/handle_info.rs

@@ -134,41 +134,50 @@ impl HandleSwapInfo {
         let mut depth_asks: Vec<MarketOrder>;
         let mut depth_bids: Vec<MarketOrder>;
         let t: Decimal;
+        let create_at: i64;
         match exchange {
             ExchangeEnum::BinanceSpot => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
                 t = Decimal::from_str(&res_data_json["lastUpdateId"].to_string()).unwrap();
+                create_at = 0;
             }
             ExchangeEnum::BinanceSwap => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
                 depth_bids = binance_handle::format_depth_items(res_data_json["b"].clone());
                 t = Decimal::from_str(&res_data_json["u"].to_string()).unwrap();
+                create_at = res_data_json["E"].as_i64().unwrap() * 1000;
             }
             ExchangeEnum::GateSwap => {
                 depth_asks = gate_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = gate_handle::format_depth_items(res_data_json["bids"].clone());
+                // todo! 有id可以取 保证与py一致
                 t = Decimal::from_str(&res_data_json["t"].to_string()).unwrap();
+                create_at = res_data_json["t"].as_i64().unwrap() * 1000;
             }
             ExchangeEnum::KucoinSwap => {
                 depth_asks = kucoin_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = kucoin_handle::format_depth_items(res_data_json["bids"].clone());
                 t = Decimal::from_str(&res_data_json["sequence"].to_string()).unwrap();
+                create_at = res_data_json["ts"].as_i64().unwrap() * 1000;
             }
             ExchangeEnum::KucoinSpot => {
                 depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
                 t = Decimal::from_str(&res_data_json["timestamp"].to_string()).unwrap();
+                create_at = res_data_json["timestamp"].as_i64().unwrap() * 1000;
             }
             ExchangeEnum::OkxSwap => {
                 depth_asks = okx_handle::format_depth_items(res_data_json[0]["asks"].clone());
                 depth_bids = okx_handle::format_depth_items(res_data_json[0]["bids"].clone());
                 t = Decimal::from_str(&res_data_json[0]["seqId"].to_string()).unwrap();
+                create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
             }
             ExchangeEnum::BitgetSpot => {
                 depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
                 depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
                 t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
+                create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
             }
             _ => {
                 error!("未找到该交易所!handle_special_depth: {:?}",exchange);
@@ -227,13 +236,14 @@ impl HandleSwapInfo {
             }
         }
 
-        let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp, t };
+        let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp, t, create_at };
         let depth_info = bp.iter().cloned().chain(bv.iter().cloned()).chain(ap.iter().cloned()).chain(av.iter().cloned()).collect();
         SpecialDepth {
             name: res_data.label,
             depth: depth_info,
             ticker: ticker_info,
             t,
+            create_at,
         }
     }
 }

+ 3 - 1
standard/src/kucoin_handle.rs

@@ -46,14 +46,16 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let aq = Decimal::from_f64(data["bestAskSize"].as_f64().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
     let t = Decimal::from_str(&data["sequence"].to_string()).unwrap();
+    let create_at = (data["ts"].as_f64().unwrap() / 1000.0).floor() as i64;
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }
 

+ 3 - 1
standard/src/kucoin_spot_handle.rs

@@ -121,13 +121,15 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let aq = Decimal::from_str(data["bestAskSize"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
     let t = Decimal::from_str(data["sequence"].as_str().unwrap()).unwrap();
+    let create_at = data["time"].as_i64().unwrap() * 1000;
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }

+ 13 - 12
standard/src/kucoin_swap.rs

@@ -160,14 +160,15 @@ impl Platform for KucoinSwap {
             let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
             let ticker_info = res_data_json;
             let time = (Decimal::from_str(&*ticker_info["ts"].to_string()).unwrap() / dec!(1000000)).floor().to_i64().unwrap();
+
             let result = Ticker {
                 time,
-                high: Decimal::from_f64(ticker_info["bestAskPrice"].as_f64().unwrap()).unwrap(),
-                low: Decimal::from_f64(ticker_info["bestBidPrice"].as_f64().unwrap()).unwrap(),
-                sell: Decimal::from_f64(ticker_info["bestAskPrice"].as_f64().unwrap()).unwrap(),
-                buy: Decimal::from_f64(ticker_info["bestBidPrice"].as_f64().unwrap()).unwrap(),
-                last: Decimal::from_f64(ticker_info["price"].as_f64().unwrap()).unwrap(),
-                volume: Decimal::from_f64(ticker_info["size"].as_f64().unwrap()).unwrap(),
+                high: Decimal::from_str(ticker_info["bestAskPrice"].as_str().unwrap()).unwrap(),
+                low: Decimal::from_str(ticker_info["bestBidPrice"].as_str().unwrap()).unwrap(),
+                sell: Decimal::from_str(ticker_info["bestAskPrice"].as_str().unwrap()).unwrap(),
+                buy: Decimal::from_str(ticker_info["bestBidPrice"].as_str().unwrap()).unwrap(),
+                last: Decimal::from_str(ticker_info["price"].as_str().unwrap()).unwrap(),
+                volume: Decimal::from_str(&ticker_info["size"].to_string()).unwrap(),
             };
             Ok(result)
         } else {
@@ -186,12 +187,12 @@ impl Platform for KucoinSwap {
             let time = (Decimal::from_str(&*ticker_info["ts"].to_string()).unwrap() / dec!(1000000)).floor().to_i64().unwrap();
             let result = Ticker {
                 time,
-                high: Decimal::from_f64(ticker_info["bestAskPrice"].as_f64().unwrap()).unwrap(),
-                low: Decimal::from_f64(ticker_info["bestBidPrice"].as_f64().unwrap()).unwrap(),
-                sell: Decimal::from_f64(ticker_info["bestAskPrice"].as_f64().unwrap()).unwrap(),
-                buy: Decimal::from_f64(ticker_info["bestBidPrice"].as_f64().unwrap()).unwrap(),
-                last: Decimal::from_f64(ticker_info["price"].as_f64().unwrap()).unwrap(),
-                volume: Decimal::from_f64(ticker_info["size"].as_f64().unwrap()).unwrap(),
+                high: Decimal::from_str(ticker_info["bestAskPrice"].as_str().unwrap()).unwrap(),
+                low: Decimal::from_str(ticker_info["bestBidPrice"].as_str().unwrap()).unwrap(),
+                sell: Decimal::from_str(ticker_info["bestAskPrice"].as_str().unwrap()).unwrap(),
+                buy: Decimal::from_str(ticker_info["bestBidPrice"].as_str().unwrap()).unwrap(),
+                last: Decimal::from_str(ticker_info["price"].as_str().unwrap()).unwrap(),
+                volume: Decimal::from_str(&ticker_info["size"].to_string()).unwrap(),
             };
             Ok(result)
         } else {

+ 8 - 1
standard/src/lib.rs

@@ -143,13 +143,15 @@ impl Depth {
 /// - `name<String>`: 平台信息;
 /// - `depth(Vec<Decimal>)`: 深度信息;
 /// - `ticker(SpecialTicker)`: 市场行情;
-/// - ``
+/// - `t(Decimal)`: 数据更新id
+/// - `create_at(i64)`: 数据生成时间
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct SpecialDepth {
     pub name: String,
     pub depth: Vec<Decimal>,
     pub ticker: SpecialTicker,
     pub t: Decimal,
+    pub create_at: i64,
 }
 
 impl SpecialDepth {
@@ -159,6 +161,7 @@ impl SpecialDepth {
             depth: vec![],
             ticker: SpecialTicker::new(),
             t: Default::default(),
+            create_at: 0,
         }
     }
 }
@@ -167,12 +170,15 @@ impl SpecialDepth {
 /// - `sell(Decimal)`: 卖一价
 /// - `buy(Decimal)`: 买一价
 /// - `mid_price(Decimal)`: 平均价
+/// - `t(Decimal)`: 数据更新id
+/// - `create_at(i64)`: 数据生成时间
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct SpecialTicker {
     pub sell: Decimal,
     pub buy: Decimal,
     pub mid_price: Decimal,
     pub t: Decimal,
+    pub create_at: i64
 }
 
 impl SpecialTicker {
@@ -182,6 +188,7 @@ impl SpecialTicker {
             buy: Default::default(),
             mid_price: Default::default(),
             t: Default::default(),
+            create_at: 0,
         }
     }
 }

+ 4 - 2
standard/src/okx_handle.rs

@@ -127,15 +127,17 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(asks[0].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(asks[1].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
-    let t = Decimal::from_str(data["ts"].as_str().unwrap()).unwrap();
+    let t = Decimal::from_str(&data["seqId"].to_string()).unwrap();
+    let create_at = data["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
         t,
+        create_at,
     }
 }
 

+ 2 - 2
standard/tests/exchange_test.rs

@@ -20,6 +20,7 @@ use standard::exchange::{Exchange, ExchangeEnum};
 // use standard::{binance_spot_handle, Order, Platform, utils};
 // use standard::{binance_handle, Order, Platform, utils};
 // use standard::{kucoin_handle, Order, Platform, utils};
+// use standard::{kucoin_spot_handle, Order, Platform, utils};
 // use standard::{gate_handle, Order, Platform, utils};
 // use standard::{bitget_spot_handle, Order, Platform, utils};
 use standard::{okx_handle, Order, Platform, utils};
@@ -177,8 +178,6 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subs
             // let write_tx_am = Arc::new(Mutex::new(write_tx));
             // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let api_key = env::var("binance_access_key").unwrap_or("".to_string());
-            // let api_secret = env::var("binance_secret_key").unwrap_or("".to_string());
             // let params = BinanceSwapLogin {
             //     api_key: account_info.binance_access_key,
             //     api_secret: account_info.binance_secret_key,
@@ -526,6 +525,7 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subs
                 let mold_clone = Arc::clone(&mold_arc);
                 loop {
                     if let Some(data) = read_rx.next().await {
+                        trace!("原始数据 data:{:?}",data);
                         match mold_clone.as_str() {
                             "depth" => {
                                 if data.data != "" {

+ 1 - 1
standard/tests/kucoin_spot_handle_test.rs

@@ -5,7 +5,7 @@ use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType};
 use standard::exchange::ExchangeEnum;
 use crate::exchange_test::test_new_exchange_wss;
 
-const SYMBOL: &str = "BLZ_USDT";
+const SYMBOL: &str = "BTC_USDT";
 
 // 测试订阅深度订阅
 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]

+ 1 - 1
strategy/src/binance_spot.rs

@@ -48,7 +48,7 @@ pub async fn reference_binance_spot_run(_bool_v1 :Arc<AtomicBool>, _quant_arc: A
 #[allow(dead_code)]
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 3 - 1
strategy/src/binance_usdt_swap.rs

@@ -55,7 +55,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -84,6 +84,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         trace_stack.on_before_format();
         // 将ticker数据转换为模拟深度
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(BinanceSwap, data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
         trace_stack.on_after_format();
 
         on_special_depth(bot_arc_clone, update_flag_u, data.label.clone(), trace_stack, special_depth).await;
@@ -91,6 +92,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         trace_stack.on_before_format();
         // 将depth数据转换为模拟深度
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
         trace_stack.on_after_format();
 
         on_special_depth(bot_arc_clone, update_flag_u, data.label.clone(), trace_stack, special_depth).await;

+ 1 - 1
strategy/src/bitget_spot.rs

@@ -84,7 +84,7 @@ pub async fn bitget_spot_run(_bool_v1 :Arc<AtomicBool>, _type_num: i8, _quant_ar
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 10 - 3
strategy/src/exchange_disguise.rs

@@ -7,6 +7,7 @@ use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::gate_swap::gate_swap_run;
+use crate::kucoin_spot::kucoin_spot_run;
 use crate::kucoin_swap::kucoin_swap_run;
 use crate::quant::Quant;
 
@@ -44,9 +45,9 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         },
-        // "kucoin_spot" => {
-        //     kucoin_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
-        // },
+        "kucoin_spot" => {
+            kucoin_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+        },
         // "bitget_spot" => {
         //     bitget_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         // },
@@ -72,3 +73,9 @@ pub async fn on_special_depth(bot_arc: Arc<Mutex<Quant>>,
 }
 
 pub async fn on_trade() {}
+
+pub async fn on_order() {}
+
+pub async fn on_position() {}
+
+pub async fn on_account() {}

+ 16 - 15
strategy/src/gate_swap.rs

@@ -14,12 +14,15 @@ use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::GateSwap;
 use crate::model::{OrderInfo, OriginalTradeGa};
 use crate::quant::Quant;
+use crate::exchange_disguise::on_special_depth;
 
 // 1交易、0参考 gate 合约 启动
-#[allow(dead_code)]
-pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
-                           quant_arc: Arc<Mutex<Quant>>, name: String,
-                           symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
+                           type_num: i8,
+                           quant_arc: Arc<Mutex<Quant>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           exchange_params: BTreeMap<String, String>) {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
@@ -47,7 +50,7 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
             ws = GateSwapWs::new_label(name.clone(), false, Some(login_param),
                                        GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
-                GateSwapSubscribeType::PuFuturesTrades,
+                // GateSwapSubscribeType::PuFuturesTrades,
                 GateSwapSubscribeType::PuFuturesOrderBook,
                 GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
                 GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
@@ -68,15 +71,17 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
 
     spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
         let run_symbol = symbols.clone()[0].clone();
         // trade
+        let mut update_flag_u = Decimal::ZERO;
         let mut max_buy = Decimal::ZERO;
         let mut min_sell = Decimal::ZERO;
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
 
         loop {
             if let Some(data) = read_rx.next().await {
                 on_data(bot_arc_clone.clone(),
+                        &mut update_flag_u,
                         multiplier,
                         run_symbol.clone(),
                         &mut max_buy,
@@ -87,15 +92,15 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
     });
 }
 
-#[allow(dead_code)]
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
+                 update_flag_u: &mut Decimal,
                  multiplier: Decimal,
                  run_symbol: String,
                  max_buy: &mut Decimal,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -104,15 +109,11 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
 
     if data.channel == "futures.order_book" {
         trace_stack.on_before_format();
-        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone());
+        trace_stack.on_before_network(depth.create_at.clone());
         trace_stack.on_after_format();
 
-        {
-            let mut quant = bot_arc_clone.lock().await;
-            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
-            quant.local_depths.insert(depth.name, depth.depth);
-        }
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
     } else if data.channel == "futures.balances" {
         let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
         {

+ 133 - 115
strategy/src/kucoin_spot.rs

@@ -1,61 +1,82 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
 use tokio::sync::Mutex;
+use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::KucoinSpot;
+use crate::exchange_disguise::on_special_depth;
+use crate::model::OriginalTradeGa;
 use crate::quant::Quant;
 
 // 1交易、0参考 kucoin 现货 启动
-#[allow(dead_code)]
-pub async fn kucoin_spot_run(_bool_v1 :Arc<AtomicBool>, _type_num: i8, _quant_arc: Arc<Mutex<Quant>>, _name: String, _symbols: Vec<String>, _exchange_params: BTreeMap<String, String>) {
-    // let (tx, mut rx) = channel(100);
-    // let symbols_clone = symbols.clone();
-    // let mut symbol_arr = Vec::new();
-    // for symbol in symbols_clone {
-    //     let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot,symbol.as_str());
-    //     let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
-    //     symbol_arr.push(new_symbol);
-    // }
-    // spawn( async move {
-    //     let mut kucoin_exc;
-    //     kucoin_exc = KucoinSpotWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
-    //     if type_num == 0 {
-    //         kucoin_exc.set_subscribe(vec![
-    //             KucoinSubscribeType::PuSpotMarketLevel2Depth50,
-    //             KucoinSubscribeType::PuMarketTicker,
-    //         ]);
-    //         kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
-    //     }
-    // });
+pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
+                             _type_num: i8,
+                             quant_arc: Arc<Mutex<Quant>>,
+                             name: String,
+                             symbols: Vec<String>,
+                             _exchange_params: BTreeMap<String, String>) {
+    let mut symbol_arr = Vec::new();
+    for symbol in symbols {
+        let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
+        let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
+        symbol_arr.push(new_symbol);
+    }
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let mut ws = KucoinSpotWs::new_label(name.clone(), false, None, KucoinSpotWsType::Public).await;
+    ws.set_symbols(symbol_arr);
+    ws.set_subscribe(vec![
+        KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
+        // KucoinSpotSubscribeType::PuMarketTicker,         // python说:订阅 ticker来的很慢
+        KucoinSpotSubscribeType::PuMarketMatch,
+    ]);
 
-    // spawn(async move {
-    //     let bot_arc_clone = Arc::clone(&quant_arc);
-    //     // let run_symbol = symbols.clone()[0].clone();
-    //     // trade
-    //     let mut max_buy = Decimal::ZERO;
-    //     let mut min_sell = Decimal::ZERO;
-    //     // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-    //     let multiplier = Decimal::ONE;
-    //     loop {
-    //         sleep(Duration::from_millis(1)).await;
-    //
-    //         match rx.try_recv() {
-    //             Ok(data) => {
-    //                 on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
-    //             },
-    //             Err(_e) => { }
-    //         }
-    //     }
-    // });
+    // 开启ws
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+    //读取
+    // let bool_v1_clone = Arc::clone(&bool_v1);
+    tokio::spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        // trade
+        let mut update_flag_u = Decimal::ZERO;
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let multiplier = Decimal::ONE;
+        // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        // let run_symbol = symbols.clone()[0].clone();
+
+        loop {
+            if let Some(data) = read_rx.next().await {
+                on_kucoin_spot_data(bot_arc_clone.clone(),
+                                    &mut update_flag_u,
+                                    multiplier,
+                                    &mut max_buy,
+                                    &mut min_sell,
+                                    data).await;
+            }
+        }
+    });
 }
 
-#[allow(dead_code)]
-async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_buy: &mut Decimal, _min_sell: &mut Decimal, data: ResponseData) {
+async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Quant>>,
+                             update_flag_u: &mut Decimal,
+                             _multiplier: Decimal,
+                             max_buy: &mut Decimal,
+                             min_sell: &mut Decimal,
+                             data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -64,80 +85,77 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_bu
 
     if data.channel == "level2" {
         trace_stack.on_before_format();
-        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot,data);
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
         trace_stack.on_after_format();
-        {
-            let mut quant = bot_arc_clone.lock().await;
-            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
-            quant.local_depths.insert(depth.name, depth.depth);
-        }
+
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
     } else if data.channel == "trade.ticker" {
-        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data);
-        {
-            let mut quant = bot_arc_clone.lock().await;
-            quant._update_ticker(ticker.ticker, ticker.name);
+        trace_stack.on_before_format();
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
+        trace_stack.on_after_format();
+
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+    }  else if data.channel == "trade.l3match" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+        if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+            *max_buy = trade.price
+        }
+        if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+            *min_sell = trade.price
         }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    } else if data.channel == "availableBalance.change" {
+        // 取消原有推送解析,因为推送的信息不准确
+        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_equity(account);
+        // }
+    } else if data.channel == "symbolOrderChange" {
+        // trace_stack.on_before_format();
+        // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+        // trace_stack.on_after_format();
+        // let mut order_infos:Vec<OrderInfo> = Vec::new();
+        // for order in orders.order {
+        //     if order.status == "NULL" {
+        //         continue;
+        //     }
+        //     let order_info = OrderInfo {
+        //         symbol: "".to_string(),
+        //         amount: order.amount.abs(),
+        //         side: "".to_string(),
+        //         price: order.price,
+        //         client_id: order.custom_id,
+        //         filled_price: order.avg_price,
+        //         filled: order.deal_amount.abs(),
+        //         order_id: order.id,
+        //         local_time: 0,
+        //         create_time: 0,
+        //         status: order.status,
+        //         fee: Default::default(),
+        //         trace_stack: Default::default(),
+        //     };
+        //     order_infos.push(order_info);
+        // }
+        //
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_order(order_infos, trace_stack);
+        // }
+    } else if data.channel == "position.change" {
+        // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_position(positions);
+        // }
     }
-    // else if data.channel == "availableBalance.change" {
-    //     // 取消原有推送解析,因为推送的信息不准确
-    //     // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_equity(account);
-    //     // }
-    // } else if data.channel == "symbolOrderChange" {
-    //     // trace_stack.on_before_format();
-    //     // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-    //     // trace_stack.on_after_format();
-    //     // let mut order_infos:Vec<OrderInfo> = Vec::new();
-    //     // for order in orders.order {
-    //     //     if order.status == "NULL" {
-    //     //         continue;
-    //     //     }
-    //     //     let order_info = OrderInfo {
-    //     //         symbol: "".to_string(),
-    //     //         amount: order.amount.abs(),
-    //     //         side: "".to_string(),
-    //     //         price: order.price,
-    //     //         client_id: order.custom_id,
-    //     //         filled_price: order.avg_price,
-    //     //         filled: order.deal_amount.abs(),
-    //     //         order_id: order.id,
-    //     //         local_time: 0,
-    //     //         create_time: 0,
-    //     //         status: order.status,
-    //     //         fee: Default::default(),
-    //     //         trace_stack: Default::default(),
-    //     //     };
-    //     //     order_infos.push(order_info);
-    //     // }
-    //     //
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_order(order_infos, trace_stack);
-    //     // }
-    // } else if data.channel == "position.change" {
-    //     // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_position(positions);
-    //     // }
-    // } else if data.channel == "match" {
-    //     // let mut quant = bot_arc_clone.lock().await;
-    //     // let str = data.label.clone();
-    //     // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-    //     //     *max_buy = Decimal::ZERO;
-    //     //     *min_sell = Decimal::ZERO;
-    //     //     quant.is_update.remove(str.as_str());
-    //     // }
-    //     // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-    //     // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-    //     //     *max_buy = trade.price
-    //     // }
-    //     // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-    //     //     *min_sell = trade.price
-    //     // }
-    //     // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    // }
 }

+ 7 - 5
strategy/src/kucoin_swap.rs

@@ -111,7 +111,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_network(data.time);
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -119,14 +119,16 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
     }
     if data.channel == "level2" {
         trace_stack.on_before_format();
-        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data.clone());
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
         trace_stack.on_after_format();
 
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
     } else if data.channel == "tickerV2" {
-        let depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data.clone());
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data.clone());
+        trace_stack.on_before_network(special_depth.create_at.clone());
 
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
     } else if data.channel == "availableBalance.change" {
         // 取消原有推送解析,因为推送的信息不准确
         // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());