Przeglądaj źródła

kucoin现货支持。

skyfffire 1 rok temu
rodzic
commit
0fb2dacfef

+ 4 - 4
strategy/src/exchange_disguise.rs

@@ -7,7 +7,7 @@ use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::gate_swap::gate_swap_run;
-// use crate::kucoin_spot::kucoin_spot_run;
+use crate::kucoin_spot::kucoin_spot_run;
 use crate::kucoin_swap::kucoin_swap_run;
 use crate::quant::Quant;
 
@@ -45,9 +45,9 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         },
-        // "kucoin_spot" => {
-        //     kucoin_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
-        // },
+        "kucoin_spot" => {
+            kucoin_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+        },
         // "bitget_spot" => {
         //     bitget_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         // },

+ 6 - 5
strategy/src/gate_swap.rs

@@ -17,10 +17,12 @@ use crate::quant::Quant;
 use crate::exchange_disguise::on_special_depth;
 
 // 1交易、0参考 gate 合约 启动
-#[allow(dead_code)]
-pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
-                           quant_arc: Arc<Mutex<Quant>>, name: String,
-                           symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
+                           type_num: i8,
+                           quant_arc: Arc<Mutex<Quant>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           exchange_params: BTreeMap<String, String>) {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
@@ -90,7 +92,6 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
     });
 }
 
-#[allow(dead_code)]
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  update_flag_u: &mut Decimal,
                  multiplier: Decimal,

+ 126 - 110
strategy/src/kucoin_spot.rs

@@ -1,59 +1,80 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
 use tokio::sync::Mutex;
+use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::KucoinSpot;
+use crate::exchange_disguise::on_special_depth;
+use crate::model::OriginalTradeGa;
 use crate::quant::Quant;
 
 // 1交易、0参考 kucoin 现货 启动
-#[allow(dead_code)]
-pub async fn kucoin_spot_run(_bool_v1 :Arc<AtomicBool>, _type_num: i8, _quant_arc: Arc<Mutex<Quant>>, _name: String, _symbols: Vec<String>, _exchange_params: BTreeMap<String, String>) {
-    // let (tx, mut rx) = channel(100);
-    // let symbols_clone = symbols.clone();
-    // let mut symbol_arr = Vec::new();
-    // for symbol in symbols_clone {
-    //     let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot,symbol.as_str());
-    //     let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
-    //     symbol_arr.push(new_symbol);
-    // }
-    // spawn( async move {
-    //     let mut kucoin_exc;
-    //     kucoin_exc = KucoinSpotWs::new_label(name, false, exchange_params, KucoinWsType::Public, tx).await;
-    //     if type_num == 0 {
-    //         kucoin_exc.set_subscribe(vec![
-    //             KucoinSubscribeType::PuSpotMarketLevel2Depth50,
-    //             KucoinSubscribeType::PuMarketTicker,
-    //         ]);
-    //         kucoin_exc.custom_subscribe(bool_v1, symbol_arr).await;
-    //     }
-    // });
+pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
+                             _type_num: i8,
+                             quant_arc: Arc<Mutex<Quant>>,
+                             name: String,
+                             symbols: Vec<String>,
+                             _exchange_params: BTreeMap<String, String>) {
+    let mut symbol_arr = Vec::new();
+    for symbol in symbols {
+        let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
+        let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
+        symbol_arr.push(new_symbol);
+    }
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let mut ws = KucoinSpotWs::new_label(name.clone(), false, None, KucoinSpotWsType::Public).await;
+    ws.set_symbols(symbol_arr);
+    ws.set_subscribe(vec![
+        KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
+        // KucoinSpotSubscribeType::PuMarketTicker,         // python说:订阅 ticker来的很慢
+        KucoinSpotSubscribeType::PuMarketMatch,
+    ]);
 
-    // spawn(async move {
-    //     let bot_arc_clone = Arc::clone(&quant_arc);
-    //     // let run_symbol = symbols.clone()[0].clone();
-    //     // trade
-    //     let mut max_buy = Decimal::ZERO;
-    //     let mut min_sell = Decimal::ZERO;
-    //     // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-    //     let multiplier = Decimal::ONE;
-    //     loop {
-    //         sleep(Duration::from_millis(1)).await;
-    //
-    //         match rx.try_recv() {
-    //             Ok(data) => {
-    //                 on_data(bot_arc_clone.clone(), multiplier, &mut max_buy, &mut min_sell, data).await;
-    //             },
-    //             Err(_e) => { }
-    //         }
-    //     }
-    // });
+    // 开启ws
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+    //读取
+    // let bool_v1_clone = Arc::clone(&bool_v1);
+    tokio::spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        // trade
+        let mut update_flag_u = Decimal::ZERO;
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let multiplier = Decimal::ONE;
+        // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        // let run_symbol = symbols.clone()[0].clone();
+
+        loop {
+            if let Some(data) = read_rx.next().await {
+                on_kucoin_spot_data(bot_arc_clone.clone(),
+                                    &mut update_flag_u,
+                                    multiplier,
+                                    &mut max_buy,
+                                    &mut min_sell,
+                                    data).await;
+            }
+        }
+    });
 }
 
