|
|
@@ -3,18 +3,14 @@ use std::io::{Error, ErrorKind};
|
|
|
use std::str::FromStr;
|
|
|
use tokio::sync::mpsc::Sender;
|
|
|
use async_trait::async_trait;
|
|
|
-use futures::stream::FuturesUnordered;
|
|
|
-use futures::TryStreamExt;
|
|
|
use rust_decimal::Decimal;
|
|
|
use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
|
|
|
use rust_decimal_macros::dec;
|
|
|
use serde_json::{json, Value};
|
|
|
-use tokio::time::Instant;
|
|
|
use tracing::{error, info};
|
|
|
use exchanges::kucoin_swap_rest::KucoinSwapRest;
|
|
|
-use global::trace_stack::TraceStack;
|
|
|
use crate::exchange::ExchangeEnum;
|
|
|
-use crate::{Account, kucoin_handle, Market, Order, OrderCommand, Platform, Position, Ticker, utils};
|
|
|
+use crate::{Account, kucoin_handle, Market, Order, Platform, Position, Ticker, utils};
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
#[derive(Clone)]
|
|
|
@@ -360,8 +356,7 @@ impl Platform for KucoinSwap {
|
|
|
deal_amount: Decimal::ZERO,
|
|
|
avg_price: Decimal::ZERO,
|
|
|
status: "NEW".to_string(),
|
|
|
- order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("359 kucoin_swap".to_string()),
|
|
|
+ order_type: "".to_string()
|
|
|
};
|
|
|
Ok(result)
|
|
|
} else {
|
|
|
@@ -412,8 +407,7 @@ impl Platform for KucoinSwap {
|
|
|
deal_amount: Decimal::ZERO,
|
|
|
avg_price: Decimal::ZERO,
|
|
|
status: "NEW".to_string(),
|
|
|
- order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("408 kucoin_swap".to_string()),
|
|
|
+ order_type: "".to_string()
|
|
|
};
|
|
|
Ok(result)
|
|
|
} else {
|
|
|
@@ -439,8 +433,7 @@ impl Platform for KucoinSwap {
|
|
|
deal_amount: Decimal::ZERO,
|
|
|
avg_price: Decimal::ZERO,
|
|
|
status: "REMOVE".to_string(),
|
|
|
- order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("436 kucoin_swap".to_string()),
|
|
|
+ order_type: "".to_string()
|
|
|
};
|
|
|
Ok(result)
|
|
|
} else {
|
|
|
@@ -464,8 +457,7 @@ impl Platform for KucoinSwap {
|
|
|
deal_amount: Decimal::ZERO,
|
|
|
avg_price: Decimal::ZERO,
|
|
|
status: "REMOVE".to_string(),
|
|
|
- order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("461 kucoin_swap".to_string()),
|
|
|
+ order_type: "".to_string()
|
|
|
}
|
|
|
).collect();
|
|
|
Ok(result)
|
|
|
@@ -488,8 +480,7 @@ impl Platform for KucoinSwap {
|
|
|
deal_amount: Decimal::ZERO,
|
|
|
avg_price: Decimal::ZERO,
|
|
|
status: "REMOVE".to_string(),
|
|
|
- order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("486 kucoin_swap".to_string()),
|
|
|
+ order_type: "".to_string()
|
|
|
}
|
|
|
).collect();
|
|
|
Ok(result)
|
|
|
@@ -528,112 +519,6 @@ impl Platform for KucoinSwap {
|
|
|
async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
|
|
|
Err(Error::new(ErrorKind::NotFound, "kucoin_swap:该交易所方法未实现".to_string()))
|
|
|
}
|
|
|
-
|
|
|
- // 指令下单
|
|
|
- async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
|
|
|
- let mut handles = vec![];
|
|
|
- // 下单指令,limits_open里已经包含了limits_close,在core里面处理过了
|
|
|
- for item in order_command.limits_open.keys() {
|
|
|
- let mut ts = trace_stack.clone();
|
|
|
-
|
|
|
- let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
|
|
|
- let side = order_command.limits_open[item].get(1).unwrap().clone();
|
|
|
- let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
|
|
|
- let cid = order_command.limits_open[item].get(3).unwrap().clone();
|
|
|
-
|
|
|
- let mut self_clone = self.clone();
|
|
|
- let handle = tokio::spawn(async move {
|
|
|
- ts.on_before_send();
|
|
|
- let result = self_clone.take_order(&cid, &side, price, amount).await;
|
|
|
- ts.on_after_send();
|
|
|
-
|
|
|
- match result {
|
|
|
- Ok(mut result) => {
|
|
|
- // ts.on_after_send();
|
|
|
- result.trace_stack = ts.clone();
|
|
|
-
|
|
|
- self_clone.order_sender.send(result).await.unwrap();
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- let mut err_order = Order::new();
|
|
|
- err_order.custom_id = cid.clone();
|
|
|
- err_order.status = "REMOVE".to_string();
|
|
|
-
|
|
|
- self_clone.order_sender.send(err_order).await.unwrap();
|
|
|
- self_clone.error_sender.send(error).await.unwrap();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- handles.push(handle)
|
|
|
- }
|
|
|
- let futures = FuturesUnordered::from_iter(handles);
|
|
|
- // 等待所有任务完成
|
|
|
- let _: Result<Vec<_>, _> = futures.try_collect().await;
|
|
|
-
|
|
|
- // 撤销订单
|
|
|
- let mut cancel_handlers = vec![];
|
|
|
- for item in order_command.cancel.keys() {
|
|
|
- let order_id = order_command.cancel[item].get(1).unwrap().clone();
|
|
|
- let custom_id = order_command.cancel[item].get(0).unwrap().clone();
|
|
|
-
|
|
|
- let mut self_clone = self.clone();
|
|
|
-
|
|
|
- let handle = tokio::spawn(async move {
|
|
|
- if order_id != "" {
|
|
|
- let result = self_clone.cancel_order(&order_id, &custom_id).await;
|
|
|
- match result {
|
|
|
- Ok(_) => {
|
|
|
- // result_sd.send(result).await.unwrap();
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- // 取消失败去查订单。
|
|
|
- let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
|
|
|
- match query_rst {
|
|
|
- Ok(order) => {
|
|
|
- self_clone.order_sender.send(order).await.unwrap();
|
|
|
- }
|
|
|
- Err(_query_err) => {
|
|
|
- // error!(?_query_err);
|
|
|
- // error!("撤单失败,而且查单也失败了,kucoin_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
|
|
|
- }
|
|
|
- }
|
|
|
- self_clone.error_sender.send(error).await.unwrap();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- cancel_handlers.push(handle)
|
|
|
- }
|
|
|
- let futures = FuturesUnordered::from_iter(cancel_handlers);
|
|
|
- // 等待所有任务完成
|
|
|
- let _: Result<Vec<_>, _> = futures.try_collect().await;
|
|
|
-
|
|
|
- // 检查订单指令
|
|
|
- let mut check_handlers = vec![];
|
|
|
- for item in order_command.check.keys() {
|
|
|
- let order_id = order_command.check[item].get(1).unwrap().clone();
|
|
|
- let custom_id = order_command.check[item].get(0).unwrap().clone();
|
|
|
-
|
|
|
- let mut self_clone = self.clone();
|
|
|
-
|
|
|
- let handle = tokio::spawn(async move {
|
|
|
- let result = self_clone.get_order_detail(&order_id, &custom_id).await;
|
|
|
- match result {
|
|
|
- Ok(result) => {
|
|
|
- self_clone.order_sender.send(result).await.unwrap();
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- self_clone.error_sender.send(error).await.unwrap();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- check_handlers.push(handle)
|
|
|
- }
|
|
|
-
|
|
|
- let futures = FuturesUnordered::from_iter(check_handlers);
|
|
|
- // 等待所有任务完成
|
|
|
- let _: Result<Vec<_>, _> = futures.try_collect().await;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
|
|
|
@@ -662,7 +547,6 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
|
|
|
deal_amount,
|
|
|
avg_price,
|
|
|
status: custom_status,
|
|
|
- order_type: order["type"].as_str().unwrap().to_string(),
|
|
|
- trace_stack: TraceStack::new(0, Instant::now()).on_special("655 kucoin_swap".to_string()),
|
|
|
+ order_type: order["type"].as_str().unwrap().to_string()
|
|
|
}
|
|
|
}
|