| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- use std::collections::{BTreeMap, HashMap};
- use std::io::{Error, ErrorKind};
- use std::str::FromStr;
- use tokio::sync::mpsc::Sender;
- use async_trait::async_trait;
- use futures::stream::FuturesUnordered;
- use futures::TryStreamExt;
- use rust_decimal::Decimal;
- use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
- use rust_decimal_macros::dec;
- use serde_json::{json, Value};
- use tracing::{error};
- use exchanges::kucoin_swap_rest::KucoinSwapRest;
- use global::trace_stack::TraceStack;
- use crate::exchange::ExchangeEnum;
- use crate::{Account, kucoin_handle, Market, Order, OrderCommand, Platform, Position, Ticker, utils};
- #[allow(dead_code)]
- #[derive(Clone)]
- pub struct KucoinSwap {
- exchange: ExchangeEnum,
- symbol: String,
- is_colo: bool,
- params: BTreeMap<String, String>,
- request: KucoinSwapRest,
- market: Market,
- order_sender: Sender<Order>,
- error_sender: Sender<Error>,
- }
- impl KucoinSwap {
- pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> KucoinSwap {
- let market = Market::new();
- let mut kucoin_swap = KucoinSwap {
- exchange: ExchangeEnum::KucoinSwap,
- symbol: symbol.to_uppercase(),
- is_colo,
- params: params.clone(),
- request: KucoinSwapRest::new(is_colo, params.clone()),
- market,
- order_sender,
- error_sender,
- };
- kucoin_swap.market = KucoinSwap::get_market(&mut kucoin_swap).await.unwrap_or(kucoin_swap.market);
- // 开启自动追加保证金
- kucoin_swap.set_auto_deposit_status(true).await.unwrap();
- return kucoin_swap;
- }
- }
- #[async_trait]
- impl Platform for KucoinSwap {
- // 克隆方法
- fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
- fn get_self_exchange(&self) -> ExchangeEnum {
- ExchangeEnum::KucoinSwap
- }
- // 获取交易对
- fn get_self_symbol(&self) -> String { self.symbol.clone() }
- // 获取是否使用高速通道
- fn get_self_is_colo(&self) -> bool {
- self.is_colo
- }
- // 获取params信息
- fn get_self_params(&self) -> BTreeMap<String, String> {
- self.params.clone()
- }
- // 获取market信息
- fn get_self_market(&self) -> Market { self.market.clone() }
- // 获取请求时间
- fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
- // 获取请求平均时间
- fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
- // 获取请求最大时间
- fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
- // 获取服务器时间
- async fn get_server_time(&mut self) -> Result<String, Error> {
- let res_data = self.request.get_server_time().await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let result = res_data_str.clone();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取账号信息
- async fn get_account(&mut self) -> Result<Account, Error> {
- let symbol_array: Vec<&str> = self.symbol.split("_").collect();
- let res_data = self.request.get_account(symbol_array[1].to_string()).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let balance = Decimal::from_str(&res_data_json["accountEquity"].to_string()).unwrap();
- let available_balance = Decimal::from_str(&res_data_json["availableBalance"].to_string()).unwrap();
- let frozen_balance = balance - available_balance;
- let result = Account {
- balance,
- available_balance,
- frozen_balance,
- stocks: dec!(0),
- available_stocks: dec!(0),
- frozen_stocks: dec!(0),
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let amount_size = self.market.amount_size;
- let res_data = self.request.get_position(symbol_format).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let result = kucoin_handle::format_position_item(&res_data_json, amount_size);
- Ok(vec![result])
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
- let symbol_array: Vec<&str> = self.symbol.split("_").collect();
- let amount_size = self.market.amount_size;
- let res_data = self.request.get_positions(symbol_array[1].to_string()).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
- let mut result = Vec::new();
- for item in res_data_json.iter() {
- result.push(kucoin_handle::format_position_item(item, amount_size))
- }
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_ticker(&mut self) -> Result<Ticker, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let res_data = self.request.get_ticker(symbol_format).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let ticker_info = res_data_json;
- let time = (Decimal::from_str(&*ticker_info["ts"].to_string()).unwrap() / dec!(1000000)).floor().to_i64().unwrap();
- let result = Ticker {
- time,
- high: Decimal::from_str(&ticker_info["bestAskPrice"].to_string()).unwrap(),
- low: Decimal::from_str(&ticker_info["bestBidPrice"].to_string()).unwrap(),
- sell: Decimal::from_str(&ticker_info["bestAskPrice"].to_string()).unwrap(),
- buy: Decimal::from_str(&ticker_info["bestBidPrice"].to_string()).unwrap(),
- last: Decimal::from_str(&ticker_info["price"].to_string()).unwrap(),
- volume: Decimal::from_str(&ticker_info["size"].to_string()).unwrap(),
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_market(&mut self) -> Result<Market, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let res_data = self.request.get_market_details().await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
- let market_info = res_data_json.iter().find(|item| item["symbol"].as_str().unwrap() == symbol_format);
- match market_info {
- None => {
- error!("Kucoin:获取Market信息错误!\nget_market:res_data={:?}", res_data_str);
- panic!("Kucoin:获取Market信息错误!\nget_market:res_data={:?}", res_data_str)
- }
- Some(value) => {
- let base_asset = value["baseCurrency"].as_str().unwrap_or("").to_string();
- let quote_asset = value["quoteCurrency"].as_str().unwrap_or("").to_string();
- let tick_size = Decimal::from_str(&value["tickSize"].to_string()).unwrap();
- let amount_size = Decimal::from_str(&value["multiplier"].to_string()).unwrap();
- let min_qty = Decimal::from_str(&value["lotSize"].to_string()).unwrap();
- let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
- let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
- let min_notional = min_qty * amount_size;
- let result = Market {
- symbol: format!("{}_{}", base_asset, quote_asset),
- base_asset,
- quote_asset,
- tick_size,
- amount_size,
- price_precision,
- amount_precision,
- min_qty,
- max_qty: Decimal::from_str(&value["maxOrderQty"].to_string()).unwrap(),
- min_notional,
- max_notional: Decimal::from_str(&value["maxPrice"].to_string()).unwrap(),
- ct_val: Default::default(),
- };
- Ok(result)
- }
- }
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let amount_size = self.market.amount_size;
- let res_data = self.request.get_orders_details(order_id.to_string(), custom_id.to_string()).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let result = format_order_item(res_data_json, amount_size);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_orders_list(&mut self, status: &str) -> Result<Vec<Order>, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let amount_size = self.market.amount_size;
- let res_data = self.request.get_orders(status.to_string(), symbol_format.clone()).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let order_list: Vec<serde_json::Value> = res_data_json["items"].as_array().unwrap().clone();
- let order_info: Vec<&serde_json::Value> = order_list.iter().filter(|item| item["symbol"].as_str().unwrap_or("") == symbol_format.clone()).collect();
- let result = order_info.iter().map(|&item| format_order_item(item.clone(), amount_size)).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let amount_size = self.market.amount_size;
- let mut params = json!({
- "clientOid": custom_id,
- "symbol": symbol_format,
- "leverage": "10",
- "reduceOnly":false,
- "price": price.to_string(),
- });
- let size = (amount / amount_size).floor();
- params["size"] = json!(size);
- match origin_side {
- "kd" => {
- params["side"] = json!("buy");
- }
- "pd" => {
- params["side"] = json!("sell");
- }
- "kk" => {
- params["side"] = json!("sell");
- }
- "pk" => {
- params["side"] = json!("buy");
- }
- _ => { error!("下单参数错误"); }
- };
- let res_data = self.request.swap_order(params).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let id = res_data_json["orderId"].as_str().unwrap().to_string();
- let result = Order {
- id,
- custom_id: custom_id.to_string(),
- price,
- amount,
- deal_amount: dec!(0),
- avg_price: dec!(0),
- status: "NEW".to_string(),
- order_type: "".to_string(),
- trace_stack: Default::default(),
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let res_data = self.request.cancel_order(order_id.to_string(), custom_id.to_string()).await;
- if order_id == "" {
- error!("Kucoin:撤销订单错误,该交易所为提供自定义订单号撤销订单!\ncancel_order:order_id={:?},custom_id={:?}", order_id, custom_id);
- panic!("Kucoin:撤销订单错误,该交易所为提供自定义订单号撤销订单!\ncancel_order:order_id={:?},custom_id={:?}", order_id, custom_id)
- }
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let cancel_ids = res_data_json["cancelledOrderIds"].as_array().unwrap();
- let id = cancel_ids[0].as_str().unwrap().to_string();
- let result = Order {
- id,
- custom_id: custom_id.to_string(),
- price: dec!(0),
- amount: dec!(0),
- deal_amount: dec!(0),
- avg_price: dec!(0),
- status: "REMOVE".to_string(),
- order_type: "".to_string(),
- trace_stack: Default::default(),
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let res_data = self.request.cancel_orders(symbol_format).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
- let cancel_ids = res_data_json["cancelledOrderIds"].as_array().unwrap();
- let result = cancel_ids.iter().map(|item|
- Order {
- id: item.as_str().unwrap().to_string(),
- custom_id: "".to_string(),
- price: dec!(0),
- amount: dec!(0),
- deal_amount: dec!(0),
- avg_price: dec!(0),
- status: "REMOVE".to_string(),
- order_type: "".to_string(),
- trace_stack: Default::default(),
- }
- ).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> {
- error!("Kucoin:该交易所暂不支持此方法!");
- panic!("Kucoin:该交易所暂不支持此方法!");
- }
- async fn set_dual_leverage(&mut self, _leverage: &str) -> Result<String, Error> {
- error!("Kucoin:该交易所暂不支持此方法!");
- panic!("Kucoin:该交易所暂不支持此方法!");
- }
- async fn set_auto_deposit_status(&mut self, status: bool) -> Result<String, Error> {
- let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
- let res_data = self.request.auto_deposit_status(symbol_format, status).await;
- if res_data.code == "200" {
- let res_data_str = &res_data.data;
- let result = res_data_str.clone();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
- error!("Kucoin:该交易所暂不支持此方法!");
- panic!("Kucoin:该交易所暂不支持此方法!");
- }
- // 指令下单
- async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack) {
- let mut handles = vec![];
- // 撤销订单
- let cancel = order_command.cancel;
- for item in cancel.keys() {
- let mut self_clone = self.clone();
- let cancel_clone = cancel.clone();
- let item_clone = item.clone();
- let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
- let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
- let result_sd = self.order_sender.clone();
- let err_sd = self.error_sender.clone();
- let handle = tokio::spawn(async move {
- if order_id != "" {
- let result = self_clone.cancel_order(&order_id, &custom_id).await;
- match result {
- Ok(_) => {
- // result_sd.send(result).await.unwrap();
- }
- Err(error) => {
- // 取消失败去查订单。
- let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
- match query_rst {
- Ok(order) => {
- result_sd.send(order).await.unwrap();
- }
- Err(query_err) => {
- error!(?query_err);
- error!("撤单失败,而且查单也失败了,kucoin_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
- }
- }
- err_sd.send(error).await.unwrap();
- }
- }
- }
- });
- handles.push(handle)
- }
- // 下单指令
- let mut limits = HashMap::new();
- limits.extend(order_command.limits_open);
- limits.extend(order_command.limits_close);
- for item in limits.keys() {
- let mut self_clone = self.clone();
- let limits_clone = limits.clone();
- let item_clone = item.clone();
- let result_sd = self.order_sender.clone();
- let err_sd = self.error_sender.clone();
- let mut ts = trace_stack.clone();
- let handle = tokio::spawn(async move {
- let value = limits_clone[&item_clone].clone();
- let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
- let side = value.get(1).unwrap();
- let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
- let cid = value.get(3).unwrap();
- // order_name: [数量,方向,价格,c_id]
- let result = self_clone.take_order(cid, side, price, amount).await;
- match result {
- Ok(mut result) => {
- ts.on_after_send();
- result.trace_stack = ts.clone();
- result_sd.send(result).await.unwrap();
- }
- Err(error) => {
- let mut err_order = Order::new();
- err_order.custom_id = cid.clone();
- err_order.status = "REMOVE".to_string();
- result_sd.send(err_order).await.unwrap();
- err_sd.send(error).await.unwrap();
- }
- }
- });
- handles.push(handle)
- }
- // 检查订单指令
- let check = order_command.check;
- for item in check.keys() {
- let mut self_clone = self.clone();
- let check_clone = check.clone();
- let item_clone = item.clone();
- let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
- let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
- let result_sd = self.order_sender.clone();
- let err_sd = self.error_sender.clone();
- let handle = tokio::spawn(async move {
- let result = self_clone.get_order_detail(&order_id, &custom_id).await;
- match result {
- Ok(result) => {
- result_sd.send(result).await.unwrap();
- }
- Err(error) => {
- err_sd.send(error).await.unwrap();
- }
- }
- });
- handles.push(handle)
- }
- let futures = FuturesUnordered::from_iter(handles);
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- }
- }
- pub fn format_order_item(order: Value, amount_size: Decimal) -> Order {
- let price = Decimal::from_str(order["price"].as_str().unwrap()).unwrap();
- let size = Decimal::from_str(&order["size"].to_string()).unwrap();
- let status = order["status"].as_str().unwrap_or("");
- let filled_size = Decimal::from_str(&order["filledSize"].to_string()).unwrap();
- let filled_value = Decimal::from_str(order["filledValue"].as_str().unwrap()).unwrap();
- let amount = size * amount_size;
- let deal_amount = filled_size * amount_size;
- let avg_price = if deal_amount.is_zero() { dec!(0) } else { filled_value / deal_amount };
- let custom_status;
- if ["cancelled", "closed", "finished"].contains(&status) {
- custom_status = "REMOVE".to_string();
- } else if status == "open" {
- custom_status = "NEW".to_string();
- } else {
- custom_status = "NULL".to_string();
- };
- Order {
- id: order["id"].as_str().unwrap().to_string(),
- custom_id: order["clientOid"].as_str().unwrap().to_string(),
- price,
- amount,
- deal_amount,
- avg_price,
- status: custom_status,
- order_type: order["type"].as_str().unwrap().to_string(),
- trace_stack: Default::default(),
- }
- }
|