Bläddra i källkod

修改Binance、Bitget、Kucoin订阅测试
修改BinanceSpot、KucoinSpot订阅数据处理

gepangpang 1 år sedan
förälder
incheckning
1d0a448fbe

+ 2 - 1
standard/Cargo.toml

@@ -18,4 +18,5 @@ chrono = "0.4.30"
 futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-toml = "0.5.11"
+toml = "0.5.11"
+futures-channel = "0.3.29"

+ 50 - 0
standard/src/binance_spot_handle.rs

@@ -0,0 +1,50 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use exchanges::response_base::ResponseData;
+use crate::{MarketOrder, SpecialDepth, SpecialTicker};
+use crate::exchange::ExchangeEnum;
+use crate::handle_info::HandleSwapInfo;
+
+
+// 处理特殊Ticker信息
+pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
+    let res_data_str = res_data.data;
+    let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
+    format_special_ticker(res_data_json, res_data.label)
+}
+
+pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+    let bp = Decimal::from_str(data["b"].as_str().unwrap()).unwrap();
+    let bq = Decimal::from_str(data["B"].as_str().unwrap()).unwrap();
+    let ap = Decimal::from_str(data["a"].as_str().unwrap()).unwrap();
+    let aq = Decimal::from_str(data["A"].as_str().unwrap()).unwrap();
+    let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_str(&data["u"].to_string()).unwrap();
+
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
+    let depth_info = vec![bp, bq, ap, aq];
+    SpecialDepth {
+        name: label,
+        depth: depth_info,
+        ticker: ticker_info,
+        t,
+    }
+}
+
+// 处理特殊深度数据
+pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
+    HandleSwapInfo::handle_special_depth(ExchangeEnum::BinanceSpot, res_data)
+}
+
+// 格式化深度信息
+pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+    let mut depth_items: Vec<MarketOrder> = vec![];
+    for value in value.as_array().unwrap() {
+        depth_items.push(MarketOrder {
+            price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
+            amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}

+ 3 - 3
standard/src/handle_info.rs

@@ -139,7 +139,7 @@ impl HandleSwapInfo {
             ExchangeEnum::BinanceSpot => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
-                t = Decimal::from_str(&res_data_json["u"].to_string()).unwrap();
+                t = Decimal::from_str(&res_data_json["lastUpdateId"].to_string()).unwrap();
             }
             ExchangeEnum::BinanceSwap => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
@@ -159,7 +159,7 @@ impl HandleSwapInfo {
             ExchangeEnum::KucoinSpot => {
                 depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
-                t = Decimal::from_str(&res_data_json["sequence"].to_string()).unwrap();
+                t = Decimal::from_str(&res_data_json["timestamp"].to_string()).unwrap();
             }
             ExchangeEnum::OkxSwap => {
                 depth_asks = okx_handle::format_depth_items(res_data_json[0]["asks"].clone());
@@ -169,7 +169,7 @@ impl HandleSwapInfo {
             ExchangeEnum::BitgetSpot => {
                 depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
                 depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
-                t = Decimal::from_str(res_data_json["ts"].as_str().unwrap()).unwrap();
+                t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
             }
             _ => {
                 error!("未找到该交易所!handle_special_depth: {:?}",exchange);

+ 1 - 1
standard/src/kucoin_spot_handle.rs

@@ -120,7 +120,7 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(data["bestAsk"].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(data["bestAskSize"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
-    let t = Decimal::from_str(&data["sequence"].to_string()).unwrap();
+    let t = Decimal::from_str(data["sequence"].as_str().unwrap()).unwrap();
 
     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];

+ 1 - 0
standard/src/lib.rs

@@ -17,6 +17,7 @@ pub mod handle_info;
 mod binance_swap;
 mod binance_spot;
 pub mod binance_handle;
+pub mod binance_spot_handle;
 // 引入gate模块
 mod gate_swap;
 mod gate_spot;

+ 31 - 0
standard/tests/binance_handle_test.rs

@@ -0,0 +1,31 @@
+mod exchange_test;
+use tracing::instrument;
+use exchanges::binance_swap_ws::BinanceSwapSubscribeType;
+use standard::exchange::ExchangeEnum;
+use crate::exchange_test::test_new_exchange_wss;
+
+const SYMBOL: &str = "BTC_USDT";
+
+// 测试订阅深度信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_depth() {
+    global::log_utils::init_log_with_trace();
+
+    let binance_subscribe_type = vec![
+        BinanceSwapSubscribeType::PuDepth20levels100ms,
+    ];
+    test_new_exchange_wss(ExchangeEnum::BinanceSwap, SYMBOL, binance_subscribe_type, "depth").await;
+}
+
+// 测试订阅Ticker信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[instrument(level = "TRACE")]
+async fn test_get_wss_ticker() {
+    global::log_utils::init_log_with_trace();
+
+    let binance_subscribe_type = vec![
+        BinanceSwapSubscribeType::PuBookTicker,
+    ];
+    test_new_exchange_wss(ExchangeEnum::BinanceSwap, SYMBOL, binance_subscribe_type, "ticker").await;
+}

+ 1 - 1
standard/tests/binance_spot_handle_test.rs

@@ -4,7 +4,7 @@ use exchanges::binance_spot_ws::BinanceSpotSubscribeType;
 use standard::exchange::ExchangeEnum;
 use crate::exchange_test::test_new_exchange_wss;
 
-const SYMBOL: &str = "BLZ_USDT";
+const SYMBOL: &str = "BTC_USDT";
 
 // 测试订阅深度信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]

+ 5 - 5
standard/tests/bitget_spot_handle_test.rs

@@ -1,7 +1,7 @@
 mod exchange_test;
 
 use tracing::{instrument};
-use exchanges::bitget_spot_ws::{BitgetSubscribeType};
+use exchanges::bitget_spot_ws::{BitgetSpotSubscribeType};
 use standard::exchange::ExchangeEnum;
 use crate::exchange_test::test_new_exchange_wss;
 
@@ -14,7 +14,7 @@ async fn test_get_wss_depth() {
     global::log_utils::init_log_with_trace();
 
     let bitget_subscribe_type = vec![
-        BitgetSubscribeType::PuBooks5
+        BitgetSpotSubscribeType::PuBooks5
     ];
     test_new_exchange_wss(ExchangeEnum::BitgetSpot, SYMBOL, bitget_subscribe_type, "depth").await;
 }
@@ -26,7 +26,7 @@ async fn test_get_wss_ticker() {
     global::log_utils::init_log_with_trace();
 
     let bitget_subscribe_type = vec![
-        BitgetSubscribeType::PuTicker
+        BitgetSpotSubscribeType::PuTicker
     ];
     test_new_exchange_wss(ExchangeEnum::BitgetSpot, SYMBOL, bitget_subscribe_type, "ticker").await;
 }
@@ -38,7 +38,7 @@ async fn test_get_wss_account() {
     global::log_utils::init_log_with_trace();
 
     let bitget_subscribe_type = vec![
-        BitgetSubscribeType::PrAccount
+        BitgetSpotSubscribeType::PrAccount
     ];
     test_new_exchange_wss(ExchangeEnum::BitgetSpot, SYMBOL, bitget_subscribe_type, "account").await;
 }
@@ -50,7 +50,7 @@ async fn test_get_wss_orders() {
     global::log_utils::init_log_with_trace();
 
     let bitget_subscribe_type = vec![
-        BitgetSubscribeType::PrOrders
+        BitgetSpotSubscribeType::PrOrders
     ];
     test_new_exchange_wss(ExchangeEnum::BitgetSpot, SYMBOL, bitget_subscribe_type, "orders").await;
 }

+ 226 - 161
standard/tests/exchange_test.rs

@@ -1,28 +1,27 @@
 use std::collections::{BTreeMap};
-// use std::{env};
 use std::io::{Error};
-// use std::sync::Arc;
-// use std::sync::atomic::AtomicBool;
-// use std::time::Duration;
-// use rust_decimal_macros::dec;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures::StreamExt;
+use rust_decimal_macros::dec;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
-// use tokio::try_join;
-// use tracing::{error, trace};
-use tracing::{error};
-use exchanges::kucoin_spot_ws::KucoinSpotSubscribeType;
-// use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
-// use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSpotSubscribeType, KucoinSpotWsType};
-// use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+use tokio::sync::Mutex;
+use tokio::try_join;
+use tracing::{error, trace};
+// use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+// use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+// use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
+use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
+// use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSpotSubscribeType, BitgetSpotWsType, BitgetSpotLogin};
 // use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
-// use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
-// use exchanges::response_base::ResponseData;
+use exchanges::response_base::ResponseData;
 use standard::exchange::{Exchange, ExchangeEnum};
+// use standard::{binance_spot_handle, Order, Platform, utils};
 // use standard::{binance_handle, Order, Platform, utils};
-// use standard::{okx_handle, Order, Platform, utils};
 // use standard::{kucoin_handle, Order, Platform, utils};
-// use standard::{kucoin_spot_handle, Order, Platform, utils};
+use standard::{kucoin_spot_handle, Order, Platform, utils};
 // use standard::{bitget_spot_handle, Order, Platform, utils};
-use standard::{Order, Platform, utils};
+// use standard::{okx_handle, Order, Platform, utils};
 
 // 创建实体
 #[allow(dead_code)]
@@ -109,45 +108,46 @@ pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn
 }
 
 #[allow(dead_code)]
-pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _subscriber_type: T, _mold: &str) where Vec<KucoinSpotSubscribeType>: From<T> {
+pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<KucoinSpotSubscribeType>: From<T> {
     utils::proxy_handle();
-
+    let account_info = global::account_info::get_account_info("../test_account.toml");
     match exchange {
         ExchangeEnum::BinanceSpot => {
             // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
             // trace!(symbol_format);
-            // let name = format!("binance_usdt_swap@{}", symbol.to_string().to_lowercase());
+            // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
+            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+            // let write_tx_am = Arc::new(Mutex::new(write_tx));
             // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
-            // let mut params: BTreeMap<String, String> = BTreeMap::new();
-            // let access_key = env::var("binance_access_key").unwrap_or("".to_string());
-            // let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
-            // params.insert("access_key".to_string(), access_key);
-            // params.insert("secret_key".to_string(), secret_key);
+            // let params = BinanceSpotLogin {
+            //     api_key: account_info.binance_access_key,
+            //     api_secret: account_info.binance_secret_key,
+            // };
             // let mut exchange_wss;
-            // exchange_wss = BinanceSpotWs::new_label(name, false, params, BinanceSpotWsType::PublicAndPrivate, res_sender);
+            // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
+            // exchange_wss.set_symbols(vec![symbol_format]);
             // exchange_wss.set_subscribe(subscriber_type.into());
             //
-            // let t1 = tokio::spawn(async move {
-            //     exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
-            // });
+            //
             // let mold_arc = Arc::new(mold.to_string());
-            // let t2 = tokio::spawn(async move {
+            // //读取
+            // tokio::spawn(async move {
             //     let mold_clone = Arc::clone(&mold_arc);
             //     loop {
-            //         tokio::time::sleep(Duration::from_millis(1)).await;
-            //         if let Ok(received) = res_receiver.try_recv() {
+            //         if let Some(data) = read_rx.next().await {
+            //             trace!("原始数据 data:{:?}",data);
             //             match mold_clone.as_str() {
             //                 "depth" => {
-            //                     if received.data != "" {
-            //                         let result = binance_handle::handle_special_depth(received);
+            //                     if data.data != "" {
+            //                         let result = binance_spot_handle::handle_special_depth(data);
             //                         trace!(?result)
             //                     }
             //                 }
             //                 "ticker" => {
-            //                     if received.data != "" {
-            //                         let result = binance_handle::handle_special_ticker(received);
+            //                     if data.data != "" {
+            //                         let result = binance_spot_handle::handle_special_ticker(data);
             //                         trace!(?result)
             //                     }
             //                 }
@@ -157,74 +157,56 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _su
             //                 }
             //             }
             //         }
-            //     }
+            //     };
             // });
-            // try_join!(t1, t2).unwrap();
-            error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
-            panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
+            //
+            // let t1 = tokio::spawn(async move {
+            //     //链接
+            //     let bool_v3_clone = Arc::clone(&bool_v1);
+            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            // });
+            // try_join!(t1).unwrap();
         }
         ExchangeEnum::BinanceSwap => {
-            error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
-            panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
-        }
-        ExchangeEnum::GateSwap => {
-            error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
-            panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
-        }
-        ExchangeEnum::KucoinSwap => {
-            // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
-            // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            // let name = format!("kucoin_usdt_swap@{}", symbol.to_string().to_lowercase());
+            // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
+            // trace!(symbol_format);
+            // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
+            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+            // let write_tx_am = Arc::new(Mutex::new(write_tx));
             // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
-            // let mut params: BTreeMap<String, String> = BTreeMap::new();
-            // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
-            // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
-            // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
-            // params.insert("access_key".to_string(), access_key);
-            // params.insert("secret_key".to_string(), secret_key);
-            // params.insert("pass_key".to_string(), pass_key);
+            // let api_key = env::var("binance_access_key").unwrap_or("".to_string());
+            // let api_secret = env::var("binance_secret_key").unwrap_or("".to_string());
+            // let params = BinanceSwapLogin {
+            //     api_key: account_info.binance_access_key,
+            //     api_secret: account_info.binance_secret_key,
+            // };
             // let mut exchange_wss;
-            // if ["depth", "ticker"].contains(&mold) {
-            //     exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
-            // } else {
-            //     exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
-            // }
+            // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
+            // exchange_wss.set_symbols(vec![symbol_format]);
             // exchange_wss.set_subscribe(subscriber_type.into());
             //
-            // let t1 = tokio::spawn(async move {
-            //     exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
-            // });
+            //
             // let mold_arc = Arc::new(mold.to_string());
-            // let t2 = tokio::spawn(async move {
+            // //读取
+            // tokio::spawn(async move {
             //     let mold_clone = Arc::clone(&mold_arc);
             //     loop {
-            //         tokio::time::sleep(Duration::from_millis(1)).await;
-            //         if let Ok(received) = res_receiver.try_recv() {
+            //         if let Some(data) = read_rx.next().await {
+            //             trace!("原始数据 data:{:?}",data);
             //             match mold_clone.as_str() {
             //                 "depth" => {
-            //                     let result = kucoin_handle::handle_special_depth(received);
-            //                     trace!(?result)
+            //                     if data.data != "" {
+            //                         let result = binance_handle::handle_special_depth(data);
+            //                         trace!(?result)
+            //                     }
             //                 }
             //                 "ticker" => {
-            //                     let result = kucoin_handle::handle_special_ticker(received);
-            //                     trace!(?result)
-            //                 }
-            //                 "account" => {
-            //                     trace!(?received);
-            //                     let result = kucoin_handle::handle_account_info(received, symbol_back.clone());
-            //                     trace!(?result)
-            //                 }
-            //                 "position" => {
-            //                     trace!(?received);
-            //                     let result = kucoin_handle::handle_position(received, dec!(1));
-            //                     trace!(?result)
-            //                 }
-            //                 "orders" => {
-            //                     trace!(?received);
-            //                     let result = kucoin_handle::handle_order(received, dec!(0.001));
-            //                     trace!(?result)
+            //                     if data.data != "" {
+            //                         let result = binance_handle::handle_special_ticker(data);
+            //                         trace!(?result)
+            //                     }
             //                 }
             //                 _ => {
             //                     error!("没有该命令!mode={}", mold_clone);
@@ -232,68 +214,66 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _su
             //                 }
             //             }
             //         }
-            //     }
+            //     };
             // });
-            // try_join!(t1, t2).unwrap();
+            //
+            // let t1 = tokio::spawn(async move {
+            //     //链接
+            //     let bool_v3_clone = Arc::clone(&bool_v1);
+            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            // });
+            // try_join!(t1).unwrap();
         }
-        ExchangeEnum::KucoinSpot => {
-            // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+        ExchangeEnum::KucoinSwap => {
+            // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
             // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            // trace!(symbol_format);
-            // let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
-            // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
-            // let mut params: BTreeMap<String, String> = BTreeMap::new();
-            // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
-            // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
-            // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
-            // params.insert("access_key".to_string(), access_key);
-            // params.insert("secret_key".to_string(), secret_key);
-            // params.insert("pass_key".to_string(), pass_key);
+            // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
+            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+            // let write_tx_am = Arc::new(Mutex::new(write_tx));
+            // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-            //     KucoinSpotWs::new_label(name, false, params, KucoinSpotWsType::Public).await
-            // } else {
-            //     KucoinSpotWs::new_label(name, false, params, KucoinSpotWsType::Private).await
+            // let params = KucoinSwapLogin {
+            //     access_key: account_info.kucoin_access_key,
+            //     secret_key: account_info.kucoin_secret_key,
+            //     pass_key: account_info.kucoin_pass,
             // };
-            //
+            // let mut exchange_wss;
+            // if ["depth", "ticker"].contains(&mold) {
+            //     exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
+            // } else {
+            //     exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
+            // }
+            // exchange_wss.set_symbols(vec![symbol_format]);
             // exchange_wss.set_subscribe(subscriber_type.into());
             //
-            // let t1 = tokio::spawn(async move {
-            //     exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
-            // });
             // let mold_arc = Arc::new(mold.to_string());
-            // let t2 = tokio::spawn(async move {
+            // tokio::spawn(async move {
             //     let mold_clone = Arc::clone(&mold_arc);
             //     loop {
-            //         tokio::time::sleep(Duration::from_millis(1)).await;
-            //         if let Ok(received) = res_receiver.try_recv() {
-            //             trace!(?received);
+            //         if let Some(data) = read_rx.next().await {
+            //             trace!("原始数据 data:{:?}",data);
             //             match mold_clone.as_str() {
             //                 "depth" => {
-            //                     if received.data != "" {
-            //                         let result = kucoin_spot_handle::handle_special_depth(received);
-            //                         trace!(?result)
-            //                     }
+            //                     let result = kucoin_handle::handle_special_depth(data);
+            //                     trace!(?result)
             //                 }
             //                 "ticker" => {
-            //                     if received.data != "" {
-            //                         let result = kucoin_spot_handle::handle_special_ticker(received);
-            //                         trace!(?result)
-            //                     }
+            //                     let result = kucoin_handle::handle_special_ticker(data);
+            //                     trace!(?result)
             //                 }
             //                 "account" => {
-            //                     if received.data != "" {
-            //                         let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
-            //                         trace!(?result)
-            //                     }
+            //                     let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
+            //                     trace!(?result)
+            //                 }
+            //                 "position" => {
+            //                     let result = kucoin_handle::handle_position(data, dec!(1));
+            //                     trace!(?result)
             //                 }
             //                 "orders" => {
-            //                     if received.data != "" {
-            //                         let result = kucoin_spot_handle::handle_order(received, dec!(1));
-            //                         trace!(?result)
-            //                     }
+            //                     let result = kucoin_handle::handle_order(data, dec!(0.001));
+            //                     trace!(?result)
             //                 }
             //                 _ => {
             //                     error!("没有该命令!mode={}", mold_clone);
@@ -303,63 +283,143 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _su
             //         }
             //     }
             // });
-            // try_join!(t1, t2).unwrap();
+            //
+            // let t1 = tokio::spawn(async move {
+            //     //链接
+            //     let bool_v3_clone = Arc::clone(&bool_v1);
+            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            // });
+            // try_join!(t1).unwrap();
+        }
+        ExchangeEnum::KucoinSpot => {
+            let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+            let symbol_back = utils::format_symbol(symbol.to_string(), "_");
+            trace!(symbol_format);
+            let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
+            let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+            let write_tx_am = Arc::new(Mutex::new(write_tx));
+            let bool_v1 = Arc::new(AtomicBool::new(true));
+
+            let params = KucoinSpotLogin {
+                access_key: account_info.kucoin_access_key,
+                secret_key: account_info.kucoin_secret_key,
+                pass_key: account_info.kucoin_pass,
+            };
+            let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
+                KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
+            } else {
+                KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
+            };
+            exchange_wss.set_symbols(vec![symbol_format]);
+            exchange_wss.set_subscribe(subscriber_type.into());
+
+            let mold_arc = Arc::new(mold.to_string());
+            tokio::spawn(async move {
+                let mold_clone = Arc::clone(&mold_arc);
+                loop {
+                    if let Some(data) = read_rx.next().await {
+                        trace!("原始数据 data:{:?}",data);
+                        match mold_clone.as_str() {
+                            "depth" => {
+                                if data.data != "" {
+                                    let result = kucoin_spot_handle::handle_special_depth(data);
+                                    trace!(?result)
+                                }
+                            }
+                            "ticker" => {
+                                if data.data != "" {
+                                    let result = kucoin_spot_handle::handle_special_ticker(data);
+                                    trace!(?result)
+                                }
+                            }
+                            "account" => {
+                                if data.data != "" {
+                                    let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
+                                    trace!(?result)
+                                }
+                            }
+                            "orders" => {
+                                if data.data != "" {
+                                    let result = kucoin_spot_handle::handle_order(data, dec!(1));
+                                    trace!(?result)
+                                }
+                            }
+                            _ => {
+                                error!("没有该命令!mode={}", mold_clone);
+                                panic!("没有该命令!mode={}", mold_clone)
+                            }
+                        }
+                    }
+                }
+            });
+            let t1 = tokio::spawn(async move {
+                //链接
+                let bool_v3_clone = Arc::clone(&bool_v1);
+                exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            });
+            try_join!(t1).unwrap();
+        }
+        ExchangeEnum::GateSwap => {
+            error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
+            panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
         }
         ExchangeEnum::BitgetSpot => {
             // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
             // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
             // trace!(symbol_format);
-            // let name = format!("bitget_usdt_spot@{}", symbol.to_string().to_lowercase());
+            // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
+            // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+            // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+            // let write_tx_am = Arc::new(Mutex::new(write_tx));
             // let bool_v1 = Arc::new(AtomicBool::new(true));
             //
-            // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
-            // let mut params: BTreeMap<String, String> = BTreeMap::new();
-            // let access_key = env::var("bitget_access_key").unwrap_or("".to_string());
+            // let api_key = env::var("bitget_access_key").unwrap_or("".to_string());
             // let secret_key = env::var("bitget_secret_key").unwrap_or("".to_string());
-            // let pass_key = env::var("bitget_pass_key").unwrap_or("".to_string());
-            // params.insert("access_key".to_string(), access_key);
-            // params.insert("secret_key".to_string(), secret_key);
-            // params.insert("pass_key".to_string(), pass_key);
+            // let passphrase_key = env::var("bitget_pass_key").unwrap_or("".to_string());
+            // let params = BitgetSpotLogin {
+            //     api_key,
+            //     secret_key,
+            //     passphrase_key,
+            // };
             //
             // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-            //     BitgetSpotWs::new_label(name, false, params, BitgetWsType::Public, res_sender)
+            //     BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
             // } else {
-            //     BitgetSpotWs::new_label(name, false, params, BitgetWsType::Private, res_sender)
+            //     BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
             // };
-            //
+            // exchange_wss.set_symbols(vec![symbol_format]);
             // exchange_wss.set_subscribe(subscriber_type.into());
             //
-            // let t1 = tokio::spawn(async move {
-            //     exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
-            // });
             // let mold_arc = Arc::new(mold.to_string());
-            // let t2 = tokio::spawn(async move {
-            //     let mold_clone = Arc::clone(&mold_arc);
+            // //读取
+            // tokio::spawn(async move {
             //     loop {
-            //         tokio::time::sleep(Duration::from_millis(1)).await;
-            //         if let Ok(received) = res_receiver.try_recv() {
+            //         let mold_clone = Arc::clone(&mold_arc);
+            //         if let Some(data) = read_rx.next().await {
+            //             trace!("原始数据 data:{:?}",data);
             //             match mold_clone.as_str() {
             //                 "depth" => {
-            //                     if received.data != "" {
-            //                         let result = bitget_spot_handle::handle_special_depth(received);
+            //                     if data.data != "" {
+            //                         let result = bitget_spot_handle::handle_special_depth(data);
             //                         trace!(?result)
             //                     }
             //                 }
             //                 "ticker" => {
-            //                     if received.data != "" {
-            //                         let result = bitget_spot_handle::handle_special_ticker(received);
+            //                     if data.data != "" {
+            //                         let result = bitget_spot_handle::handle_special_ticker(data);
             //                         trace!(?result)
             //                     }
             //                 }
             //                 "account" => {
-            //                     if received.data != "" {
-            //                         let result = bitget_spot_handle::handle_account_info(received, symbol_back.clone());
+            //                     if data.data != "" {
+            //                         let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
             //                         trace!(?result)
             //                     }
             //                 }
             //                 "orders" => {
-            //                     if received.data != "" {
-            //                         let result = bitget_spot_handle::handle_order(received, dec!(1));
+            //                     if data.data != "" {
+            //                         let result = bitget_spot_handle::handle_order(data, dec!(1));
             //                         trace!(?result)
             //                     }
             //                 }
@@ -371,7 +431,12 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _su
             //         }
             //     }
             // });
-            // try_join!(t1, t2).unwrap();
+            // let t1 = tokio::spawn(async move {
+            //     //链接
+            //     let bool_v3_clone = Arc::clone(&bool_v1);
+            //     exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            // });
+            // try_join!(t1).unwrap();
         }
         ExchangeEnum::OkxSwap => {
             // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();

+ 6 - 6
standard/tests/kucoin_handle_test.rs

@@ -1,7 +1,7 @@
 mod exchange_test;
 
 use tracing::{instrument};
-use exchanges::kucoin_swap_ws::{KucoinSubscribeType};
+use exchanges::kucoin_swap_ws::{KucoinSwapSubscribeType};
 use standard::exchange::ExchangeEnum;
 use crate::exchange_test::test_new_exchange_wss;
 
@@ -14,7 +14,7 @@ async fn test_get_wss_depth() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PuContractMarketLevel2Depth50
+        KucoinSwapSubscribeType::PuContractMarketLevel2Depth50
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSwap, SYMBOL, kucoin_subscribe_type, "depth").await;
 }
@@ -26,7 +26,7 @@ async fn test_get_wss_ticker() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PuContractMarkettickerV2
+        KucoinSwapSubscribeType::PuContractMarkettickerV2
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSwap, SYMBOL, kucoin_subscribe_type, "ticker").await;
 }
@@ -38,7 +38,7 @@ async fn test_get_wss_account() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PrContractAccountWallet
+        KucoinSwapSubscribeType::PrContractAccountWallet
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSwap, SYMBOL, kucoin_subscribe_type, "account").await;
 }
@@ -50,7 +50,7 @@ async fn test_get_wss_position() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PrContractPosition
+        KucoinSwapSubscribeType::PrContractPosition
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSwap, SYMBOL, kucoin_subscribe_type, "position").await;
 }
