gate_swap.rs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. use std::collections::{BTreeMap, HashMap};
  2. use std::io::{Error, ErrorKind};
  3. use std::str::FromStr;
  4. use tokio::sync::mpsc::Sender;
  5. use async_trait::async_trait;
  6. use rust_decimal::Decimal;
  7. use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
  8. use rust_decimal_macros::dec;
  9. use serde_json::{json};
  10. use futures::stream::FuturesUnordered;
  11. use futures::{TryFutureExt, TryStreamExt};
  12. use futures::future::err;
  13. use tracing::{error, info};
  14. use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
  15. use exchanges::gate_swap_rest::GateSwapRest;
  16. #[allow(dead_code)]
  17. #[derive(Clone)]
  18. pub struct GateSwap {
  19. exchange: ExchangeEnum,
  20. symbol: String,
  21. is_colo: bool,
  22. params: BTreeMap<String, String>,
  23. request: GateSwapRest,
  24. market: Market,
  25. order_sender: Sender<Order>,
  26. error_sender: Sender<Error>,
  27. }
  28. impl GateSwap {
  29. pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> GateSwap {
  30. let market = Market::new();
  31. let mut gate_swap = GateSwap {
  32. exchange: ExchangeEnum::GateSwap,
  33. symbol: symbol.to_uppercase(),
  34. is_colo,
  35. params: params.clone(),
  36. request: GateSwapRest::new(is_colo, params.clone()),
  37. market,
  38. order_sender,
  39. error_sender,
  40. };
  41. gate_swap.market = GateSwap::get_market(&mut gate_swap).await.unwrap_or(gate_swap.market);
  42. return gate_swap;
  43. }
  44. }
  45. #[async_trait]
  46. impl Platform for GateSwap {
  47. // 克隆方法
  48. fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
  49. // 获取交易所模式
  50. fn get_self_exchange(&self) -> ExchangeEnum {
  51. ExchangeEnum::GateSwap
  52. }
  53. // 获取交易对
  54. fn get_self_symbol(&self) -> String { self.symbol.clone() }
  55. // 获取是否使用高速通道
  56. fn get_self_is_colo(&self) -> bool {
  57. self.is_colo
  58. }
  59. // 获取params信息
  60. fn get_self_params(&self) -> BTreeMap<String, String> {
  61. self.params.clone()
  62. }
  63. // 获取market信息
  64. fn get_self_market(&self) -> Market { self.market.clone() }
  65. // 获取请求时间
  66. fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
  67. // 获取请求平均时间
  68. fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
  69. // 获取请求最大时间
  70. fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
  71. // 获取服务器时间
  72. async fn get_server_time(&mut self) -> Result<String, Error> {
  73. let res_data = self.request.get_server_time().await;
  74. if res_data.code == "200" {
  75. let res_data_str = &res_data.data;
  76. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  77. let result = res_data_json["server_time"].to_string();
  78. Ok(result)
  79. } else {
  80. Err(Error::new(ErrorKind::Other, res_data.message))
  81. }
  82. }
  83. // 获取账号信息
  84. async fn get_account(&mut self) -> Result<Account, Error> {
  85. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  86. let res_data = self.request.get_account(symbol_array[1].to_string().to_lowercase()).await;
  87. if res_data.code == "200" {
  88. let res_data_str = &res_data.data;
  89. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  90. let balance = Decimal::from_str(res_data_json["total"].as_str().unwrap()).unwrap();
  91. let available_balance = Decimal::from_str(res_data_json["available"].as_str().unwrap()).unwrap();
  92. let frozen_balance = balance - available_balance;
  93. let result = Account {
  94. balance,
  95. available_balance,
  96. frozen_balance,
  97. stocks: dec!(0),
  98. available_stocks: dec!(0),
  99. frozen_stocks: dec!(0),
  100. };
  101. Ok(result)
  102. } else {
  103. Err(Error::new(ErrorKind::Other, res_data.message))
  104. }
  105. }
  106. // 获取持仓信息
  107. async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
  108. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  109. let amount_size = self.market.amount_size;
  110. let res_data = self.request.get_position(symbol_array[1].to_string().to_lowercase(), self.symbol.clone()).await;
  111. if res_data.code == "200" {
  112. let res_data_str = &res_data.data;
  113. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  114. let result = res_data_json.iter().map(|item| { format_position_item(item, amount_size) }).collect();
  115. Ok(result)
  116. } else {
  117. Err(Error::new(ErrorKind::Other, res_data.message))
  118. }
  119. }
  120. // 获取所有持仓
  121. async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
  122. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  123. let amount_size = self.market.amount_size;
  124. let res_data = self.request.get_user_position(symbol_array[1].to_string().to_lowercase()).await;
  125. if res_data.code == "200" {
  126. let res_data_str = &res_data.data;
  127. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  128. let result = res_data_json.iter().map(|item| { format_position_item(item, amount_size) }).collect();
  129. Ok(result)
  130. } else {
  131. Err(Error::new(ErrorKind::Other, res_data.message))
  132. }
  133. }
  134. // 获取市场行情
  135. async fn get_ticker(&mut self) -> Result<Ticker, Error> {
  136. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  137. let res_data = self.request.get_ticker(symbol_array[1].to_string().to_lowercase()).await;
  138. if res_data.code == "200" {
  139. let res_data_str = &res_data.data;
  140. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  141. let ticker_info = res_data_json.iter().find(|item| item["contract"].as_str().unwrap() == self.symbol);
  142. match ticker_info {
  143. None => {
  144. panic!("Gate:获取Ticker信息错误!\nget_ticker:res_data={}", res_data_str)
  145. }
  146. Some(value) => {
  147. let result = Ticker {
  148. time: chrono::Utc::now().timestamp_millis(),
  149. high: Decimal::from_str(value["high_24h"].as_str().unwrap()).unwrap(),
  150. low: Decimal::from_str(value["low_24h"].as_str().unwrap()).unwrap(),
  151. sell: Decimal::from_str(value["lowest_ask"].as_str().unwrap()).unwrap(),
  152. buy: Decimal::from_str(value["highest_bid"].as_str().unwrap()).unwrap(),
  153. last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
  154. volume: Decimal::from_str(value["volume_24h"].as_str().unwrap()).unwrap(),
  155. };
  156. Ok(result)
  157. }
  158. }
  159. } else {
  160. Err(Error::new(ErrorKind::Other, res_data.message))
  161. }
  162. }
  163. async fn get_market(&mut self) -> Result<Market, Error> {
  164. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  165. let res_data = self.request.get_market_details(symbol_array[1].to_string().to_lowercase()).await;
  166. if res_data.code == "200" {
  167. let res_data_str = &res_data.data;
  168. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  169. let market_info = res_data_json.iter().find(|item| item["name"].as_str().unwrap() == self.symbol);
  170. match market_info {
  171. None => {
  172. panic!("Gate:获取Market信息错误!\nget_market:res_data={}", res_data_str)
  173. }
  174. Some(value) => {
  175. let name = value["name"].as_str().unwrap();
  176. let name_array: Vec<&str> = name.split("_").collect();
  177. let tick_size = Decimal::from_str(value["order_price_round"].as_str().unwrap()).unwrap();
  178. let amount_size = Decimal::from_str(value["quanto_multiplier"].as_str().unwrap()).unwrap();
  179. let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
  180. let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
  181. let result = Market {
  182. symbol: name.to_string(),
  183. base_asset: name_array[0].to_string(),
  184. quote_asset: name_array[1].to_string(),
  185. tick_size,
  186. amount_size,
  187. price_precision,
  188. amount_precision,
  189. min_qty: Decimal::from_str(&value["order_size_min"].to_string()).unwrap(),
  190. max_qty: Decimal::from_str(&value["order_size_max"].to_string()).unwrap(),
  191. min_notional: Default::default(),
  192. max_notional: Default::default(),
  193. ct_val: Default::default(),
  194. };
  195. Ok(result)
  196. }
  197. }
  198. } else {
  199. Err(Error::new(ErrorKind::Other, res_data.message))
  200. }
  201. }
  202. // 获取订单详情
  203. async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  204. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  205. let amount_size = self.market.amount_size;
  206. let id = if order_id.eq("") { format!("t-my-custom-id_{}", custom_id) } else { order_id.to_string() };
  207. let res_data = self.request.get_order_details(symbol_array[1].to_string().to_lowercase(), id).await;
  208. if res_data.code == "200" {
  209. let res_data_str = &res_data.data;
  210. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  211. let result = format_order_item(res_data_json, amount_size);
  212. Ok(result)
  213. } else {
  214. Err(Error::new(ErrorKind::Other, res_data.message))
  215. }
  216. }
  217. // 获取订单列表
  218. async fn get_orders_list(&mut self, status: &str) -> Result<Vec<Order>, Error> {
  219. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  220. let amount_size = self.market.amount_size;
  221. let res_data = self.request.get_orders(symbol_array[1].to_string().to_lowercase(), status.to_string()).await;
  222. if res_data.code == "200" {
  223. let res_data_str = &res_data.data;
  224. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  225. let order_info: Vec<_> = res_data_json.iter().filter(|item| item["contract"].as_str().unwrap_or("") == self.symbol).collect();
  226. let result = order_info.iter().map(|&item| format_order_item(item.clone(), amount_size)).collect();
  227. Ok(result)
  228. } else {
  229. Err(Error::new(ErrorKind::Other, res_data.message))
  230. }
  231. }
  232. // 下单接口
  233. async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
  234. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  235. let amount_size = self.market.amount_size;
  236. let mut params = json!({
  237. "text": format!("t-my-custom-id_{}", custom_id),
  238. "contract": self.symbol.to_string(),
  239. "price": price.to_string(),
  240. });
  241. let size = (amount / amount_size).floor();
  242. if price.eq(&dec!(0)) {
  243. params["tif"] = json!("ioc".to_string());
  244. }
  245. match origin_side {
  246. "kd" => {
  247. params["reduce_only"] = json!(false);
  248. params["size"] = json!(size.to_i64());
  249. }
  250. "pd" => {
  251. params["reduce_only"] = json!(true);
  252. params["size"] = serde_json::Value::from((-size).to_i64());
  253. }
  254. "kk" => {
  255. params["reduce_only"] = json!(false);
  256. params["size"] = serde_json::Value::from((-size).to_i64());
  257. }
  258. "pk" => {
  259. params["reduce_only"] = json!(true);
  260. params["size"] = json!(size.to_i64());
  261. }
  262. _ => { error!("下单参数错误"); }
  263. };
  264. let res_data = self.request.swap_order(symbol_array[1].to_string().to_lowercase(), params).await;
  265. if res_data.code == "200" {
  266. let res_data_str = &res_data.data;
  267. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  268. let result = format_order_item(res_data_json, amount_size);
  269. Ok(result)
  270. } else {
  271. Err(Error::new(ErrorKind::Other, res_data.message))
  272. }
  273. }
  274. // 撤销订单
  275. async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
  276. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  277. let amount_size = self.market.amount_size;
  278. let settle = symbol_array[1].to_string().to_lowercase();
  279. let id = if order_id.eq("") { format!("t-my-custom-id_{}", custom_id) } else { order_id.to_string() };
  280. let res_data = self.request.cancel_order(settle, id.to_string()).await;
  281. if res_data.code == "200" {
  282. let res_data_str = &res_data.data;
  283. let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
  284. let result = format_order_item(res_data_json, amount_size);
  285. Ok(result)
  286. } else {
  287. Err(Error::new(ErrorKind::Other, res_data.message))
  288. }
  289. }
  290. // 批量撤销订单
  291. async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
  292. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  293. let amount_size = self.market.tick_size;
  294. let res_data = self.request.cancel_orders(symbol_array[1].to_string().to_lowercase(), self.symbol.to_string()).await;
  295. if res_data.code == "200" {
  296. let res_data_str = &res_data.data;
  297. let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
  298. let result = res_data_json.iter().map(|item| format_order_item(item.clone(), amount_size)).collect();
  299. Ok(result)
  300. } else {
  301. Err(Error::new(ErrorKind::Other, res_data.message))
  302. }
  303. }
  304. // 设置持仓模式
  305. async fn set_dual_mode(&mut self, coin: &str, is_dual_mode: bool) -> Result<String, Error> {
  306. let coin_format = coin.to_string().to_lowercase();
  307. let res_data = self.request.setting_dual_mode(coin_format, is_dual_mode).await;
  308. if res_data.code == "200" {
  309. let res_data_str = &res_data.data;
  310. let result = res_data_str.clone();
  311. Ok(result)
  312. } else {
  313. Err(Error::new(ErrorKind::Other, res_data.message))
  314. }
  315. }
  316. // 更新双持仓模式下杠杆
  317. async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
  318. let symbol_array: Vec<&str> = self.symbol.split("_").collect();
  319. let res_data = self.request.setting_dual_leverage(symbol_array[1].to_string().to_lowercase(), self.symbol.to_string(), leverage.to_string()).await;
  320. if res_data.code == "200" {
  321. let res_data_str = &res_data.data;
  322. let result = res_data_str.clone();
  323. Ok(result)
  324. } else {
  325. Err(Error::new(ErrorKind::Other, res_data.message))
  326. }
  327. }
  328. // 交易账户互转
  329. async fn wallet_transfers(&mut self, coin: &str, from: &str, to: &str, amount: Decimal) -> Result<String, Error> {
  330. let coin_format = coin.to_string().to_lowercase();
  331. let res_data = self.request.wallet_transfers(coin_format.clone(), from.to_string(), to.to_string(), amount.to_string(), coin_format.clone()).await;
  332. if res_data.code == "200" {
  333. let res_data_str = &res_data.data;
  334. let result = res_data_str.clone();
  335. Ok(result)
  336. } else {
  337. Err(Error::new(ErrorKind::Other, res_data.message))
  338. }
  339. }
  340. // 指令下单
  341. async fn command_order(&mut self, order_command: OrderCommand) {
  342. let mut handles = vec![];
  343. // 撤销订单
  344. let cancel = order_command.cancel;
  345. for item in cancel.keys() {
  346. let mut self_clone = self.clone();
  347. let cancel_clone = cancel.clone();
  348. let item_clone = item.clone();
  349. let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
  350. let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
  351. let result_sd = self.order_sender.clone();
  352. let err_sd = self.error_sender.clone();
  353. let handle = tokio::spawn(async move {
  354. let result = self_clone.cancel_order(&order_id, &custom_id).await;
  355. match result {
  356. Ok(result) => {
  357. result_sd.send(result).await.unwrap();
  358. }
  359. Err(error) => {
  360. err_sd.send(error).await.unwrap();
  361. }
  362. }
  363. });
  364. handles.push(handle)
  365. }
  366. // 下单指令
  367. let mut limits = HashMap::new();
  368. limits.extend(order_command.limits_open);
  369. limits.extend(order_command.limits_close);
  370. for item in limits.keys() {
  371. let mut self_clone = self.clone();
  372. let limits_clone = limits.clone();
  373. let item_clone = item.clone();
  374. let result_sd = self.order_sender.clone();
  375. let err_sd = self.error_sender.clone();
  376. let handle = tokio::spawn(async move {
  377. let value = limits_clone[&item_clone].clone();
  378. let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
  379. let side = value.get(1).unwrap();
  380. let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
  381. let cid = value.get(3).unwrap();
  382. // order_name: [数量,方向,价格,c_id]
  383. let result = self_clone.take_order(cid, side, price, amount).await;
  384. match result {
  385. Ok(result) => {
  386. result_sd.send(result).await.unwrap();
  387. }
  388. Err(error) => {
  389. let mut err_order = Order::new();
  390. err_order.custom_id = (*cid).clone();
  391. err_order.status = "REMOVE".to_string();
  392. result_sd.send(err_order).await.unwrap();
  393. err_sd.send(error).await.unwrap();
  394. }
  395. }
  396. });
  397. handles.push(handle)
  398. }
  399. // 检查订单指令
  400. let check = order_command.check;
  401. for item in check.keys() {
  402. let mut self_clone = self.clone();
  403. let check_clone = check.clone();
  404. let item_clone = item.clone();
  405. let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
  406. let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
  407. let result_sd = self.order_sender.clone();
  408. let err_sd = self.error_sender.clone();
  409. let handle = tokio::spawn(async move {
  410. let result = self_clone.get_order_detail(&order_id, &custom_id).await;
  411. match result {
  412. Ok(result) => {
  413. result_sd.send(result).await.unwrap();
  414. }
  415. Err(error) => {
  416. err_sd.send(error).await.unwrap();
  417. }
  418. }
  419. });
  420. handles.push(handle)
  421. }
  422. let futures = FuturesUnordered::from_iter(handles);
  423. let _: Result<Vec<_>, _> = futures.try_collect().await;
  424. }
  425. }
  426. pub fn format_position_item(position: &serde_json::Value, amount_size: Decimal) -> Position {
  427. let position_mode = match position["mode"].as_str().unwrap_or("") {
  428. "single" => PositionModeEnum::Both,
  429. "dual_long" => PositionModeEnum::Long,
  430. "dual_short" => PositionModeEnum::Short,
  431. _ => panic!("Gate:格式化持仓模式错误!\nformat_position_item:position_mode={}", position["mode"])
  432. };
  433. let size = Decimal::from_str(&position["size"].to_string()).unwrap();
  434. let amount = size * amount_size;
  435. Position {
  436. symbol: position["contract"].as_str().unwrap_or("").parse().unwrap(),
  437. margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
  438. amount,
  439. frozen_amount: dec!(0),
  440. price: Decimal::from_str(position["entry_price"].as_str().unwrap()).unwrap(),
  441. profit: Decimal::from_str(position["unrealised_pnl"].as_str().unwrap()).unwrap(),
  442. position_mode,
  443. margin: Decimal::from_str(position["margin"].as_str().unwrap()).unwrap(),
  444. }
  445. }
  446. pub fn format_order_item(order: serde_json::Value, amount_size: Decimal) -> Order {
  447. info!("format-order-start, gate_swap");
  448. info!(?order);
  449. let status = order["status"].as_str().unwrap_or("");
  450. let text = order["text"].as_str().unwrap_or("");
  451. let size = Decimal::from_str(&order["size"].to_string()).unwrap();
  452. let left = Decimal::from_str(&order["left"].to_string()).unwrap();
  453. let amount = size * amount_size;
  454. let deal_amount = (size - left) * amount_size;
  455. let custom_status = if status == "finished" { "REMOVE".to_string() } else if status == "open" { "NEW".to_string() } else { panic!("Gate:格式化订单状态错误!\nformat_order_item:status={}", status) };
  456. let rst_order = Order {
  457. id: order["id"].to_string(),
  458. custom_id: text.replace("t-my-custom-id_", ""),
  459. price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
  460. amount,
  461. deal_amount,
  462. avg_price: Decimal::from_str(&order["fill_price"].as_str().unwrap()).unwrap(),
  463. status: custom_status,
  464. order_type: "limit".to_string(),
  465. };
  466. info!(?rst_order);
  467. info!("format-order-end, gate_swap");
  468. return rst_order;
  469. }