瀏覽代碼

交易所启动抽离

JiahengHe 2 年之前
父節點
當前提交
830520ddef

+ 116 - 0
strategy/src/binance_spot.rs

@@ -0,0 +1,116 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use tracing::{error};
+use tokio::spawn;
+use tokio::sync::mpsc::channel;
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::BinanceSpot;
+use standard::SpecialTicker;
+use crate::model::{OriginalTicker, OriginalTradeBa};
+use crate::quant::Quant;
+
+// 参考 币安 现货 启动
+pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    spawn(async move {
+        let mut ba_exc = BinanceSpotWs::new_label(name, false, exchange_params, BinanceSpotWsType::PublicAndPrivate, tx);
+        ba_exc.set_subscribe(vec![
+            BinanceSpotSubscribeType::PuAggTrade,
+            BinanceSpotSubscribeType::PuBookTicker,
+            BinanceSpotSubscribeType::PuDepth20levels100ms
+        ]);
+        ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
+    });
+
+    spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        // ticker
+        let mut update_flag_u = 0i64;
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            let mut trace_stack = TraceStack::default();
+
+            match rx.recv().await {
+                Some(data) => {
+                    trace_stack.on_network(data.time);
+                    trace_stack.on_before_quant();
+
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "aggTrade" {
+                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+                        let str = data.label.clone();
+                        let mut quant = bot_arc_clone.lock().await;
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        if trade.p > max_buy || max_buy == Decimal::ZERO{
+                            max_buy = trade.p
+                        }
+                        if trade.p < min_sell || min_sell == Decimal::ZERO{
+                            min_sell = trade.p
+                        }
+                        {
+                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                        }
+                    } else if data.channel == "bookTicker" {
+                        trace_stack.on_before_format();
+                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
+                        if ticker.u > update_flag_u {
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                // time_delay.quant_start = Utc::now().timestamp_micros();
+                                quant._update_ticker(SpecialTicker{
+                                    sell: ticker.a.clone(),
+                                    buy: ticker.b.clone(),
+                                    mid_price: Default::default(),
+                                }, data.label.clone());
+                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
+                                trace_stack.on_after_format();
+                                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
+                                quant.local_depths.insert(data.label.clone(), depth);
+                            }
+                        } else {
+                            update_flag_u = ticker.u;
+                        }
+                    } else if data.channel == "depth" {
+                        trace_stack.on_before_format();
+                        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
+                        let u = v["lastUpdateId"].as_i64().unwrap();
+                        if u > update_flag_u {
+                            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
+                            trace_stack.on_after_format();
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                                quant.local_depths.insert(depth.name, depth.depth);
+                            }
+                        } else {
+                            update_flag_u = u;
+                        }
+
+                    }
+                },
+                None => {
+                    error!("币安现货参考交易所通道错误");
+                    break;
+                }
+            }
+
+        }
+    });
+}

+ 111 - 0
strategy/src/binance_swap.rs