@@ -62,7 +62,7 @@ async fn test_get_wss_orders() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PrContractMarketTradeOrders
+        KucoinSwapSubscribeType::PrContractMarketTradeOrders
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSwap, SYMBOL, kucoin_subscribe_type, "orders").await;
 }

+ 5 - 5
standard/tests/kucoin_spot_handle_test.rs

@@ -1,7 +1,7 @@
 mod exchange_test;
 
 use tracing::{instrument};
-use exchanges::kucoin_spot_ws::{KucoinSubscribeType};
+use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType};
 use standard::exchange::ExchangeEnum;
 use crate::exchange_test::test_new_exchange_wss;
 
@@ -14,7 +14,7 @@ async fn test_get_wss_depth() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PuSpotMarketLevel2Depth50
+        KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSpot, SYMBOL, kucoin_subscribe_type, "depth").await;
 }
@@ -26,7 +26,7 @@ async fn test_get_wss_ticker() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PuMarketTicker
+        KucoinSpotSubscribeType::PuMarketTicker
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSpot, SYMBOL, kucoin_subscribe_type, "ticker").await;
 }
@@ -38,7 +38,7 @@ async fn test_get_wss_account() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PrAccountBalance
+        KucoinSpotSubscribeType::PrAccountBalance
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSpot, SYMBOL, kucoin_subscribe_type, "account").await;
 }
@@ -50,7 +50,7 @@ async fn test_get_wss_orders() {
     global::log_utils::init_log_with_trace();
 
     let kucoin_subscribe_type = vec![
-        KucoinSubscribeType::PrSpotMarketTradeOrders
+        KucoinSpotSubscribeType::PrSpotMarketTradeOrders
     ];
     test_new_exchange_wss(ExchangeEnum::KucoinSpot, SYMBOL, kucoin_subscribe_type, "orders").await;
 }