|
|
@@ -1,11 +1,14 @@
|
|
|
-use std::collections::{BTreeMap};
|
|
|
+use std::collections::{BTreeMap, HashMap};
|
|
|
use std::io::{Error, ErrorKind};
|
|
|
use std::str::FromStr;
|
|
|
use tokio::sync::mpsc::Sender;
|
|
|
use async_trait::async_trait;
|
|
|
+use futures::stream::FuturesUnordered;
|
|
|
+use futures::TryStreamExt;
|
|
|
use rust_decimal::Decimal;
|
|
|
use rust_decimal::prelude::FromPrimitive;
|
|
|
use rust_decimal_macros::dec;
|
|
|
+use serde_json::json;
|
|
|
use tracing::error;
|
|
|
use exchanges::okx_swap_rest::OkxSwapRest;
|
|
|
use global::trace_stack::TraceStack;
|
|
|
@@ -28,7 +31,7 @@ pub struct OkxSwap {
|
|
|
impl OkxSwap {
|
|
|
pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> OkxSwap {
|
|
|
let market = Market::new();
|
|
|
- let mut kucoin_swap = OkxSwap {
|
|
|
+ let mut okx_swap = OkxSwap {
|
|
|
exchange: ExchangeEnum::OkxSwap,
|
|
|
symbol: symbol.to_uppercase(),
|
|
|
is_colo,
|
|
|
@@ -38,8 +41,8 @@ impl OkxSwap {
|
|
|
order_sender,
|
|
|
error_sender,
|
|
|
};
|
|
|
- kucoin_swap.market = OkxSwap::get_market(&mut kucoin_swap).await.unwrap_or(kucoin_swap.market);
|
|
|
- return kucoin_swap;
|
|
|
+ okx_swap.market = OkxSwap::get_market(&mut okx_swap).await.unwrap_or(okx_swap.market);
|
|
|
+ return okx_swap;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -317,9 +320,45 @@ impl Platform for OkxSwap {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- async fn take_order(&mut self, _custom_id: &str, _origin_side: &str, _price: Decimal, _amount: Decimal) -> Result<Order, Error> { todo!() }
|
|
|
+ async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
|
|
|
+ let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
|
|
|
+ let amount_size = self.market.amount_size;
|
|
|
+ let mut params = json!({
|
|
|
+ "tdMode": "cross",
|
|
|
+ "clOrdId": custom_id.to_string(),
|
|
|
+ "instId": symbol_format,
|
|
|
+ "px": price.to_string(),
|
|
|
+ "ordType": "limit",
|
|
|
+ });
|
|
|
+ let size = (amount / amount_size).floor();
|
|
|
+ params["sz"] = json!(size);
|
|
|
+ match origin_side {
|
|
|
+ "kd" => {
|
|
|
+ params["side"] = json!("buy");
|
|
|
+ }
|
|
|
+ "pd" => {
|
|
|
+ params["side"] = json!("sell");
|
|
|
+ }
|
|
|
+ "kk" => {
|
|
|
+ params["side"] = json!("sell");
|
|
|
+ }
|
|
|
+ "pk" => {
|
|
|
+ params["side"] = json!("buy");
|
|
|
+ }
|
|
|
+ _ => { error!("下单参数错误"); }
|
|
|
+ };
|
|
|
+ let res_data = self.request.swap_order(params).await;
|
|
|
+ if res_data.code == "200" {
|
|
|
+ let res_data_str = &res_data.data;
|
|
|
+ let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
|
|
|
+ let result = okx_handle::format_order_item(res_data_json, amount_size);
|
|
|
+ Ok(result)
|
|
|
+ } else {
|
|
|
+ Err(Error::new(ErrorKind::Other, res_data.to_string()))
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- async fn take_order_symbol(&mut self, _symbol: String, _custom_id: &str, _origin_side: &str, _price: Decimal, _amount: Decimal) -> Result<Order, Error> { todo!() }
|
|
|
+ async fn take_order_symbol(&mut self, _symbol: String, _custom_id: &str, _origin_side: &str, _price: Decimal, _amount: Decimal) -> Result<Order, Error> { Err(Error::new(ErrorKind::NotFound, "okx:该交易所方法未实现".to_string())) }
|
|
|
|
|
|
async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
|
|
|
let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
|
|
|
@@ -343,7 +382,7 @@ impl Platform for OkxSwap {
|
|
|
avg_price: dec!(0),
|
|
|
status: "REMOVE".to_string(),
|
|
|
order_type: "".to_string(),
|
|
|
- trace_stack: TraceStack::default().on_special("360 kucoin_swap".to_string()),
|
|
|
+ trace_stack: TraceStack::default().on_special("346 okx_swap".to_string()),
|
|
|
};
|
|
|
Ok(result)
|
|
|
} else {
|
|
|
@@ -352,20 +391,143 @@ impl Platform for OkxSwap {
|
|
|
}
|
|
|
|
|
|
async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
|
|
|
- todo!()
|
|
|
+ Err(Error::new(ErrorKind::NotFound, "okx:该交易所方法未实现".to_string()))
|
|
|
}
|
|
|
|
|
|
- async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> { todo!() }
|
|
|
-
|
|
|
- async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> { todo!() }
|
|
|
+ async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
|
|
|
+ Err(Error::new(ErrorKind::NotFound, "okx:该交易所方法未实现".to_string()))
|
|
|
+ }
|
|
|
|
|
|
- async fn set_dual_leverage(&mut self, _leverage: &str) -> Result<String, Error> {
|
|
|
- todo!()
|
|
|
+ async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> {
|
|
|
+ let res_data = self.request.set_position_mode().await;
|
|
|
+ if res_data.code == "200" {
|
|
|
+ let res_data_str = &res_data.data;
|
|
|
+ let result = res_data_str.clone();
|
|
|
+ Ok(result)
|
|
|
+ } else {
|
|
|
+ Err(Error::new(ErrorKind::Other, res_data.to_string()))
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { todo!() }
|
|
|
+ async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
|
|
|
+ let symbol_format = utils::format_symbol(self.symbol.clone(), "-");
|
|
|
+ let res_data = self.request.set_leverage(symbol_format, leverage.to_string()).await;
|
|
|
+ if res_data.code == "200" {
|
|
|
+ let res_data_str = &res_data.data;
|
|
|
+ let result = res_data_str.clone();
|
|
|
+ Ok(result)
|
|
|
+ } else {
|
|
|
+ Err(Error::new(ErrorKind::Other, res_data.to_string()))
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> { todo!() }
|
|
|
+ async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "okx:该交易所方法未实现".to_string())) }
|
|
|
+
|
|
|
+ async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "okx:该交易所方法未实现".to_string())) }
|
|
|
+
|
|
|
+ async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack) {
|
|
|
+ let mut handles = vec![];
|
|
|
+ // 撤销订单
|
|
|
+ let cancel = order_command.cancel;
|
|
|
+ for item in cancel.keys() {
|
|
|
+ let mut self_clone = self.clone();
|
|
|
+ let cancel_clone = cancel.clone();
|
|
|
+ let item_clone = item.clone();
|
|
|
+ let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
|
|
|
+ let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
|
|
|
+ let result_sd = self.order_sender.clone();
|
|
|
+ let err_sd = self.error_sender.clone();
|
|
|
+ let handle = tokio::spawn(async move {
|
|
|
+ if order_id != "" {
|
|
|
+ let result = self_clone.cancel_order(&order_id, &custom_id).await;
|
|
|
+ match result {
|
|
|
+ Ok(_) => {
|
|
|
+ // result_sd.send(result).await.unwrap();
|
|
|
+ }
|
|
|
+ Err(error) => {
|
|
|
+ // 取消失败去查订单。
|
|
|
+ let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
|
|
|
+ match query_rst {
|
|
|
+ Ok(order) => {
|
|
|
+ result_sd.send(order).await.unwrap();
|
|
|
+ }
|
|
|
+ Err(query_err) => {
|
|
|
+ error!(?query_err);
|
|
|
+ error!("撤单失败,而且查单也失败了,okx_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ err_sd.send(error).await.unwrap();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ handles.push(handle)
|
|
|
+ }
|
|
|
+ // 下单指令
|
|
|
+ let mut limits = HashMap::new();
|
|
|
+ limits.extend(order_command.limits_open);
|
|
|
+ limits.extend(order_command.limits_close);
|
|
|
+ for item in limits.keys() {
|
|
|
+ let mut self_clone = self.clone();
|
|
|
+ let limits_clone = limits.clone();
|
|
|
+ let item_clone = item.clone();
|
|
|
+ let result_sd = self.order_sender.clone();
|
|
|
+ let err_sd = self.error_sender.clone();
|
|
|
+ let mut ts = trace_stack.clone();
|
|
|
+
|
|
|
+ let handle = tokio::spawn(async move {
|
|
|
+ let value = limits_clone[&item_clone].clone();
|
|
|
+ let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
|
|
|
+ let side = value.get(1).unwrap();
|
|
|
+ let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
|
|
|
+ let cid = value.get(3).unwrap();
|
|
|
+
|
|
|
+ // order_name: [数量,方向,价格,c_id]
|
|
|
+ let result = self_clone.take_order(cid, side, price, amount).await;
|
|
|
+ match result {
|
|
|
+ Ok(mut result) => {
|
|
|
+ ts.on_after_send();
|
|
|
+ result.trace_stack = ts.clone();
|
|
|
+
|
|
|
+ result_sd.send(result).await.unwrap();
|
|
|
+ }
|
|
|
+ Err(error) => {
|
|
|
+ let mut err_order = Order::new();
|
|
|
+ err_order.custom_id = cid.clone();
|
|
|
+ err_order.status = "REMOVE".to_string();
|
|
|
+
|
|
|
+ result_sd.send(err_order).await.unwrap();
|
|
|
+ err_sd.send(error).await.unwrap();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ handles.push(handle)
|
|
|
+ }
|
|
|
+ // 检查订单指令
|
|
|
+ let check = order_command.check;
|
|
|
+ for item in check.keys() {
|
|
|
+ let mut self_clone = self.clone();
|
|
|
+ let check_clone = check.clone();
|
|
|
+ let item_clone = item.clone();
|
|
|
+ let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
|
|
|
+ let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
|
|
|
+ let result_sd = self.order_sender.clone();
|
|
|
+ let err_sd = self.error_sender.clone();
|
|
|
+ let handle = tokio::spawn(async move {
|
|
|
+ let result = self_clone.get_order_detail(&order_id, &custom_id).await;
|
|
|
+ match result {
|
|
|
+ Ok(result) => {
|
|
|
+ result_sd.send(result).await.unwrap();
|
|
|
+ }
|
|
|
+ Err(error) => {
|
|
|
+ err_sd.send(error).await.unwrap();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ handles.push(handle)
|
|
|
+ }
|
|
|
|
|
|
- async fn command_order(&mut self, _order_command: OrderCommand, _trace_stack: TraceStack) { todo!() }
|
|
|
+ let futures = FuturesUnordered::from_iter(handles);
|
|
|
+ let _: Result<Vec<_>, _> = futures.try_collect().await;
|
|
|
+ }
|
|
|
}
|