@@ -0,0 +1,111 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use tracing::{error};
+use tokio::spawn;
+use tokio::sync::mpsc::channel;
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::BinanceSwap;
+use standard::SpecialTicker;
+use crate::model::{OriginalTicker, OriginalTradeBa};
+use crate::quant::Quant;
+
+// 参考 币安 合约 启动
+pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    spawn( async move {
+        let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
+        ba_exc.set_subscribe(vec![
+            BinanceSubscribeType::PuBookTicker,
+            BinanceSubscribeType::PuAggTrade
+        ]);
+        ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
+    });
+    spawn(async move {
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        // ticker
+        let mut update_flag_u = 0i64;
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            let mut trace_stack = TraceStack::default();
+
+            match rx.recv().await {
+                Some(data) => {
+                    trace_stack.on_network(data.time);
+                    trace_stack.on_before_quant();
+
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "aggTrade" {
+                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+                        let str = data.label.clone();
+                        let mut quant = bot_arc_clone.lock().await;
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        if trade.p > max_buy || max_buy == Decimal::ZERO{
+                            max_buy = trade.p
+                        }
+                        if trade.p < min_sell || min_sell == Decimal::ZERO{
+                            min_sell = trade.p
+                        }
+                        {
+                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                        }
+                    } else if data.channel == "bookTicker" {
+                        trace_stack.on_before_format();
+                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
+                        if ticker.u > update_flag_u {
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant._update_ticker(SpecialTicker{
+                                    sell: ticker.a.clone(),
+                                    buy: ticker.b.clone(),
+                                    mid_price: Default::default(),
+                                }, data.label.clone());
+                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
+                                trace_stack.on_after_format();
+                                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
+                                quant.local_depths.insert(data.label.clone(), depth);
+                            }
+                        } else {
+                            update_flag_u = ticker.u;
+                        }
+                    } else if data.channel == "depth" {
+                        trace_stack.on_before_format();
+                        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
+                        let u = v["u"].as_i64().unwrap();
+                        if u > update_flag_u {
+                            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data);
+                            trace_stack.on_after_format();
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                                quant.local_depths.insert(depth.name, depth.depth);
+                            }
+                        } else {
+                            update_flag_u = u;
+                        }
+                    }
+                },
+                None => {
+                    error!("参考交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}

+ 136 - 0
strategy/src/bitget_spot.rs

@@ -0,0 +1,136 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use tracing::{error};
+use tokio::spawn;
+use tokio::sync::mpsc::channel;
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::BitgetSpot;
+use crate::model::{OrderInfo, OriginalTradeGa};
+use crate::quant::Quant;
+
+pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+    let (tx, mut rx) = channel(100);
+    let symbols_1 = symbols.clone();
+    spawn( async move {
+        let mut bit_exc_public = BitgetSpotWs::new_label(name.clone(), false, exchange_params.clone(), BitgetWsType::Public, tx.clone());
+        let mut bit_exc_private = BitgetSpotWs::new_label(name, false, exchange_params, BitgetWsType::Private, tx);
+        // 交易
+        if type_num == 1 {
+            bit_exc_private.set_subscribe(vec![
+                BitgetSubscribeType::PrAccount,
+                BitgetSubscribeType::PrOrders
+            ]);
+            bit_exc_public.set_subscribe(vec![
+                BitgetSubscribeType::PuTrade,
+                BitgetSubscribeType::PuBooks5
+            ]);
+        } else { // 参考
+            bit_exc_public.set_subscribe(vec![
+                BitgetSubscribeType::PuTrade,
+                BitgetSubscribeType::PuBooks5
+            ]);
+        }
+        bit_exc_public.custom_subscribe(bool_v1.clone(), symbols_1.clone()).await;
+        bit_exc_private.custom_subscribe(bool_v1, symbols_1).await;
+    });
+    spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let run_symbol = symbols.clone()[0].clone();
+
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            let mut trace_stack = TraceStack::default();
+
+            match rx.recv().await {
+                Some(data) => {
+                    trace_stack.on_network(data.time);
+                    trace_stack.on_before_quant();
+
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "books5" {
+                        trace_stack.on_before_format();
+                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSpot, data);
+                        trace_stack.on_after_format();
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    } else if data.channel == "trade" {
+                        let mut quant = bot_arc_clone.lock().await;
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+                        for trade in trades {
+                            if trade.price > max_buy || max_buy == Decimal::ZERO{
+                                max_buy = trade.price
+                            }
+                            if trade.price < min_sell || min_sell == Decimal::ZERO{
+                                min_sell = trade.price
+                            }
+                        }
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                    } else if data.channel == "orders" {
+                        trace_stack.on_before_format();
+                        let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSpot, data.clone(), multiplier);
+                        trace_stack.on_after_format();
+                        let mut order_infos:Vec<OrderInfo> = Vec::new();
+                        for order in orders.order {
+                            if order.status == "NULL" {
+                                continue;
+                            }
+                            let order_info = OrderInfo {
+                                symbol: "".to_string(),
+                                amount: order.amount.abs(),
+                                side: "".to_string(),
+                                price: order.price,
+                                client_id: order.custom_id,
+                                filled_price: order.avg_price,
+                                filled: order.deal_amount.abs(),
+                                order_id: order.id,
+                                local_time: 0,
+                                create_time: 0,
+                                status: order.status,
+                                fee: Default::default(),
+                                trace_stack: Default::default(),
+                            };
+                            order_infos.push(order_info);
+                        }
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_order(order_infos, trace_stack);
+                        }
+                    } else if data.channel == "account" { // _update_account
+                        // TODO: 此处应有两个余额,交易币和本位币余额(由于quant中现货的更新账户没有做任何操作,所以此处可暂不处理)
+                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSpot, data, run_symbol.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_equity(account);
+                        }
+                    }
+                },
+                None => {
+                    error!("参考交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}

+ 6 - 608
strategy/src/exchange_disguise.rs

@@ -1,24 +1,13 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use rust_decimal::Decimal;
-use serde_json::Value;
-use tokio::spawn;
-use tokio::sync::mpsc::{channel};
 use tokio::sync::Mutex;
-use tokio::time::sleep;
-use tracing::{error, info};
-use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
-use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
-use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
-use exchanges::gate_swap_rest::GateSwapRest;
-use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
-use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSwap, KucoinSwap};
-use standard::SpecialTicker;
-use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
+use tracing::{error};
+use crate::binance_spot::reference_binance_spot_run;
+use crate::binance_swap::reference_binance_swap_run;
+use crate::bitget_spot::bitget_spot_run;
+use crate::gate_swap::gate_swap_run;
+use crate::kucoin_swap::kucoin_swap_run;
 use crate::quant::Quant;
 
 // 交易交易所启动
@@ -63,594 +52,3 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
     }
 }
 
