Browse Source

kucoin 对接

JiahengHe 2 years ago
parent
commit
4221945fef
2 changed files with 138 additions and 16 deletions
  1. 135 6
      strategy/src/exchange_disguise.rs
  2. 3 10
      strategy/src/quant.rs

+ 135 - 6
strategy/src/exchange_disguise.rs

@@ -4,6 +4,7 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
 use serde_json::Value;
 use tokio::sync::mpsc::{channel};
 use tokio::sync::Mutex;
@@ -12,7 +13,8 @@ use tracing::{error, info};
 use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
-use standard::exchange::ExchangeEnum::{BinanceSwap, GateSwap};
+use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
+use standard::exchange::ExchangeEnum::{BinanceSwap, GateSwap, KucoinSwap};
 use standard::SpecialTicker;
 use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
 use crate::quant::Quant;
@@ -21,7 +23,10 @@ use crate::quant::Quant;
 pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     match exchange_name.as_str() {
         "gate_usdt_swap" => {
-            transactional_gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
+            gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
+        }
+        "kucoin_usdt_swap" => {
+            kucoin_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
         }
         _ => {
             panic!("参数错误!")
@@ -36,7 +41,10 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
             reference_binance_swap_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
         }
         "gate_usdt_swap" => {
-            transactional_gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+            gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+        }
+        "kucoin_usdt_swap" => {
+            kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         }
         _ => {
             panic!("参数错误!")
@@ -44,8 +52,8 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
     }
 }
 
-// 交易 gate 合约 启动
-async fn transactional_gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+// 1交易、0参考 gate 合约 启动
+async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     let (tx, mut rx) = channel(100);
     let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
     // 获取user_id
@@ -168,7 +176,128 @@ async fn transactional_gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, qua
                     }
                 },
                 None => {
-                    error!("交易交易所通道错误");
+                    error!("交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}
+
+// 1交易、0参考 kucoin 合约 启动
+async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    let symbols_clone = symbols.clone();
+    tokio::spawn( async move {
+        let mut kucoin_exc;
+        // 交易
+        if type_num == 1 {
+            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Private, tx).await;
+            kucoin_exc.set_subscribe(vec![
+                KucoinSubscribeType::PuContractMarketLevel2Depth50,
+                // KucoinSubscribeType::PuContractMarkettickerV2,
+                KucoinSubscribeType::PrContractAccountWallet,
+                KucoinSubscribeType::PrContractPosition,
+                KucoinSubscribeType::PrContractMarketTradeOrders
+            ]);
+        } else { // 参考
+            kucoin_exc = KucoinSwapWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
+            kucoin_exc.set_subscribe(vec![
+                KucoinSubscribeType::PuContractMarketLevel2Depth50,
+                // TODO: python注释掉了
+                // KucoinSubscribeType::PuContractMarkettickerV2,
+                KucoinSubscribeType::PuContractMarketExecution
+            ]);
+        }
+        kucoin_exc.custom_subscribe(bool_v1, symbols_clone).await;
+    });
+    tokio::spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            match rx.recv().await {
+                Some(data) => {
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "level2" {
+                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone());
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    } else if data.channel == "tickerV2" {
+                        let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(ticker.ticker, ticker.name);
+                        }
+                    } else if data.channel == "availableBalance.change" {
+
+                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_equity(account);
+                        }
+                    } else if data.channel == "symbolOrderChange" {
+                        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+                        let mut order_infos:Vec<OrderInfo> = Vec::new();
+                        for order in orders.order {
+                            let order_info = OrderInfo {
+                                symbol: "".to_string(),
+                                amount: order.amount.abs(),
+                                side: "".to_string(),
+                                price: order.price,
+                                client_id: order.custom_id,
+                                filled_price: order.avg_price,
+                                filled: order.deal_amount.abs(),
+                                order_id: order.id,
+                                local_time: 0,
+                                create_time: 0,
+                                status: order.status,
+                                fee: Default::default(),
+                            };
+                            order_infos.push(order_info);
+                        }
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_order(order_infos);
+                        }
+                    } else if data.channel == "position.change" {
+                        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_position(positions);
+                        }
+                    } else if data.channel == "match" {
+                        let mut quant = bot_arc_clone.lock().await;
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+                        for trade in trades {
+                            if trade.price > max_buy || max_buy == Decimal::ZERO{
+                                max_buy = trade.price
+                            }
+                            if trade.price < min_sell || min_sell == Decimal::ZERO{
+                                min_sell = trade.price
+                            }
+                        }
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                    }
+                },
+                None => {
+                    error!("交易所通道错误");
                     break;
                 }
             }

+ 3 - 10
strategy/src/quant.rs

@@ -4,7 +4,7 @@ use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::time::Duration;
-use chrono::{Timelike, Utc};
+use chrono::{Utc};
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{ToPrimitive, Zero};
 use rust_decimal_macros::dec;
@@ -764,15 +764,9 @@ impl Quant {
     }
 
     pub async fn get_exchange_info(&mut self,) {
-        match self.platform_rest.get_market().await {
-            Ok(val) => {
-                self.market = val
-            },
-            Err(e) => {
-                info!("获取市场信息错误: {:?}", e);
-            }
-        }
+        self.market = self.platform_rest.get_self_market();
     }
+
     pub fn update_equity(&mut self, data: Account) {
         /*
            更新保证金信息
@@ -1286,7 +1280,6 @@ mod tests {
     use tracing::info;
     use standard::Order;
 
-    use crate::params::Params;
     use crate::quant::Quant;
 
     #[tokio::test]