kucoin_swap.rs 22 KB


  1. use std::collections::{BTreeMap, HashMap};
  2. use std::io::{Error, ErrorKind};
  3. use std::str::FromStr;
  4. use tokio::sync::mpsc::Sender;
  5. use async_trait::async_trait;
  6. use futures::stream::FuturesUnordered;
  7. use futures::TryStreamExt;
  8. use rust_decimal::Decimal;
  9. use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
  10. use rust_decimal_macros::dec;
  11. use serde_json::{json, Value};
  12. use tracing::{error};
  13. use exchanges::kucoin_swap_rest::KucoinSwapRest;
  14. use global::trace_stack::TraceStack;
  15. use crate::exchange::ExchangeEnum;
  16. use crate::{Account, kucoin_handle, Market, Order, OrderCommand, Platform, Position, Ticker, utils};
  17. #[allow(dead_code)]
  18. #[derive(Clone)]
  19. pub struct KucoinSwap {
  20. exchange: ExchangeEnum,
  21. symbol: String,
  22. is_colo: bool,
  23. params: BTreeMap<String, String>,
  24. request: KucoinSwapRest,
  25. market: Market,
  26. order_sender: Sender<Order>,
  27. error_sender: Sender<Error>,
  28. }
  29. impl KucoinSwap {
  30. pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> KucoinSwap {
  31. let market = Market::new();
  32. let mut kucoin_swap = KucoinSwap {
  33. exchange: ExchangeEnum::KucoinSwap,
  34. symbol: symbol.to_uppercase(),
  35. is_colo,
  36. params: params.clone(),
  37. request: KucoinSwapRest::new(is_colo, params.clone()),
  38. market,
  39. order_sender,
  40. error_sender,
  41. };
  42. kucoin_swap.market = KucoinSwap::get_market(&mut kucoin_swap).await.unwrap_or(kucoin_swap.market);
  43. // 开启自动追加保证金
  44. kucoin_swap.set_auto_deposit_status(true).await.unwrap();
  45. return kucoin_swap;
  46. }
  47. }
  48. #[async_trait]
  49. impl Platform for KucoinSwap {
  50. // 克隆方法
  51. fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
  52. fn get_self_exchange(&self) -> ExchangeEnum {
  53. ExchangeEnum::KucoinSwap
  54. }
  55. // 获取交易对
  56. fn get_self_symbol(&self) -> String { self.symbol.clone() }
  57. // 获取是否使用高速通道
  58. fn get_self_is_colo(&self) -> bool {
  59. self.is_colo
  60. }
  61. // 获取params信息
  62. fn get_self_params(&self) -> BTreeMap<String, String> {
  63. self.params.clone()
  64. }
  65. // 获取market信息
  66. fn get_self_market(&self) -> Market { self.market.clone() }
  67. // 获取请求时间
  68. fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
  69. // 获取请求平均时间
  70. fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
  71. // 获取请求最大时间
  72. fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
  73. // 获取服务器时间
  74. async fn get_server_time(&mut self) -> Result<String, Error> {
  75. let res_data = self.request.get_server_time().await;
  76. if res_data.code == "200" {
  77. let res_data_str = &res_data.data;
  78. let result = res_data_str.clone();
  79. Ok(result)
  80. } else {
  81. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  82. }
  83. }
  84. // 获取账号信息
  85. async fn get_account(&mut self) -> Result<Account, Error> {
  86. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  87. let res_data = self.request.get_account(symbol_array[1].to_string()).await;
  88. if res_data.code == "200" {
  89. let res_data_str = &res_data.data;
  90. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  91. let balance = Decimal::from_str(&res_data_json["accountEquity"].to_string()).unwrap();
  92. let available_balance = Decimal::from_str(&res_data_json["availableBalance"].to_string()).unwrap();
  93. let frozen_balance = balance - available_balance;
  94. let result = Account {
  95. balance,
  96. available_balance,
  97. frozen_balance,
  98. stocks: dec!(0),
  99. available_stocks: dec!(0),
  100. frozen_stocks: dec!(0),
  101. };
  102. Ok(result)
  103. } else {
  104. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  105. }
  106. }
  107. async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
  108. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  109. let amount_size = self.market.amount_size;
  110. let res_data = self.request.get_position(symbol_format).await;
  111. if res_data.code == "200" {
  112. let res_data_str = &res_data.data;
  113. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  114. let result = kucoin_handle::format_position_item(&res_data_json, amount_size);
  115. Ok(vec![result])
  116. } else {
  117. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  118. }
  119. }
  120. async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
  121. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  122. let amount_size = self.market.amount_size;
  123. let res_data = self.request.get_positions(symbol_array[1].to_string()).await;
  124. if res_data.code == "200" {
  125. let res_data_str = &res_data.data;
  126. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  127. let mut result = Vec::new();
  128. for item in res_data_json.iter() {
  129. result.push(kucoin_handle::format_position_item(item, amount_size))
  130. }
  131. Ok(result)
  132. } else {
  133. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  134. }
  135. }
  136. async fn get_ticker(&mut self) -> Result<Ticker, Error> {
  137. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  138. let res_data = self.request.get_ticker(symbol_format).await;
  139. if res_data.code == "200" {
  140. let res_data_str = &res_data.data;
  141. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  142. let ticker_info = res_data_json;
  143. let time = (Decimal::from_str(&*ticker_info["ts"].to_string()).unwrap() / dec!(1000000)).floor().to_i64().unwrap();
  144. let result = Ticker {
  145. time,
  146. high: Decimal::from_str(&ticker_info["bestAskPrice"].to_string()).unwrap(),
  147. low: Decimal::from_str(&ticker_info["bestBidPrice"].to_string()).unwrap(),
  148. sell: Decimal::from_str(&ticker_info["bestAskPrice"].to_string()).unwrap(),
  149. buy: Decimal::from_str(&ticker_info["bestBidPrice"].to_string()).unwrap(),
  150. last: Decimal::from_str(&ticker_info["price"].to_string()).unwrap(),
  151. volume: Decimal::from_str(&ticker_info["size"].to_string()).unwrap(),
  152. };
  153. Ok(result)
  154. } else {
  155. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  156. }
  157. }
  158. async fn get_market(&mut self) -> Result<Market, Error> {
  159. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  160. let res_data = self.request.get_market_details().await;
  161. if res_data.code == "200" {
  162. let res_data_str = &res_data.data;
  163. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  164. let market_info = res_data_json.iter().find(|item| item["symbol"].as_str().unwrap() == symbol_format);
  165. match market_info {
  166. None => {
  167. error!("Kucoin:获取Market信息错误!\nget_market:res_data={:?}", res_data_str);
  168. panic!("Kucoin:获取Market信息错误!\nget_market:res_data={:?}", res_data_str)
  169. }
  170. Some(value) => {
  171. let base_asset = value["baseCurrency"].as_str().unwrap_or("").to_string();
  172. let quote_asset = value["quoteCurrency"].as_str().unwrap_or("").to_string();
  173. let tick_size = Decimal::from_str(&value["tickSize"].to_string()).unwrap();
  174. let amount_size = Decimal::from_str(&value["multiplier"].to_string()).unwrap();
  175. let min_qty = Decimal::from_str(&value["lotSize"].to_string()).unwrap();
  176. let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
  177. let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
  178. let min_notional = min_qty * amount_size;
  179. let result = Market {
  180. symbol: format!("{}_{}", base_asset, quote_asset),
  181. base_asset,
  182. quote_asset,
  183. tick_size,
  184. amount_size,
  185. price_precision,
  186. amount_precision,
  187. min_qty,
  188. max_qty: Decimal::from_str(&value["maxOrderQty"].to_string()).unwrap(),
  189. min_notional,
  190. max_notional: Decimal::from_str(&value["maxPrice"].to_string()).unwrap(),
  191. ct_val: Default::default(),
  192. };
  193. Ok(result)
  194. }
  195. }
  196. } else {
  197. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  198. }
  199. }
  200. async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  201. let amount_size = self.market.amount_size;
  202. let res_data = self.request.get_orders_details(order_id.to_string(), custom_id.to_string()).await;
  203. if res_data.code == "200" {
  204. let res_data_str = &res_data.data;
  205. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  206. let result = format_order_item(res_data_json, amount_size);
  207. Ok(result)
  208. } else {
  209. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  210. }
  211. }
  212. async fn get_orders_list(&mut self, status: &str) -> Result<Vec<Order>, Error> {
  213. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  214. let amount_size = self.market.amount_size;
  215. let res_data = self.request.get_orders(status.to_string(), symbol_format.clone()).await;
  216. if res_data.code == "200" {
  217. let res_data_str = &res_data.data;
  218. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  219. let order_list: Vec<serde_json::Value> = res_data_json["items"].as_array().unwrap().clone();
  220. let order_info: Vec<&serde_json::Value> = order_list.iter().filter(|item| item["symbol"].as_str().unwrap_or("") == symbol_format.clone()).collect();
  221. let result = order_info.iter().map(|&item| format_order_item(item.clone(), amount_size)).collect();
  222. Ok(result)
  223. } else {
  224. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  225. }
  226. }
  227. async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
  228. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  229. let amount_size = self.market.amount_size;
  230. let mut params = json!({
  231. "clientOid": custom_id,
  232. "symbol": symbol_format,
  233. "leverage": "10",
  234. "reduceOnly":false,
  235. "price": price.to_string(),
  236. });
  237. let size = (amount / amount_size).floor();
  238. params["size"] = json!(size);
  239. match origin_side {
  240. "kd" => {
  241. params["side"] = json!("buy");
  242. }
  243. "pd" => {
  244. params["side"] = json!("sell");
  245. }
  246. "kk" => {
  247. params["side"] = json!("sell");
  248. }
  249. "pk" => {
  250. params["side"] = json!("buy");
  251. }
  252. _ => { error!("下单参数错误"); }
  253. };
  254. let res_data = self.request.swap_order(params).await;
  255. if res_data.code == "200" {
  256. let res_data_str = &res_data.data;
  257. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  258. let id = res_data_json["orderId"].as_str().unwrap().to_string();
  259. let result = Order {
  260. id,
  261. custom_id: custom_id.to_string(),
  262. price,
  263. amount,
  264. deal_amount: dec!(0),
  265. avg_price: dec!(0),
  266. status: "NEW".to_string(),
  267. order_type: "".to_string(),
  268. trace_stack: Default::default(),
  269. };
  270. Ok(result)
  271. } else {
  272. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  273. }
  274. }
  275. async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  276. let res_data = self.request.cancel_order(order_id.to_string(), custom_id.to_string()).await;
  277. if order_id == "" {
  278. error!("Kucoin:撤销订单错误,该交易所为提供自定义订单号撤销订单!\ncancel_order:order_id={:?},custom_id={:?}", order_id, custom_id);
  279. panic!("Kucoin:撤销订单错误,该交易所为提供自定义订单号撤销订单!\ncancel_order:order_id={:?},custom_id={:?}", order_id, custom_id)
  280. }
  281. if res_data.code == "200" {
  282. let res_data_str = &res_data.data;
  283. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  284. let cancel_ids = res_data_json["cancelledOrderIds"].as_array().unwrap();
  285. let id = cancel_ids[0].as_str().unwrap().to_string();
  286. let result = Order {
  287. id,
  288. custom_id: custom_id.to_string(),
  289. price: dec!(0),
  290. amount: dec!(0),
  291. deal_amount: dec!(0),
  292. avg_price: dec!(0),
  293. status: "REMOVE".to_string(),
  294. order_type: "".to_string(),
  295. trace_stack: Default::default(),
  296. };
  297. Ok(result)
  298. } else {
  299. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  300. }
  301. }
  302. async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
  303. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  304. let res_data = self.request.cancel_orders(symbol_format).await;
  305. if res_data.code == "200" {
  306. let res_data_str = &res_data.data;
  307. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  308. let cancel_ids = res_data_json["cancelledOrderIds"].as_array().unwrap();
  309. let result = cancel_ids.iter().map(|item|
  310. Order {
  311. id: item.as_str().unwrap().to_string(),
  312. custom_id: "".to_string(),
  313. price: dec!(0),
  314. amount: dec!(0),
  315. deal_amount: dec!(0),
  316. avg_price: dec!(0),
  317. status: "REMOVE".to_string(),
  318. order_type: "".to_string(),
  319. trace_stack: Default::default(),
  320. }
  321. ).collect();
  322. Ok(result)
  323. } else {
  324. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  325. }
  326. }
  327. async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> {
  328. error!("Kucoin:该交易所暂不支持此方法!");
  329. panic!("Kucoin:该交易所暂不支持此方法!");
  330. }
  331. async fn set_dual_leverage(&mut self, _leverage: &str) -> Result<String, Error> {
  332. error!("Kucoin:该交易所暂不支持此方法!");
  333. panic!("Kucoin:该交易所暂不支持此方法!");
  334. }
  335. async fn set_auto_deposit_status(&mut self, status: bool) -> Result<String, Error> {
  336. let symbol_format = format!("{}M", utils::format_symbol(self.symbol.clone(), ""));
  337. let res_data = self.request.auto_deposit_status(symbol_format, status).await;
  338. if res_data.code == "200" {
  339. let res_data_str = &res_data.data;
  340. let result = res_data_str.clone();
  341. Ok(result)
  342. } else {
  343. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  344. }
  345. }
  346. async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
  347. error!("Kucoin:该交易所暂不支持此方法!");
  348. panic!("Kucoin:该交易所暂不支持此方法!");
  349. }
  350. // 指令下单
  351. async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack) {
  352. let mut handles = vec![];
  353. // 撤销订单
  354. let cancel = order_command.cancel;
  355. for item in cancel.keys() {
  356. let mut self_clone = self.clone();
  357. let cancel_clone = cancel.clone();
  358. let item_clone = item.clone();
  359. let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
  360. let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
  361. let result_sd = self.order_sender.clone();
  362. let err_sd = self.error_sender.clone();
  363. let handle = tokio::spawn(async move {
  364. if order_id != "" {
  365. let result = self_clone.cancel_order(&order_id, &custom_id).await;
  366. match result {
  367. Ok(_) => {
  368. // result_sd.send(result).await.unwrap();
  369. }
  370. Err(error) => {
  371. // 取消失败去查订单。
  372. let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
  373. match query_rst {
  374. Ok(order) => {
  375. result_sd.send(order).await.unwrap();
  376. }
  377. Err(query_err) => {
  378. error!(?query_err);
  379. error!("撤单失败,而且查单也失败了,kucoin_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
  380. }
  381. }
  382. err_sd.send(error).await.unwrap();
  383. }
  384. }
  385. }
  386. });
  387. handles.push(handle)
  388. }
  389. // 下单指令
  390. let mut limits = HashMap::new();
  391. limits.extend(order_command.limits_open);
  392. limits.extend(order_command.limits_close);
  393. for item in limits.keys() {
  394. let mut self_clone = self.clone();
  395. let limits_clone = limits.clone();
  396. let item_clone = item.clone();
  397. let result_sd = self.order_sender.clone();
  398. let err_sd = self.error_sender.clone();
  399. let mut ts = trace_stack.clone();
  400. let handle = tokio::spawn(async move {
  401. let value = limits_clone[&item_clone].clone();
  402. let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
  403. let side = value.get(1).unwrap();
  404. let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
  405. let cid = value.get(3).unwrap();
  406. // order_name: [数量,方向,价格,c_id]
  407. let result = self_clone.take_order(cid, side, price, amount).await;
  408. match result {
  409. Ok(mut result) => {
  410. ts.on_after_send();
  411. result.trace_stack = ts.clone();
  412. result_sd.send(result).await.unwrap();
  413. }
  414. Err(error) => {
  415. let mut err_order = Order::new();
  416. err_order.custom_id = cid.clone();
  417. err_order.status = "REMOVE".to_string();
  418. result_sd.send(err_order).await.unwrap();
  419. err_sd.send(error).await.unwrap();
  420. }
  421. }
  422. });
  423. handles.push(handle)
  424. }
  425. // 检查订单指令
  426. let check = order_command.check;
  427. for item in check.keys() {
  428. let mut self_clone = self.clone();
  429. let check_clone = check.clone();
  430. let item_clone = item.clone();
  431. let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
  432. let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
  433. let result_sd = self.order_sender.clone();
  434. let err_sd = self.error_sender.clone();
  435. let handle = tokio::spawn(async move {
  436. let result = self_clone.get_order_detail(&order_id, &custom_id).await;
  437. match result {
  438. Ok(result) => {
  439. result_sd.send(result).await.unwrap();
  440. }
  441. Err(error) => {
  442. err_sd.send(error).await.unwrap();
  443. }
  444. }
  445. });
  446. handles.push(handle)
  447. }
  448. let futures = FuturesUnordered::from_iter(handles);
  449. let _: Result<Vec<_>, _> = futures.try_collect().await;
  450. }
  451. }
  452. pub fn format_order_item(order: Value, amount_size: Decimal) -> Order {
  453. let price = Decimal::from_str(order["price"].as_str().unwrap()).unwrap();
  454. let size = Decimal::from_str(&order["size"].to_string()).unwrap();
  455. let status = order["status"].as_str().unwrap_or("");
  456. let filled_size = Decimal::from_str(&order["filledSize"].to_string()).unwrap();
  457. let filled_value = Decimal::from_str(order["filledValue"].as_str().unwrap()).unwrap();
  458. let amount = size * amount_size;
  459. let deal_amount = filled_size * amount_size;
  460. let avg_price = if deal_amount.is_zero() { dec!(0) } else { filled_value / deal_amount };
  461. let custom_status;
  462. if ["cancelled", "closed", "finished"].contains(&status) {
  463. custom_status = "REMOVE".to_string();
  464. } else if status == "open" {
  465. custom_status = "NEW".to_string();
  466. } else {
  467. custom_status = "NULL".to_string();
  468. };
  469. Order {
  470. id: order["id"].as_str().unwrap().to_string(),
  471. custom_id: order["clientOid"].as_str().unwrap().to_string(),
  472. price,
  473. amount,
  474. deal_amount,
  475. avg_price,
  476. status: custom_status,
  477. order_type: order["type"].as_str().unwrap().to_string(),
  478. trace_stack: Default::default(),
  479. }
  480. }