-#[allow(dead_code)]
-async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_buy: &mut Decimal, _min_sell: &mut Decimal, data: ResponseData) {
+async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Quant>>,
+                             update_flag_u: &mut Decimal,
+                             _multiplier: Decimal,
+                             max_buy: &mut Decimal,
+                             min_sell: &mut Decimal,
+                             data: ResponseData) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
@@ -64,80 +85,75 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, _multiplier: Decimal, _max_bu
 
     if data.channel == "level2" {
         trace_stack.on_before_format();
-        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot,data);
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
         trace_stack.on_after_format();
-        {
-            let mut quant = bot_arc_clone.lock().await;
-            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-            quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
-            quant.local_depths.insert(depth.name, depth.depth);
-        }
+
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
     } else if data.channel == "trade.ticker" {
         let ticker = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data);
         {
             let mut quant = bot_arc_clone.lock().await;
             quant._update_ticker(ticker.ticker, ticker.name);
         }
+    }  else if data.channel == "trade.l3match" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+        if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+            *max_buy = trade.price
+        }
+        if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+            *min_sell = trade.price
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    } else if data.channel == "availableBalance.change" {
+        // 取消原有推送解析,因为推送的信息不准确
+        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_equity(account);
+        // }
+    } else if data.channel == "symbolOrderChange" {
+        // trace_stack.on_before_format();
+        // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+        // trace_stack.on_after_format();
+        // let mut order_infos:Vec<OrderInfo> = Vec::new();
+        // for order in orders.order {
+        //     if order.status == "NULL" {
+        //         continue;
+        //     }
+        //     let order_info = OrderInfo {
+        //         symbol: "".to_string(),
+        //         amount: order.amount.abs(),
+        //         side: "".to_string(),
+        //         price: order.price,
+        //         client_id: order.custom_id,
+        //         filled_price: order.avg_price,
+        //         filled: order.deal_amount.abs(),
+        //         order_id: order.id,
+        //         local_time: 0,
+        //         create_time: 0,
+        //         status: order.status,
+        //         fee: Default::default(),
+        //         trace_stack: Default::default(),
+        //     };
+        //     order_infos.push(order_info);
+        // }
+        //
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_order(order_infos, trace_stack);
+        // }
+    } else if data.channel == "position.change" {
+        // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+        // {
+        //     let mut quant = bot_arc_clone.lock().await;
+        //     quant.update_position(positions);
+        // }
     }
-    // else if data.channel == "availableBalance.change" {
-    //     // 取消原有推送解析,因为推送的信息不准确
-    //     // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_equity(account);
-    //     // }
-    // } else if data.channel == "symbolOrderChange" {
-    //     // trace_stack.on_before_format();
-    //     // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-    //     // trace_stack.on_after_format();
-    //     // let mut order_infos:Vec<OrderInfo> = Vec::new();
-    //     // for order in orders.order {
-    //     //     if order.status == "NULL" {
-    //     //         continue;
-    //     //     }
-    //     //     let order_info = OrderInfo {
-    //     //         symbol: "".to_string(),
-    //     //         amount: order.amount.abs(),
-    //     //         side: "".to_string(),
-    //     //         price: order.price,
-    //     //         client_id: order.custom_id,
-    //     //         filled_price: order.avg_price,
-    //     //         filled: order.deal_amount.abs(),
-    //     //         order_id: order.id,
-    //     //         local_time: 0,
-    //     //         create_time: 0,
-    //     //         status: order.status,
-    //     //         fee: Default::default(),
-    //     //         trace_stack: Default::default(),
-    //     //     };
-    //     //     order_infos.push(order_info);
-    //     // }
-    //     //
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_order(order_infos, trace_stack);
-    //     // }
-    // } else if data.channel == "position.change" {
-    //     // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-    //     // {
-    //     //     let mut quant = bot_arc_clone.lock().await;
-    //     //     quant.update_position(positions);
-    //     // }
-    // } else if data.channel == "match" {
-    //     // let mut quant = bot_arc_clone.lock().await;
-    //     // let str = data.label.clone();
-    //     // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-    //     //     *max_buy = Decimal::ZERO;
-    //     //     *min_sell = Decimal::ZERO;
-    //     //     quant.is_update.remove(str.as_str());
-    //     // }
-    //     // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-    //     // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-    //     //     *max_buy = trade.price
-    //     // }
-    //     // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-    //     //     *min_sell = trade.price
-    //     // }
-    //     // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    // }
 }