bitget_swap.rs 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. use std::collections::{BTreeMap};
  2. use exchanges::bitget_swap_rest::BitgetSwapRest;
  3. use std::io::{Error, ErrorKind};
  4. use tokio::sync::mpsc::Sender;
  5. use std::str::FromStr;
  6. use async_trait::async_trait;
  7. use futures::stream::FuturesUnordered;
  8. use futures::TryStreamExt;
  9. use rust_decimal::{Decimal, MathematicalOps};
  10. use rust_decimal::prelude::ToPrimitive;
  11. use rust_decimal_macros::dec;
  12. use serde_json::{json, Value};
  13. use tokio::spawn;
  14. use tokio::time::Instant;
  15. use tracing::{error, info};
  16. use global::trace_stack::TraceStack;
  17. use crate::exchange::ExchangeEnum;
  18. use crate::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Ticker, utils};
  19. #[allow(dead_code)]
  20. #[derive(Clone)]
  21. pub struct BitgetSwap {
  22. exchange: ExchangeEnum,
  23. symbol: String,
  24. is_colo: bool,
  25. params: BTreeMap<String, String>,
  26. request: BitgetSwapRest,
  27. market: Market,
  28. order_sender: Sender<Order>,
  29. error_sender: Sender<Error>,
  30. }
  31. impl BitgetSwap {
  32. pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BitgetSwap {
  33. let market = Market::new();
  34. let mut bitget_swap = BitgetSwap {
  35. exchange: ExchangeEnum::BitgetSwap,
  36. symbol: symbol.to_uppercase(),
  37. is_colo,
  38. params: params.clone(),
  39. request: BitgetSwapRest::new(is_colo, params.clone()),
  40. market,
  41. order_sender,
  42. error_sender,
  43. };
  44. bitget_swap.market = BitgetSwap::get_market(&mut bitget_swap).await.unwrap();
  45. // 修改持仓模式
  46. let mode_result = bitget_swap.set_dual_mode("", true).await;
  47. match mode_result {
  48. Ok(ok) => {
  49. info!("BitgetSwap:设置持仓模式成功!{:?}", ok);
  50. }
  51. Err(error) => {
  52. error!("BitgetSwap:设置持仓模式失败!{:?}", error)
  53. }
  54. }
  55. // 设置持仓杠杆
  56. // let lever_rate_result = bitget_swap.set_dual_leverage("10").await;
  57. // match lever_rate_result {
  58. // Ok(ok) => {
  59. // info!("BitgetSwap:设置持仓杠杆成功!{:?}", ok);
  60. // }
  61. // Err(error) => {
  62. // error!("BitgetSwap:设置持仓杠杆失败!{:?}", error)
  63. // }
  64. // }
  65. return bitget_swap;
  66. }
  67. }
  68. #[async_trait]
  69. impl Platform for BitgetSwap {
  70. fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
  71. fn get_self_exchange(&self) -> ExchangeEnum { ExchangeEnum::BitgetSwap }
  72. fn get_self_symbol(&self) -> String { self.symbol.clone() }
  73. fn get_self_is_colo(&self) -> bool { self.is_colo }
  74. fn get_self_params(&self) -> BTreeMap<String, String> { self.params.clone() }
  75. fn get_self_market(&self) -> Market { self.market.clone() }
  76. fn get_request_delays(&self) -> Vec<i64> {
  77. // self.request.get_delays()
  78. vec![]
  79. }
  80. fn get_request_avg_delay(&self) -> Decimal {
  81. // self.request.get_avg_delay()
  82. Decimal::ZERO
  83. }
  84. fn get_request_max_delay(&self) -> i64 { 0 }
  85. async fn get_server_time(&mut self) -> Result<String, Error> {
  86. let res_data = self.request.get_server_time().await;
  87. if res_data.code == 200 {
  88. let res_data_json = res_data.data;
  89. let result = res_data_json["serverTime"].as_str().unwrap().to_string();
  90. Ok(result)
  91. } else {
  92. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  93. }
  94. }
  95. async fn get_account(&mut self) -> Result<Account, Error> {
  96. let response = self.request.get_account_info().await;
  97. if response.code == 200 {
  98. for data in response.data.as_array().unwrap() {
  99. if data["marginCoin"].as_str().unwrap() != "USDT" {
  100. continue
  101. }
  102. // 格式化account信息
  103. let mut account = Account {
  104. coin: data["marginCoin"].to_string(),
  105. balance: Decimal::from_str(data["accountEquity"].as_str().unwrap()).unwrap(),
  106. available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
  107. frozen_balance: Default::default(),
  108. stocks: Default::default(),
  109. available_stocks: Default::default(),
  110. frozen_stocks: Default::default(),
  111. };
  112. account.frozen_balance = account.balance - account.available_balance;
  113. return Ok(account)
  114. }
  115. Err(Error::new(ErrorKind::NotFound, format!("bitget_usdt_swap 未能找到USDT账户:{}。", response.to_string())))
  116. } else {
  117. Err(Error::new(ErrorKind::Other, response.to_string()))
  118. }
  119. }
  120. async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
  121. Err(Error::new(ErrorKind::NotFound, "bitget_swap get_spot_account:该交易所方法未实现".to_string()))
  122. }
  123. async fn get_position(&mut self) -> Result<Vec<Position>, Error> { Err(Error::new(ErrorKind::NotFound, "bitget_swap get_position:该交易所方法未实现".to_string())) }
  124. async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
  125. let params = json!({
  126. "productType": "USDT-FUTURES",
  127. "marginCoin": "USDT"
  128. });
  129. let response = self.request.get_all_position(params).await;
  130. info!(?response);
  131. if response.code != 200 {
  132. return Err(Error::new(ErrorKind::NotFound, format!("bitget_swap 获取仓位异常{:?}", response).to_string()))
  133. }
  134. // 正常处理持仓信息
  135. let mut positions: Vec<Position> = vec![];
  136. if response.data.is_null() {
  137. return Ok(positions)
  138. }
  139. let positions_json = response.data.as_array().unwrap();
  140. for position_json in positions_json {
  141. let symbol = position_json["symbol"].as_str().unwrap().to_string();
  142. let margin_level = Decimal::from_str(position_json["leverage"].as_str().unwrap()).unwrap();
  143. let amount = Decimal::from_str(position_json["total"].as_str().unwrap()).unwrap();
  144. let frozen_amount = Decimal::from_str(position_json["locked"].as_str().unwrap()).unwrap();
  145. let price = Decimal::from_str(position_json["openPriceAvg"].as_str().unwrap()).unwrap();
  146. let profit = Decimal::from_str(position_json["unrealizedPL"].as_str().unwrap()).unwrap();
  147. let position_mode = match position_json["posMode"].as_str().unwrap() {
  148. "hedge_mode" => {
  149. match position_json["holdSide"].as_str().unwrap() {
  150. "short" => {
  151. PositionModeEnum::Short
  152. }
  153. "long" => {
  154. PositionModeEnum::Long
  155. },
  156. _ => {
  157. panic!("bitget_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
  158. position_json["posMode"].as_str().unwrap(), position_json["holdSide"].as_str().unwrap())
  159. }
  160. }
  161. },
  162. "one_way_mode" => {
  163. PositionModeEnum::Both
  164. },
  165. _ => {
  166. panic!("bitget_usdt_swap: 未知的持仓模式: {}", position_json["posMode"].as_str().unwrap())
  167. }
  168. };
  169. let margin = Decimal::from_str(position_json["marginSize"].as_str().unwrap()).unwrap();
  170. positions.push(Position {
  171. symbol,
  172. margin_level,
  173. amount,
  174. frozen_amount,
  175. price,
  176. profit,
  177. position_mode,
  178. margin,
  179. });
  180. }
  181. Ok(positions)
  182. }
  183. async fn get_ticker(&mut self) -> Result<Ticker, Error> {
  184. return self.get_ticker_symbol(self.symbol.clone()).await
  185. }
  186. async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
  187. let symbol_format = utils::format_symbol(symbol.clone(), "");
  188. let res_data = self.request.get_tickers(symbol_format).await;
  189. if res_data.code == 200 {
  190. let res_data_json = res_data.data;
  191. let ticker_info = res_data_json[0].clone();
  192. let time = (Decimal::from_str(&*ticker_info["ts"].as_str().unwrap()).unwrap() / dec!(1000)).floor().to_i64().unwrap();
  193. let result = Ticker {
  194. time,
  195. high: Decimal::from_str(ticker_info["high24h"].as_str().unwrap()).unwrap(),
  196. low: Decimal::from_str(ticker_info["low24h"].as_str().unwrap()).unwrap(),
  197. sell: Decimal::from_str(ticker_info["askPr"].as_str().unwrap()).unwrap(),
  198. sell_volume: Default::default(),
  199. buy: Decimal::from_str(ticker_info["bidPr"].as_str().unwrap()).unwrap(),
  200. buy_volume: Default::default(),
  201. last: Decimal::from_str(ticker_info["lastPr"].as_str().unwrap()).unwrap(),
  202. volume: Decimal::from_str(ticker_info["quoteVolume"].as_str().unwrap()).unwrap(),
  203. };
  204. Ok(result)
  205. } else {
  206. Err(Error::new(ErrorKind::Other, res_data.to_string()))
  207. }
  208. }
  209. async fn get_market(&mut self) -> Result<Market, Error> {
  210. self.get_market_symbol(self.symbol.clone()).await
  211. }
  212. async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
  213. let symbol_format = utils::format_symbol(symbol.clone(), "");
  214. let response = self.request.get_contracts(symbol_format.clone()).await;
  215. if response.code == 200 {
  216. let res_data_json = response.data.as_array().unwrap();
  217. let market_info = res_data_json[0].clone();
  218. info!(?market_info);
  219. if !market_info["symbol"].as_str().unwrap().to_string().eq(&symbol_format) {
  220. return Err(Error::new(ErrorKind::NotFound, format!("符号未找到:symbol={}, response={:?}", symbol_format, response))).unwrap();
  221. }
  222. let base_asset = market_info["baseCoin"].as_str().unwrap().to_string();
  223. let quote_asset = market_info["quoteCoin"].as_str().unwrap().to_string();
  224. let price_precision = Decimal::from_str(market_info["pricePlace"].as_str().unwrap()).unwrap();
  225. let tick_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * price_precision);
  226. let amount_precision = Decimal::from_str(market_info["volumePlace"].as_str().unwrap()).unwrap();
  227. let amount_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * amount_precision);
  228. let min_qty = Decimal::NEGATIVE_ONE;
  229. let max_qty = Decimal::NEGATIVE_ONE;
  230. // let ct_val = Decimal::from_str(&market_info["sizeMultiplier"].as_str().unwrap()).unwrap();
  231. let ct_val = Decimal::ONE;
  232. let result = Market {
  233. symbol: format!("{}_{}", base_asset, quote_asset),
  234. base_asset,
  235. quote_asset,
  236. tick_size,
  237. amount_size,
  238. price_precision,
  239. amount_precision,
  240. min_qty,
  241. max_qty,
  242. min_notional: min_qty,
  243. max_notional: max_qty,
  244. ct_val,
  245. };
  246. Ok(result)
  247. } else {
  248. Err(Error::new(ErrorKind::Other, response.to_string()))
  249. }
  250. }
  251. async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  252. let symbol_format = utils::format_symbol(self.symbol.clone(), "");
  253. let params = json!({
  254. "symbol": symbol_format,
  255. "productType": "USDT-FUTURES",
  256. "clientOid": custom_id,
  257. "orderId": order_id
  258. });
  259. let ct_val = self.market.ct_val;
  260. let response = self.request.get_order(params).await;
  261. if response.code == 200 {
  262. let res_data_json = response.data;
  263. let result = format_order_item(res_data_json, ct_val);
  264. Ok(result)
  265. } else {
  266. Err(Error::new(ErrorKind::Other, response.to_string()))
  267. }
  268. }
  269. async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
  270. Err(Error::new(ErrorKind::NotFound, "bitget_swap get_orders_list:该交易所方法未实现".to_string()))
  271. // let symbol_format = utils::format_symbol(self.symbol.clone(), "");
  272. // let ct_val = self.market.ct_val;
  273. // let res_data = self.request.get_unfilled_orders(symbol_format.to_string(), "".to_string(), "".to_string(), "".to_string(), "100".to_string(), "".to_string()).await;
  274. // if res_data.code == 200 {
  275. // let res_data_str = &res_data.data;
  276. // let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  277. // let result = res_data_json.iter().map(|item| format_order_item(item.clone(), ct_val)).collect();
  278. // Ok(result)
  279. // } else {
  280. // Err(Error::new(ErrorKind::Other, res_data.to_string()))
  281. // }
  282. }
  283. async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
  284. let ct_val = self.market.ct_val;
  285. return self.take_order_symbol(self.symbol.clone(), ct_val, custom_id, origin_side, price, amount).await;
  286. }
  287. async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
  288. let symbol_format = utils::format_symbol(symbol, "");
  289. let final_size = amount / ct_val;
  290. let mut params = json!({
  291. "symbol": symbol_format,
  292. "clientOid": custom_id,
  293. "productType": "USDT-FUTURES",
  294. "marginMode": "crossed",
  295. "marginCoin": "USDT",
  296. "size": final_size.to_string()
  297. });
  298. if price.eq(&Decimal::ZERO) {
  299. params["orderType"] = json!("market");
  300. params["force"] = json!("gtc");
  301. } else {
  302. params["price"] = json!(price.to_string());
  303. params["orderType"] = json!("limit");
  304. params["force"] = json!("gtc");
  305. };
  306. match origin_side {
  307. "kd" => {
  308. params["side"] = json!("buy");
  309. params["tradeSide"] = json!("open");
  310. }
  311. "pd" => {
  312. params["side"] = json!("buy");
  313. params["tradeSide"] = json!("close");
  314. }
  315. "kk" => {
  316. params["side"] = json!("sell");
  317. params["tradeSide"] = json!("open");
  318. }
  319. "pk" => {
  320. params["side"] = json!("sell");
  321. params["tradeSide"] = json!("close");
  322. }
  323. _ => { panic!("bitget_usdt_swap 下单参数错误"); }
  324. };
  325. let res_data = self.request.swap_order(params).await;
  326. if res_data.code != 200 {
  327. return Err(Error::new(ErrorKind::Other, res_data.to_string()))
  328. }
  329. let res_data_json = res_data.data;
  330. let result = Order {
  331. id: res_data_json["orderId"].as_str().unwrap().to_string(),
  332. custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
  333. price: Decimal::ZERO,
  334. amount: Decimal::ZERO,
  335. deal_amount: Decimal::ZERO,
  336. avg_price: Decimal::ZERO,
  337. status: "NEW".to_string(),
  338. order_type: "".to_string(),
  339. trace_stack: TraceStack::new(0, Instant::now()).on_special("328 bitget_swap".to_string()),
  340. };
  341. return Ok(result)
  342. }
  343. async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  344. let symbol_format = utils::format_symbol(self.symbol.clone(), "");
  345. let params = json!({
  346. "symbol": symbol_format,
  347. "productType": "USDT-FUTURES",
  348. "clientOid": custom_id,
  349. "orderId": order_id
  350. });
  351. let response = self.request.cancel_order(params).await;
  352. // 取消失败,进行报错
  353. if response.code != 200 {
  354. return Err(Error::new(ErrorKind::NotFound, response.to_string()));
  355. }
  356. let res_data_json = response.data;
  357. let result = Order {
  358. id: res_data_json["orderId"].as_str().unwrap().to_string(),
  359. custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
  360. price: Decimal::ZERO,
  361. amount: Decimal::ZERO,
  362. deal_amount: Decimal::ZERO,
  363. avg_price: Decimal::ZERO,
  364. status: "REMOVE".to_string(),
  365. order_type: "".to_string(),
  366. trace_stack: TraceStack::new(0, Instant::now()).on_special("443 bitget_swap".to_string()),
  367. };
  368. Ok(result)
  369. }
  370. async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
  371. Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_orders:该交易所方法未实现".to_string()))
  372. }
  373. async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
  374. let response = self.request.get_pending_orders().await;
  375. if response.code == 200 {
  376. info!("{}", response.data.to_string());
  377. let mut result = vec![];
  378. if !response.data["entrustedList"].is_null() {
  379. let orders_res_data_json = response.data["entrustedList"].as_array().unwrap();
  380. for order in orders_res_data_json {
  381. let symbol = order["symbol"].as_str().unwrap().to_string();
  382. let order_id = order["orderId"].as_str().unwrap().to_string();
  383. let params = json!({
  384. "symbol": symbol,
  385. "productType": "USDT-FUTURES",
  386. "orderId": order_id,
  387. });
  388. let cancel_res_data = self.request.cancel_order(params).await;
  389. if cancel_res_data.code == 200 {
  390. let cancel_res_data_json = cancel_res_data.data;
  391. result.push(Order {
  392. id: cancel_res_data_json["orderId"].as_str().unwrap().to_string(),
  393. custom_id: cancel_res_data_json["clientOid"].as_str().unwrap().to_string(),
  394. price: Decimal::ZERO,
  395. amount: Decimal::ZERO,
  396. deal_amount: Decimal::ZERO,
  397. avg_price: Decimal::ZERO,
  398. status: "REMOVE".to_string(),
  399. order_type: "".to_string(),
  400. trace_stack: TraceStack::new(0, Instant::now()).on_special("457 bitget_swap".to_string()),
  401. });
  402. } else {
  403. return Err(Error::new(ErrorKind::Other, cancel_res_data.to_string()));
  404. }
  405. }
  406. }
  407. Ok(result)
  408. } else {
  409. Err(Error::new(ErrorKind::Other, response.to_string()))
  410. }
  411. }
  412. async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
  413. Err(Error::new(ErrorKind::NotFound, "bitget_swap take_stop_loss_order:该交易所方法未实现".to_string()))
  414. }
  415. async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
  416. Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_stop_loss_order:该交易所方法未实现".to_string()))
  417. }
  418. async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
  419. let pos_mode = if is_dual_mode {
  420. "hedge_mode"
  421. } else {
  422. "one_way_mode"
  423. };
  424. let params = json!({
  425. "productType": "USDT-FUTURES",
  426. "posMode": pos_mode,
  427. });
  428. let response = self.request.set_position_mode(params).await;
  429. if response.code != 200 {
  430. return Err(Error::new(ErrorKind::Other, format!("设置持仓模式失败:{:?}", response).to_string()))
  431. }
  432. return Ok(response.data.to_string());
  433. }
  434. async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
  435. let params = json!({
  436. "symbol": "ETHUSDT",
  437. "productType": "USDT-FUTURES",
  438. "marginCoin": "USDT",
  439. "leverage": leverage
  440. });
  441. let response = self.request.set_leverage(params).await;
  442. if response.code != 200 {
  443. return Err(Error::new(ErrorKind::Other, format!("设置杠杆失败:{:?}", response).to_string()))
  444. }
  445. return Ok(response.data.to_string());
  446. }
  447. async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> {
  448. Err(Error::new(ErrorKind::NotFound, "bitget_swap set_auto_deposit_status:该交易所方法未实现".to_string()))
  449. }
  450. async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
  451. Err(Error::new(ErrorKind::NotFound, "bitget_swap wallet_transfers:该交易所方法未实现".to_string()))
  452. // let coin_format = coin.to_string().to_uppercase();
  453. // let res_data = self.request.wallet_transfer(from.to_string(), to.to_string(), amount.to_string(), coin_format.clone(), "".to_string(), "".to_string()).await;
  454. // if res_data.code == 200 {
  455. // let res_data_str = &res_data.data;
  456. // let result = res_data_str.clone();
  457. // Ok(result)
  458. // } else {
  459. // Err(Error::new(ErrorKind::Other, res_data.to_string()))
  460. // }
  461. }
  462. async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
  463. let mut handles = vec![];
  464. // 下单指令
  465. for item in order_command.limits_open.keys() {
  466. let mut ts = trace_stack.clone();
  467. let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
  468. let side = order_command.limits_open[item].get(1).unwrap().clone();
  469. let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
  470. let cid = order_command.limits_open[item].get(3).unwrap().clone();
  471. // order_name: [数量,方向,价格,c_id]
  472. let mut self_clone = self.clone();
  473. let handle = spawn(async move {
  474. // TraceStack::show_delay(&ts.ins);
  475. ts.on_before_send();
  476. let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
  477. ts.on_after_send();
  478. match result {
  479. Ok(mut result) => {
  480. result.trace_stack = ts;
  481. self_clone.order_sender.send(result).await.unwrap();
  482. }
  483. Err(error) => {
  484. info!(?error);
  485. let mut err_order = Order::new();
  486. err_order.custom_id = cid.clone();
  487. err_order.status = "REMOVE".to_string();
  488. self_clone.order_sender.send(err_order).await.unwrap();
  489. self_clone.error_sender.send(error).await.unwrap();
  490. }
  491. }
  492. });
  493. handles.push(handle)
  494. }
  495. let futures = FuturesUnordered::from_iter(handles);
  496. // 等待所有任务完成
  497. let _: Result<Vec<_>, _> = futures.try_collect().await;
  498. // 撤销订单
  499. let mut cancel_handlers = vec![];
  500. for item in order_command.cancel.keys() {
  501. let order_id = order_command.cancel[item].get(1).unwrap().clone();
  502. let custom_id = order_command.cancel[item].get(0).unwrap().clone();
  503. let mut self_clone = self.clone();
  504. let handle = spawn(async move {
  505. let result = self_clone.cancel_order(&order_id, &custom_id).await;
  506. match result {
  507. Ok(_) => {
  508. // result_sd.send(result).await.unwrap();
  509. }
  510. Err(error) => {
  511. // 取消失败去查订单。
  512. let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
  513. match query_rst {
  514. Ok(order) => {
  515. self_clone.order_sender.send(order).await.unwrap();
  516. }
  517. Err(err) => {
  518. error!("撤单失败,而且查单也失败了,bitget_swap,oid={}, cid={}, err={:?}。", order_id.clone(), custom_id.clone(), err);
  519. }
  520. }
  521. self_clone.error_sender.send(error).await.unwrap();
  522. }
  523. }
  524. });
  525. cancel_handlers.push(handle)
  526. }
  527. let futures = FuturesUnordered::from_iter(cancel_handlers);
  528. // 等待所有任务完成
  529. let _: Result<Vec<_>, _> = futures.try_collect().await;
  530. // 检查订单指令
  531. let mut check_handlers = vec![];
  532. for item in order_command.check.keys() {
  533. let order_id = order_command.check[item].get(1).unwrap().clone();
  534. let custom_id = order_command.check[item].get(0).unwrap().clone();
  535. let mut self_clone = self.clone();
  536. let handle = spawn(async move {
  537. let result = self_clone.get_order_detail(order_id.as_str(), custom_id.as_str()).await;
  538. match result {
  539. Ok(result) => {
  540. self_clone.order_sender.send(result).await.unwrap();
  541. }
  542. Err(error) => {
  543. self_clone.error_sender.send(error).await.unwrap();
  544. }
  545. }
  546. });
  547. check_handlers.push(handle)
  548. }
  549. let futures = FuturesUnordered::from_iter(check_handlers);
  550. // 等待所有任务完成
  551. let _: Result<Vec<_>, _> = futures.try_collect().await;
  552. }
  553. }
  554. // pub fn format_account_info(balance_data: Value) -> Account {
  555. // let balance_coin = balance_data["coin"].as_str().unwrap().to_string().to_uppercase();
  556. // let available_balance = Decimal::from_str(balance_data["available"].as_str().unwrap()).unwrap();
  557. // let frozen_balance = Decimal::from_str(balance_data["frozen"].as_str().unwrap()).unwrap();
  558. // let balance = available_balance + frozen_balance;
  559. //
  560. // Account {
  561. // coin: balance_coin,
  562. // balance,
  563. // available_balance,
  564. // frozen_balance,
  565. // stocks: Decimal::ZERO,
  566. // available_stocks: Decimal::ZERO,
  567. // frozen_stocks: Decimal::ZERO,
  568. // }
  569. // }
  570. pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
  571. let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
  572. let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
  573. let status = order["state"].as_str().unwrap();
  574. let base_volume = Decimal::from_str(order["quoteVolume"].as_str().unwrap()).unwrap();
  575. let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
  576. Decimal::ZERO
  577. } else {
  578. Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
  579. };
  580. let amount = size * ct_val;
  581. let deal_amount = base_volume * ct_val;
  582. let custom_status = if ["filled", "cancelled"].contains(&status) {
  583. "REMOVE".to_string()
  584. } else if ["init", "live", "new", "partially_filled"].contains(&status) {
  585. "NEW".to_string()
  586. } else {
  587. "NULL".to_string()
  588. };
  589. Order {
  590. id: order["orderId"].as_str().unwrap().to_string(),
  591. custom_id: order["clientOid"].as_str().unwrap().to_string(),
  592. price,
  593. amount,
  594. deal_amount,
  595. avg_price,
  596. status: custom_status,
  597. order_type: order["orderType"].as_str().unwrap().to_string(),
  598. trace_stack: TraceStack::new(0, Instant::now()).on_special("700 bitget_swap".to_string()),
  599. }
  600. }