Explorar el Código

exchange添加Sender

gepangpang hace 2 años
padre
commit
5f7b0dfc1f

+ 2 - 2
standard/src/binance_handle.rs

@@ -62,8 +62,8 @@ pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
     let res_data_str = res_data.data;
     let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
-    let mut depth_asks: Vec<MarketOrder> = parse_depth_items(&res_data_json["asks"]);
-    let mut depth_bids: Vec<MarketOrder> = parse_depth_items(&res_data_json["bids"]);
+    let mut depth_asks: Vec<MarketOrder> = parse_depth_items(&res_data_json["a"]);
+    let mut depth_bids: Vec<MarketOrder> = parse_depth_items(&res_data_json["b"]);
     depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
     depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
     let mp = (depth_asks[0].price + depth_bids[0].price) * dec!(0.5);

+ 1 - 2
standard/src/binance_spot.rs

@@ -1,7 +1,6 @@
 use std::collections::BTreeMap;
 use std::io::{Error};
 use std::result::Result;
-use tokio::sync::mpsc::Sender;
 use async_trait::async_trait;
 use rust_decimal::Decimal;
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand};
@@ -95,5 +94,5 @@ impl Platform for BinanceSpot {
         todo!()
     }
 
-    async fn command_order(&self, _order_command: OrderCommand, _order_sender: Sender<Order>, _error_sender: Sender<Error>) { todo!() }
+    async fn command_order(&self, _order_command: OrderCommand) { todo!() }
 }

+ 1 - 2
standard/src/binance_swap.rs

@@ -1,7 +1,6 @@
 use std::collections::BTreeMap;
 use std::io::{Error, ErrorKind};
 use std::result::Result;
-use tokio::sync::mpsc::Sender;
 use async_trait::async_trait;
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
@@ -118,7 +117,7 @@ impl Platform for BinanceSwap {
         todo!()
     }
 
-    async fn command_order(&self, _order_command: OrderCommand, _order_sender: Sender<Order>, _error_sender: Sender<Error>) { todo!() }
+    async fn command_order(&self, _order_command: OrderCommand) { todo!() }
 
     // 订阅深度信息
     // fn subscribe_depth(&self, symbol: &str) {

+ 6 - 5
standard/src/exchange.rs

@@ -1,10 +1,12 @@
 use std::collections::{BTreeMap};
+use std::io::Error;
+use tokio::sync::mpsc::Sender;
 use crate::binance_swap::BinanceSwap;
 use crate::binance_spot::BinanceSpot;
 use crate::gate_spot::GateSpot;
 use crate::gate_swap::GateSwap;
 use crate::kucoin_swap::KucoinSwap;
-use crate::Platform;
+use crate::{Order, Platform};
 
 /// 交易所交易模式枚举
 /// - `BinanceSwap`: Binance交易所期货;