-// 1交易、0参考 gate 合约 启动
-async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
-    let (tx, mut rx) = channel(100);
-    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
-    let mut user_id= "".to_string();
-    let symbols_one = symbols.clone();
-
-    // 交易
-    if type_num == 1{
-        // 获取user_id
-        let res_data = gate_exc.wallet_fee().await;
-        assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
-
-        let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
-        info!(?wallet_obj);
-        user_id = wallet_obj["user_id"].to_string();
-    }
-
-    spawn( async move {
-        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
-                                                 GateWsType::PublicAndPrivate("usdt".to_string()), tx);
-        // 交易
-        if type_num == 1 {
-            gate_exc.set_subscribe(vec![
-                GateSubscribeType::PuFuturesTrades,
-                GateSubscribeType::PuFuturesOrderBook,
-                GateSubscribeType::PrFuturesOrders(user_id.clone()),
-                GateSubscribeType::PrFuturesPositions(user_id.clone()),
-                GateSubscribeType::PrFuturesBalances(user_id.clone()),
-            ]);
-        } else { // 参考
-            gate_exc.set_subscribe(vec![
-                GateSubscribeType::PuFuturesTrades,
-                GateSubscribeType::PuFuturesOrderBook
-            ]);
-        }
-        gate_exc.custom_subscribe(bool_v1,symbols_one).await;
-    });
-    spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
-        let run_symbol = symbols.clone()[0].clone();
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        loop {
-            sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
-
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "futures.order_book" {
-                        trace_stack.on_before_format();
-                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
-                        trace_stack.on_after_format();
-
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "futures.balances" {
-                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_equity(account);
-                        }
-                    } else if data.channel == "futures.orders" {
-                        trace_stack.on_before_format();
-                        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
-                        trace_stack.on_after_format();
-
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            let order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                                trace_stack: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
-
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos, trace_stack);
-                        }
-                    } else if data.channel == "futures.positions" {
-                        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_position(positions);
-                        }
-                    } else if data.channel == "futures.trades" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-                        for trade in trades {
-                            if trade.price > max_buy || max_buy == Decimal::ZERO{
-                                max_buy = trade.price
-                            }
-                            if trade.price < min_sell || min_sell == Decimal::ZERO{
-                                min_sell = trade.price
-                            }
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    }
-                },
-                None => {
-                    error!("交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}
-
-// 1交易、0参考 kucoin 合约 启动
-async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
-    let (tx, mut rx) = channel(100);
-    let symbols_clone = symbols.clone();
-    let mut symbol_arr = Vec::new();
-    for symbol in symbols_clone{
-        let new_symbol = symbol.replace("_", "").to_uppercase() + "M";
-        symbol_arr.push(new_symbol);
-    }
-    spawn( async move {
-        let mut kucoin_exc;
-        // 交易
-        if type_num == 1 {
-            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Private, tx).await;
-            kucoin_exc.set_subscribe(vec![
-                KucoinSubscribeType::PuContractMarketLevel2Depth50,
-                // KucoinSubscribeType::PuContractMarkettickerV2,
-                KucoinSubscribeType::PrContractAccountWallet,
-                KucoinSubscribeType::PrContractPosition,
-                KucoinSubscribeType::PrContractMarketTradeOrders
-            ]);
-        } else { // 参考
-            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
-            kucoin_exc.set_subscribe(vec![
-                KucoinSubscribeType::PuContractMarketLevel2Depth50,
-                // TODO: python注释掉了
-                // KucoinSubscribeType::PuContractMarkettickerV2,
-                KucoinSubscribeType::PuContractMarketExecution
-            ]);
-        }
-        kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
-    });
-    spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        let run_symbol = symbols.clone()[0].clone();
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-        loop {
-            sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
-
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "level2" {
-                        trace_stack.on_before_format();
-                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
-                        trace_stack.on_after_format();
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            // time_delay.quant_start = Utc::now().timestamp_micros();
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "tickerV2" {
-                        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(ticker.ticker, ticker.name);
-                        }
-                    } else if data.channel == "availableBalance.change" {
-
-                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_equity(account);
-                        }
-                    } else if data.channel == "symbolOrderChange" {
-                        trace_stack.on_before_format();
-                        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-                        trace_stack.on_after_format();
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            if order.status == "NULL" {
-                                continue;
-                            }
-                            let order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                                trace_stack: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
-
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos, trace_stack);
-                        }
-                    } else if data.channel == "position.change" {
-                        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_position(positions);
-                        }
-                    } else if data.channel == "match" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-                        if trade.price > max_buy || max_buy == Decimal::ZERO {
-                            max_buy = trade.price
-                        }
-                        if trade.price < min_sell || min_sell == Decimal::ZERO {
-                            min_sell = trade.price
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    }
-                },
-                None => {
-                    error!("交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}
-
-// 参考 币安 现货 启动
-async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
-    let (tx, mut rx) = channel(100);
-    spawn(async move {
-        let mut ba_exc = BinanceSpotWs::new_label(name, false, exchange_params, BinanceSpotWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![
-            BinanceSpotSubscribeType::PuAggTrade,
-            BinanceSpotSubscribeType::PuBookTicker,
-            BinanceSpotSubscribeType::PuDepth20levels100ms
-        ]);
-        ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
-    });
-
-    spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        // ticker
-        let mut update_flag_u = 0i64;
-        loop {
-            sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
-
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "aggTrade" {
-                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-                        let str = data.label.clone();
-                        let mut quant = bot_arc_clone.lock().await;
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        if trade.p > max_buy || max_buy == Decimal::ZERO{
-                            max_buy = trade.p
-                        }
-                        if trade.p < min_sell || min_sell == Decimal::ZERO{
-                            min_sell = trade.p
-                        }
-                        {
-                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                        }
-                    } else if data.channel == "bookTicker" {
-                        trace_stack.on_before_format();
-                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-                        if ticker.u > update_flag_u {
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                // time_delay.quant_start = Utc::now().timestamp_micros();
-                                quant._update_ticker(SpecialTicker{
-                                    sell: ticker.a.clone(),
-                                    buy: ticker.b.clone(),
-                                    mid_price: Default::default(),
-                                }, data.label.clone());
-                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-                                trace_stack.on_after_format();
-                                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
-                                quant.local_depths.insert(data.label.clone(), depth);
-                            }
-                        } else {
-                            update_flag_u = ticker.u;
-                        }
-                    } else if data.channel == "depth" {
-                        trace_stack.on_before_format();
-                        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
-                        let u = v["lastUpdateId"].as_i64().unwrap();
-                        if u > update_flag_u {
-                            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
-                            trace_stack.on_after_format();
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                                quant.local_depths.insert(depth.name, depth.depth);
-                            }
-                        } else {
-                            update_flag_u = u;
-                        }
-
-                    }
-                },
-                None => {
-                    error!("币安现货参考交易所通道错误");
-                    break;
-                }
-            }
-
-        }
-    });
-}
-
-
-// 参考 币安 合约 启动
-async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
-    let (tx, mut rx) = channel(100);
-    spawn( async move {
-        let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![
-            BinanceSubscribeType::PuBookTicker,
-            BinanceSubscribeType::PuAggTrade
-        ]);
-        ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
-    });
-    spawn(async move {
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        // ticker
-        let mut update_flag_u = 0i64;
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        loop {
-            sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
-
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "aggTrade" {
-                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-                        let str = data.label.clone();
-                        let mut quant = bot_arc_clone.lock().await;
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        if trade.p > max_buy || max_buy == Decimal::ZERO{
-                            max_buy = trade.p
-                        }
-                        if trade.p < min_sell || min_sell == Decimal::ZERO{
-                            min_sell = trade.p
-                        }
-                        {
-                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                        }
-                    } else if data.channel == "bookTicker" {
-                        trace_stack.on_before_format();
-                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-                        if ticker.u > update_flag_u {
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                quant._update_ticker(SpecialTicker{
-                                    sell: ticker.a.clone(),
-                                    buy: ticker.b.clone(),
-                                    mid_price: Default::default(),
-                                }, data.label.clone());
-                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-                                trace_stack.on_after_format();
-                                quant._update_depth(depth.clone(), data.label.clone(), &mut trace_stack);
-                                quant.local_depths.insert(data.label.clone(), depth);
-                            }
-                        } else {
-                            update_flag_u = ticker.u;
-                        }
-                    } else if data.channel == "depth" {
-                        trace_stack.on_before_format();
-                        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
-                        let u = v["u"].as_i64().unwrap();
-                        if u > update_flag_u {
-                            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data);
-                            trace_stack.on_after_format();
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                                quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                                quant.local_depths.insert(depth.name, depth.depth);
-                            }
-                        } else {
-                            update_flag_u = u;
-                        }
-                    }
-                },
-                None => {
-                    error!("参考交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}
-
-async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
-    let (tx, mut rx) = channel(100);
-    let symbols_1 = symbols.clone();
-    spawn( async move {
-        let mut bit_exc_public = BitgetSpotWs::new_label(name.clone(), false, exchange_params.clone(), BitgetWsType::Public, tx.clone());
-        let mut bit_exc_private = BitgetSpotWs::new_label(name, false, exchange_params, BitgetWsType::Private, tx);
-        // 交易
-        if type_num == 1 {
-            bit_exc_private.set_subscribe(vec![
-                BitgetSubscribeType::PrAccount,
-                BitgetSubscribeType::PrOrders
-            ]);
-            bit_exc_public.set_subscribe(vec![
-                BitgetSubscribeType::PuTrade,
-                BitgetSubscribeType::PuBooks5
-            ]);
-        } else { // 参考
-            bit_exc_public.set_subscribe(vec![
-                BitgetSubscribeType::PuTrade,
-                BitgetSubscribeType::PuBooks5
-            ]);
-        }
-        bit_exc_public.custom_subscribe(bool_v1.clone(), symbols_1.clone()).await;
-        bit_exc_private.custom_subscribe(bool_v1, symbols_1).await;
-    });
-    spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        let run_symbol = symbols.clone()[0].clone();
-
-        loop {
-            sleep(Duration::from_millis(1)).await;
-            let mut trace_stack = TraceStack::default();
-
-            match rx.recv().await {
-                Some(data) => {
-                    trace_stack.on_network(data.time);
-                    trace_stack.on_before_quant();
-
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "books5" {
-                        trace_stack.on_before_format();
-                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSpot, data);
-                        trace_stack.on_after_format();
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "trade" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-                        for trade in trades {
-                            if trade.price > max_buy || max_buy == Decimal::ZERO{
-                                max_buy = trade.price
-                            }
-                            if trade.price < min_sell || min_sell == Decimal::ZERO{
-                                min_sell = trade.price
-                            }
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    } else if data.channel == "orders" {
-                        trace_stack.on_before_format();
-                        let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSpot, data.clone(), multiplier);
-                        trace_stack.on_after_format();
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            if order.status == "NULL" {
-                                continue;
-                            }
-                            let order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                                trace_stack: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos, trace_stack);
-                        }
-                    } else if data.channel == "account" { // _update_account
-                        // TODO: 此处应有两个余额,交易币和本位币余额(由于quant中现货的更新账户没有做任何操作,所以此处可暂不处理)
-                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSpot, data, run_symbol.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_equity(account);
-                        }
-                    }
-                },
-                None => {
-                    error!("参考交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}

+ 155 - 0
strategy/src/gate_swap.rs

@@ -0,0 +1,155 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use tracing::{error, info};
+use tokio::spawn;
+use tokio::sync::mpsc::channel;
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::GateSwap;
+use crate::model::{OrderInfo, OriginalTradeGa};
+use crate::quant::Quant;
+
+// 1交易、0参考 gate 合约 启动
+pub async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    let mut user_id= "".to_string();
+    let symbols_one = symbols.clone();
+
+    // 交易
+    if type_num == 1{
+        // 获取user_id
+        let res_data = gate_exc.wallet_fee().await;
+        assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
+
+        let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+        info!(?wallet_obj);
+        user_id = wallet_obj["user_id"].to_string();
+    }
+
+    spawn( async move {
+        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
+                                                 GateWsType::PublicAndPrivate("usdt".to_string()), tx);
+        // 交易
+        if type_num == 1 {
+            gate_exc.set_subscribe(vec![
+                GateSubscribeType::PuFuturesTrades,
+                GateSubscribeType::PuFuturesOrderBook,
+                GateSubscribeType::PrFuturesOrders(user_id.clone()),
+                GateSubscribeType::PrFuturesPositions(user_id.clone()),
+                GateSubscribeType::PrFuturesBalances(user_id.clone()),
+            ]);
+        } else { // 参考
+            gate_exc.set_subscribe(vec![
+                GateSubscribeType::PuFuturesTrades,
+                GateSubscribeType::PuFuturesOrderBook
+            ]);
+        }
+        gate_exc.custom_subscribe(bool_v1,symbols_one).await;
+    });
+    spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            let mut trace_stack = TraceStack::default();
+
+            match rx.recv().await {
+                Some(data) => {
+                    trace_stack.on_network(data.time);
+                    trace_stack.on_before_quant();
+
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "futures.order_book" {
+                        trace_stack.on_before_format();
+                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
+                        trace_stack.on_after_format();
+
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    } else if data.channel == "futures.balances" {
+                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_equity(account);
+                        }
+                    } else if data.channel == "futures.orders" {
+                        trace_stack.on_before_format();
+                        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
+                        trace_stack.on_after_format();
+
+                        let mut order_infos:Vec<OrderInfo> = Vec::new();
+                        for order in orders.order {
+                            let order_info = OrderInfo {
+                                symbol: "".to_string(),
+                                amount: order.amount.abs(),
+                                side: "".to_string(),
+                                price: order.price,
+                                client_id: order.custom_id,
+                                filled_price: order.avg_price,
+                                filled: order.deal_amount.abs(),
+                                order_id: order.id,
+                                local_time: 0,
+                                create_time: 0,
+                                status: order.status,
+                                fee: Default::default(),
+                                trace_stack: Default::default(),
+                            };
+                            order_infos.push(order_info);
+                        }
+
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_order(order_infos, trace_stack);
+                        }
+                    } else if data.channel == "futures.positions" {
+                        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_position(positions);
+                        }
+                    } else if data.channel == "futures.trades" {
+                        let mut quant = bot_arc_clone.lock().await;
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+                        for trade in trades {
+                            if trade.price > max_buy || max_buy == Decimal::ZERO{
+                                max_buy = trade.price
+                            }
+                            if trade.price < min_sell || min_sell == Decimal::ZERO{
+                                min_sell = trade.price
+                            }
+                        }
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                    }
+                },
+                None => {
+                    error!("交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}

+ 154 - 0
strategy/src/kucoin_swap.rs

@@ -0,0 +1,154 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use tracing::{error};
+use tokio::spawn;
+use tokio::sync::mpsc::channel;
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::KucoinSwap;
+use crate::model::{OrderInfo, OriginalTradeGa};
+use crate::quant::Quant;
+
+// 1交易、0参考 kucoin 合约 启动
+pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    let symbols_clone = symbols.clone();
+    let mut symbol_arr = Vec::new();
+    for symbol in symbols_clone{
+        let new_symbol = symbol.replace("_", "").to_uppercase() + "M";
+        symbol_arr.push(new_symbol);
+    }
+    spawn( async move {
+        let mut kucoin_exc;
+        // 交易
+        if type_num == 1 {
+            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Private, tx).await;
+            kucoin_exc.set_subscribe(vec![
+                KucoinSubscribeType::PuContractMarketLevel2Depth50,
+                // KucoinSubscribeType::PuContractMarkettickerV2,
+                KucoinSubscribeType::PrContractAccountWallet,
+                KucoinSubscribeType::PrContractPosition,
+                KucoinSubscribeType::PrContractMarketTradeOrders
+            ]);
+        } else { // 参考
+            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
+            kucoin_exc.set_subscribe(vec![
+                KucoinSubscribeType::PuContractMarketLevel2Depth50,
+                // TODO: python注释掉了
+                // KucoinSubscribeType::PuContractMarkettickerV2,
+                KucoinSubscribeType::PuContractMarketExecution
+            ]);
+        }
+        kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
+    });
+    spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            let mut trace_stack = TraceStack::default();
+
+            match rx.recv().await {
+                Some(data) => {
+                    trace_stack.on_network(data.time);
+                    trace_stack.on_before_quant();
+
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "level2" {
+                        trace_stack.on_before_format();
+                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
+                        trace_stack.on_after_format();
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            // time_delay.quant_start = Utc::now().timestamp_micros();
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut trace_stack);
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    } else if data.channel == "tickerV2" {
+                        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(ticker.ticker, ticker.name);
+                        }
+                    } else if data.channel == "availableBalance.change" {
+
+                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_equity(account);
+                        }
+                    } else if data.channel == "symbolOrderChange" {
+                        trace_stack.on_before_format();
+                        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+                        trace_stack.on_after_format();
+                        let mut order_infos:Vec<OrderInfo> = Vec::new();
+                        for order in orders.order {
+                            if order.status == "NULL" {
+                                continue;
+                            }
+                            let order_info = OrderInfo {
+                                symbol: "".to_string(),
+                                amount: order.amount.abs(),
+                                side: "".to_string(),
+                                price: order.price,
+                                client_id: order.custom_id,
+                                filled_price: order.avg_price,
+                                filled: order.deal_amount.abs(),
+                                order_id: order.id,
+                                local_time: 0,
+                                create_time: 0,
+                                status: order.status,
+                                fee: Default::default(),
+                                trace_stack: Default::default(),
+                            };
+                            order_infos.push(order_info);
+                        }
+
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_order(order_infos, trace_stack);
+                        }
+                    } else if data.channel == "position.change" {
+                        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_position(positions);
+                        }
+                    } else if data.channel == "match" {
+                        let mut quant = bot_arc_clone.lock().await;
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+                        if trade.price > max_buy || max_buy == Decimal::ZERO {
+                            max_buy = trade.price
+                        }
+                        if trade.price < min_sell || min_sell == Decimal::ZERO {
+                            min_sell = trade.price
+                        }
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                    }
+                },
+                None => {
+                    error!("交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}

+ 6 - 1
strategy/src/lib.rs

@@ -4,4 +4,9 @@ mod strategy;
 mod predictor;
 mod utils;
 pub mod exchange_disguise;
-mod gp_predictor;
+mod gp_predictor;
+mod binance_swap;
+mod binance_spot;
+mod gate_swap;
+mod kucoin_swap;
+mod bitget_spot;