@@ -48,12 +50,11 @@ pub enum ExchangeEnum {
 /// params.insert("access_key".to_string(), "your_access_key".to_string());
 /// params.insert("access_key".to_string(), "your_secret_key".to_string());
 /// let exchange = Exchange::new(ExchangeEnum::BinanceSwap, "BTC_USDT".to_string(), false, params);
-
 #[derive(Debug)]
 pub struct Exchange;
 
 impl Exchange {
-    pub async fn new(exchange: ExchangeEnum, symbol: String, is_colo: bool, params: BTreeMap<String, String>) -> Box<dyn Platform + Send + Sync> {
+    pub async fn new(exchange: ExchangeEnum, symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> Box<dyn Platform + Send + Sync> {
         match exchange {
             ExchangeEnum::BinanceSwap => {
                 Box::new(BinanceSwap::new(symbol, is_colo, params).await)
@@ -62,13 +63,13 @@ impl Exchange {
                 Box::new(BinanceSpot::new(symbol, is_colo, params))
             }
             ExchangeEnum::GateSwap => {
-                Box::new(GateSwap::new(symbol, is_colo, params).await)
+                Box::new(GateSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
             ExchangeEnum::GateSpot => {
                 Box::new(GateSpot::new(symbol, is_colo, params))
             }
             ExchangeEnum::KucoinSwap => {
-                Box::new(KucoinSwap::new(symbol, is_colo, params).await)
+                Box::new(KucoinSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
         }
     }

+ 1 - 2
standard/src/gate_spot.rs

@@ -1,6 +1,5 @@
 use std::collections::BTreeMap;
 use std::io::{Error};
-use tokio::sync::mpsc::Sender;
 use async_trait::async_trait;
 use rust_decimal::Decimal;
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand};
@@ -95,5 +94,5 @@ impl Platform for GateSpot {
         todo!()
     }
 
-    async fn command_order(&self, _order_command: OrderCommand, _order_sender: Sender<Order>, _error_sender: Sender<Error>) { todo!() }
+    async fn command_order(&self, _order_command: OrderCommand) { todo!() }
 }

+ 12 - 8
standard/src/gate_swap.rs

@@ -21,10 +21,12 @@ pub struct GateSwap {
     params: BTreeMap<String, String>,
     request: GateSwapRest,
     market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
 }
 
 impl GateSwap {
-    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>) -> GateSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> GateSwap {
         let market = Market::new();
         let mut gate_swap = GateSwap {
             exchange: ExchangeEnum::GateSwap,
@@ -33,6 +35,8 @@ impl GateSwap {
             params: params.clone(),
             request: GateSwapRest::new(is_colo, params.clone()),
             market,
+            order_sender,
+            error_sender,
         };
         gate_swap.market = GateSwap::get_market(&gate_swap).await.unwrap_or(gate_swap.market);
         return gate_swap;
@@ -314,7 +318,7 @@ impl Platform for GateSwap {
         }
     }
     // 指令下单
-    async fn command_order(&self, order_command: OrderCommand, order_sender: Sender<Order>, error_sender: Sender<Error>) {
+    async fn command_order(&self, order_command: OrderCommand) {
         let mut handles = vec![];
         // 撤销订单
         let cancel = order_command.cancel;
@@ -324,8 +328,8 @@ impl Platform for GateSwap {
             let item_clone = item.clone();
             let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
             let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let result = self_clone.cancel_order(&order_id, &custom_id).await;
                 match result {
@@ -347,8 +351,8 @@ impl Platform for GateSwap {
             let self_clone = self.clone();
             let limits_clone = limits.clone();
             let item_clone = item.clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let value = limits_clone[&item_clone].clone();
                 let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
@@ -377,8 +381,8 @@ impl Platform for GateSwap {
             let item_clone = item.clone();
             let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
             let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let result = self_clone.get_order_detail(&order_id, &custom_id).await;
                 match result {

+ 12 - 8
standard/src/kucoin_swap.rs

@@ -22,10 +22,12 @@ pub struct KucoinSwap {
     params: BTreeMap<String, String>,
     request: KucoinSwapRest,
     market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
 }
 
 impl KucoinSwap {
-    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>) -> KucoinSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> KucoinSwap {
         let market = Market::new();
         let mut kucoin_swap = KucoinSwap {
             exchange: ExchangeEnum::KucoinSwap,
@@ -34,6 +36,8 @@ impl KucoinSwap {
             params: params.clone(),
             request: KucoinSwapRest::new(is_colo, params.clone()),
             market,
+            order_sender,
+            error_sender,
         };
         kucoin_swap.market = KucoinSwap::get_market(&kucoin_swap).await.unwrap_or(kucoin_swap.market);
         return kucoin_swap;
@@ -269,7 +273,7 @@ impl Platform for KucoinSwap {
     }
 
     // 指令下单
-    async fn command_order(&self, order_command: OrderCommand, order_sender: Sender<Order>, error_sender: Sender<Error>) {
+    async fn command_order(&self, order_command: OrderCommand) {
         let mut handles = vec![];
         // 撤销订单
         let cancel = order_command.cancel;
@@ -279,8 +283,8 @@ impl Platform for KucoinSwap {
             let item_clone = item.clone();
             let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
             let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let result = self_clone.cancel_order(&order_id, &custom_id).await;
                 match result {
@@ -302,8 +306,8 @@ impl Platform for KucoinSwap {
             let self_clone = self.clone();
             let limits_clone = limits.clone();
             let item_clone = item.clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let value = limits_clone[&item_clone].clone();
                 let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
@@ -332,8 +336,8 @@ impl Platform for KucoinSwap {
             let item_clone = item.clone();
             let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
             let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-            let result_sd = order_sender.clone();
-            let err_sd = error_sender.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
             let handle = tokio::spawn(async move {
                 let result = self_clone.get_order_detail(&order_id, &custom_id).await;
                 match result {

+ 1 - 2
standard/src/lib.rs

@@ -1,6 +1,5 @@
 use std::collections::{BTreeMap, HashMap};
 use std::io::{Error};
-use tokio::sync::mpsc::Sender;
 use async_trait::async_trait;
 use rust_decimal::Decimal;
 
@@ -400,5 +399,5 @@ pub trait Platform {
     // 批量撤销订单
     async fn cancel_orders(&self) -> Result<Vec<Order>, Error>;
     // 指令下单
-    async fn command_order(&self, order_command: OrderCommand, order_sender: Sender<Order>, error_sender: Sender<Error>);
+    async fn command_order(&self, order_command: OrderCommand);
 }

+ 10 - 6
standard/tests/libs_test.rs

@@ -1,9 +1,11 @@
 use std::collections::BTreeMap;
 use std::env;
+use std::io::Error;
 use rust_decimal_macros::dec;
+use tokio::sync::mpsc;
 use tracing::{instrument, trace};
 use exchanges::proxy;
-use standard::{Platform};
+use standard::{Order, Platform};
 use standard::utils;
 use standard::exchange::{Exchange, ExchangeEnum};
 
@@ -12,6 +14,8 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
     if proxy::ParsingDetail::http_enable_proxy() {
         trace!("检测有代理配置,配置走代理");
     }
+    let (order_sender, _order_receiver): (mpsc::Sender<Order>, mpsc::Receiver<Order>) = mpsc::channel(1024);
+    let (error_sender, _error_receiver): (mpsc::Sender<Error>, mpsc::Receiver<Error>) = mpsc::channel(1024);
     match exchange {
         ExchangeEnum::BinanceSwap => {
             let mut params: BTreeMap<String, String> = BTreeMap::new();
@@ -19,7 +23,7 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
             let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
             params.insert("access_key".to_string(), access_key);
             params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params).await
+            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params, order_sender, error_sender).await
         }
         ExchangeEnum::BinanceSpot => {
             let mut params: BTreeMap<String, String> = BTreeMap::new();
@@ -27,7 +31,7 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
             let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
             params.insert("access_key".to_string(), access_key);
             params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params).await
+            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params, order_sender, error_sender).await
         }
         ExchangeEnum::GateSwap => {
             let mut params: BTreeMap<String, String> = BTreeMap::new();
@@ -35,7 +39,7 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
             let secret_key = env::var("gate_secret_key").unwrap_or("".to_string());
             params.insert("access_key".to_string(), access_key);
             params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params).await
+            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params, order_sender, error_sender).await
         }
         ExchangeEnum::GateSpot => {
             let mut params: BTreeMap<String, String> = BTreeMap::new();
@@ -43,7 +47,7 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
             let secret_key = env::var("gate_secret_key").unwrap_or("".to_string());
             params.insert("access_key".to_string(), access_key);
             params.insert("secret_key".to_string(), secret_key);
-            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params).await
+            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params, order_sender, error_sender).await
         }
         ExchangeEnum::KucoinSwap => {
             let mut params: BTreeMap<String, String> = BTreeMap::new();
@@ -53,7 +57,7 @@ async fn test_new_exchange(exchange: ExchangeEnum) -> Box<dyn Platform> {
             params.insert("access_key".to_string(), access_key);
             params.insert("secret_key".to_string(), secret_key);
             params.insert("pass_key".to_string(), pass_key);
-            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params).await
+            Exchange::new(exchange, "ROSE_USDT".to_string(), false, params, order_sender, error_sender).await
         }
     }
 }