exchange_test.rs 34 KB


  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use futures::StreamExt;
  6. use rust_decimal_macros::dec;
  7. use tokio::sync::mpsc::{channel, Receiver, Sender};
  8. use tokio::sync::Mutex;
  9. use tokio::try_join;
  10. use tracing::{error, trace};
  11. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  12. // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  13. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  14. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  15. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  16. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  17. use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  18. use exchanges::response_base::ResponseData;
  19. use standard::exchange::{Exchange, ExchangeEnum};
  20. // use standard::{binance_spot_handle, Order, Platform, utils};
  21. // use standard::{binance_handle, Order, Platform, utils};
  22. // use standard::{kucoin_handle, Order, Platform, utils};
  23. // use standard::{kucoin_spot_handle, Order, Platform, utils};
  24. // use standard::{gate_handle, Order, Platform, utils};
  25. // use standard::{bitget_spot_handle, Order, Platform, utils};
  26. use standard::{okx_handle, Order, Platform, utils};
  27. // 创建实体
  28. #[allow(dead_code)]
  29. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  30. utils::proxy_handle(None);
  31. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  32. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  33. let account_info = global::account_info::get_account_info("../test_account.toml");
  34. match exchange {
  35. ExchangeEnum::BinanceSwap => {
  36. let mut params: BTreeMap<String, String> = BTreeMap::new();
  37. let access_key = account_info.binance_access_key;
  38. let secret_key = account_info.binance_secret_key;
  39. params.insert("access_key".to_string(), access_key);
  40. params.insert("secret_key".to_string(), secret_key);
  41. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  42. }
  43. ExchangeEnum::BinanceSpot => {
  44. let mut params: BTreeMap<String, String> = BTreeMap::new();
  45. let access_key = account_info.binance_access_key;
  46. let secret_key = account_info.binance_secret_key;
  47. params.insert("access_key".to_string(), access_key);
  48. params.insert("secret_key".to_string(), secret_key);
  49. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  50. }
  51. ExchangeEnum::GateSwap => {
  52. let mut params: BTreeMap<String, String> = BTreeMap::new();
  53. let access_key = account_info.gate_access_key;
  54. let secret_key = account_info.gate_secret_key;
  55. params.insert("access_key".to_string(), access_key);
  56. params.insert("secret_key".to_string(), secret_key);
  57. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  58. }
  59. ExchangeEnum::GateSpot => {
  60. let mut params: BTreeMap<String, String> = BTreeMap::new();
  61. let access_key = account_info.gate_access_key;
  62. let secret_key = account_info.gate_secret_key;
  63. params.insert("access_key".to_string(), access_key);
  64. params.insert("secret_key".to_string(), secret_key);
  65. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  66. }
  67. ExchangeEnum::KucoinSwap => {
  68. let mut params: BTreeMap<String, String> = BTreeMap::new();
  69. let access_key = account_info.kucoin_access_key;
  70. let secret_key = account_info.kucoin_secret_key;
  71. let pass_key = account_info.kucoin_pass;
  72. params.insert("access_key".to_string(), access_key);
  73. params.insert("secret_key".to_string(), secret_key);
  74. params.insert("pass_key".to_string(), pass_key);
  75. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  76. }
  77. ExchangeEnum::KucoinSpot => {
  78. let mut params: BTreeMap<String, String> = BTreeMap::new();
  79. let access_key = account_info.kucoin_access_key;
  80. let secret_key = account_info.kucoin_secret_key;
  81. let pass_key = account_info.kucoin_pass;
  82. params.insert("access_key".to_string(), access_key);
  83. params.insert("secret_key".to_string(), secret_key);
  84. params.insert("pass_key".to_string(), pass_key);
  85. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  86. }
  87. ExchangeEnum::OkxSwap => {
  88. let mut params: BTreeMap<String, String> = BTreeMap::new();
  89. let access_key = account_info.okx_access_key;
  90. let secret_key = account_info.okx_secret_key;
  91. let pass_key = account_info.okx_pass;
  92. params.insert("access_key".to_string(), access_key);
  93. params.insert("secret_key".to_string(), secret_key);
  94. params.insert("pass_key".to_string(), pass_key);
  95. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  96. }
  97. ExchangeEnum::BitgetSpot => {
  98. let mut params: BTreeMap<String, String> = BTreeMap::new();
  99. let access_key = account_info.bitget_access_key;
  100. let secret_key = account_info.bitget_secret_key;
  101. let pass_key = account_info.bitget_pass;
  102. params.insert("access_key".to_string(), access_key);
  103. params.insert("secret_key".to_string(), secret_key);
  104. params.insert("pass_key".to_string(), pass_key);
  105. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  106. }
  107. ExchangeEnum::HtxSwap => {
  108. let mut params: BTreeMap<String, String> = BTreeMap::new();
  109. let access_key = account_info.htx_access_key;
  110. let secret_key = account_info.htx_secret_key;
  111. let pass_key = account_info.htx_pass;
  112. params.insert("access_key".to_string(), access_key);
  113. params.insert("secret_key".to_string(), secret_key);
  114. params.insert("pass_key".to_string(), pass_key);
  115. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  116. }
  117. _ => {
  118. panic!("该交易所未实现!")
  119. }
  120. }
  121. }
  122. #[allow(dead_code)]
  123. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
  124. utils::proxy_handle(None);
  125. let account_info = global::account_info::get_account_info("../test_account.toml");
  126. match exchange {
  127. ExchangeEnum::BinanceSpot => {
  128. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  129. // trace!(symbol_format);
  130. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  131. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  132. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  133. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  134. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  135. //
  136. // let params = BinanceSpotLogin {
  137. // api_key: account_info.binance_access_key,
  138. // api_secret: account_info.binance_secret_key,
  139. // };
  140. // let mut exchange_wss;
  141. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  142. // exchange_wss.set_symbols(vec![symbol_format]);
  143. // exchange_wss.set_subscribe(subscriber_type.into());
  144. //
  145. //
  146. // let mold_arc = Arc::new(mold.to_string());
  147. // //读取
  148. // tokio::spawn(async move {
  149. // let mold_clone = Arc::clone(&mold_arc);
  150. // loop {
  151. // if let Some(data) = read_rx.next().await {
  152. // trace!("原始数据 data:{:?}",data);
  153. // match mold_clone.as_str() {
  154. // "depth" => {
  155. // if data.data != "" {
  156. // let result = binance_spot_handle::handle_special_depth(data);
  157. // trace!(?result)
  158. // }
  159. // }
  160. // "ticker" => {
  161. // if data.data != "" {
  162. // let result = binance_spot_handle::handle_special_ticker(data);
  163. // trace!(?result)
  164. // }
  165. // }
  166. // _ => {
  167. // error!("没有该命令!mode={}", mold_clone);
  168. // panic!("没有该命令!mode={}", mold_clone)
  169. // }
  170. // }
  171. // }
  172. // };
  173. // });
  174. //
  175. // let t1 = tokio::spawn(async move {
  176. // //链接
  177. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  178. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  179. // });
  180. // try_join!(t1).unwrap();
  181. }
  182. ExchangeEnum::BinanceSwap => {
  183. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  184. // trace!(symbol_format);
  185. // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  186. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  187. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  188. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  189. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  190. //
  191. // let params = BinanceSwapLogin {
  192. // api_key: account_info.binance_access_key,
  193. // api_secret: account_info.binance_secret_key,
  194. // };
  195. // let mut exchange_wss;
  196. // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
  197. // exchange_wss.set_symbols(vec![symbol_format]);
  198. // exchange_wss.set_subscribe(subscriber_type.into());
  199. //
  200. //
  201. // let mold_arc = Arc::new(mold.to_string());
  202. // //读取
  203. // tokio::spawn(async move {
  204. // let mold_clone = Arc::clone(&mold_arc);
  205. // loop {
  206. // if let Some(data) = read_rx.next().await {
  207. // trace!("原始数据 data:{:?}",data);
  208. // match mold_clone.as_str() {
  209. // "depth" => {
  210. // if data.data != "" {
  211. // let result = binance_handle::handle_special_depth(data);
  212. // trace!(?result)
  213. // }
  214. // }
  215. // "ticker" => {
  216. // if data.data != "" {
  217. // let result = binance_handle::handle_special_ticker(data);
  218. // trace!(?result)
  219. // }
  220. // }
  221. // _ => {
  222. // error!("没有该命令!mode={}", mold_clone);
  223. // panic!("没有该命令!mode={}", mold_clone)
  224. // }
  225. // }
  226. // }
  227. // };
  228. // });
  229. //
  230. // let t1 = tokio::spawn(async move {
  231. // //链接
  232. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  233. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  234. // });
  235. // try_join!(t1).unwrap();
  236. }
  237. ExchangeEnum::KucoinSwap => {
  238. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  239. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  240. //
  241. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  242. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  243. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  244. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  245. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  246. //
  247. // let params = KucoinSwapLogin {
  248. // access_key: account_info.kucoin_access_key,
  249. // secret_key: account_info.kucoin_secret_key,
  250. // pass_key: account_info.kucoin_pass,
  251. // };
  252. // let mut exchange_wss;
  253. // if ["depth", "ticker"].contains(&mold) {
  254. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  255. // } else {
  256. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  257. // }
  258. // exchange_wss.set_symbols(vec![symbol_format]);
  259. // exchange_wss.set_subscribe(subscriber_type.into());
  260. //
  261. // let mold_arc = Arc::new(mold.to_string());
  262. // tokio::spawn(async move {
  263. // let mold_clone = Arc::clone(&mold_arc);
  264. // loop {
  265. // if let Some(data) = read_rx.next().await {
  266. // trace!("原始数据 data:{:?}",data);
  267. // match mold_clone.as_str() {
  268. // "depth" => {
  269. // let result = kucoin_handle::handle_special_depth(data);
  270. // trace!(?result)
  271. // }
  272. // "ticker" => {
  273. // let result = kucoin_handle::handle_special_ticker(data);
  274. // trace!(?result)
  275. // }
  276. // "account" => {
  277. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  278. // trace!(?result)
  279. // }
  280. // "position" => {
  281. // let result = kucoin_handle::handle_position(data, dec!(1));
  282. // trace!(?result)
  283. // }
  284. // "orders" => {
  285. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  286. // trace!(?result)
  287. // }
  288. // _ => {
  289. // error!("没有该命令!mode={}", mold_clone);
  290. // panic!("没有该命令!mode={}", mold_clone)
  291. // }
  292. // }
  293. // }
  294. // }
  295. // });
  296. //
  297. // let t1 = tokio::spawn(async move {
  298. // //链接
  299. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  300. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  301. // });
  302. // try_join!(t1).unwrap();
  303. }
  304. ExchangeEnum::KucoinSpot => {
  305. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  306. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  307. // trace!(symbol_format);
  308. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  309. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  310. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  311. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  312. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  313. //
  314. // let params = KucoinSpotLogin {
  315. // access_key: account_info.kucoin_access_key,
  316. // secret_key: account_info.kucoin_secret_key,
  317. // pass_key: account_info.kucoin_pass,
  318. // };
  319. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  320. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  321. // } else {
  322. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  323. // };
  324. // exchange_wss.set_symbols(vec![symbol_format]);
  325. // exchange_wss.set_subscribe(subscriber_type.into());
  326. //
  327. // let mold_arc = Arc::new(mold.to_string());
  328. // tokio::spawn(async move {
  329. // let mold_clone = Arc::clone(&mold_arc);
  330. // loop {
  331. // if let Some(data) = read_rx.next().await {
  332. // trace!("原始数据 data:{:?}",data);
  333. // match mold_clone.as_str() {
  334. // "depth" => {
  335. // if data.data != "" {
  336. // let result = kucoin_spot_handle::handle_special_depth(data);
  337. // trace!(?result)
  338. // }
  339. // }
  340. // "ticker" => {
  341. // if data.data != "" {
  342. // let result = kucoin_spot_handle::handle_special_ticker(data);
  343. // trace!(?result)
  344. // }
  345. // }
  346. // "account" => {
  347. // if data.data != "" {
  348. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  349. // trace!(?result)
  350. // }
  351. // }
  352. // "orders" => {
  353. // if data.data != "" {
  354. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  355. // trace!(?result)
  356. // }
  357. // }
  358. // _ => {
  359. // error!("没有该命令!mode={}", mold_clone);
  360. // panic!("没有该命令!mode={}", mold_clone)
  361. // }
  362. // }
  363. // }
  364. // }
  365. // });
  366. // let t1 = tokio::spawn(async move {
  367. // //链接
  368. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  369. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  370. // });
  371. // try_join!(t1).unwrap();
  372. }
  373. ExchangeEnum::GateSwap => {
  374. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  375. // trace!(symbol_format);
  376. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  377. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  378. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  379. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  380. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  381. //
  382. // let params = GateSwapLogin {
  383. // api_key: account_info.gate_access_key,
  384. // secret: account_info.gate_secret_key,
  385. // };
  386. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  387. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  388. // exchange_wss.set_subscribe(subscriber_type.into());
  389. //
  390. // let mold_arc = Arc::new(mold.to_string());
  391. // tokio::spawn(async move {
  392. // let mold_clone = Arc::clone(&mold_arc);
  393. // loop {
  394. // if let Some(data) = read_rx.next().await {
  395. // trace!("原始数据 data:{:?}",data);
  396. // match mold_clone.as_str() {
  397. // "depth" => {
  398. // if data.data != "" {
  399. // let result = gate_handle::handle_special_depth(data);
  400. // trace!(?result)
  401. // }
  402. // }
  403. // "ticker" => {
  404. // if data.data != "" {
  405. // let result = gate_handle::handle_special_ticker(data);
  406. // trace!(?result)
  407. // }
  408. // }
  409. // "account" => {
  410. // if data.data != "" {
  411. // let result = gate_handle::handle_account_info(data, symbol_format.clone());
  412. // trace!(?result)
  413. // }
  414. // }
  415. // "orders" => {
  416. // if data.data != "" {
  417. // let result = gate_handle::handle_order(data, dec!(1));
  418. // trace!(?result)
  419. // }
  420. // }
  421. // _ => {
  422. // error!("没有该命令!mode={}", mold_clone);
  423. // panic!("没有该命令!mode={}", mold_clone)
  424. // }
  425. // }
  426. // }
  427. // }
  428. // });
  429. // let t1 = tokio::spawn(async move {
  430. // //链接
  431. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  432. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  433. // });
  434. // try_join!(t1).unwrap();
  435. }
  436. ExchangeEnum::BitgetSpot => {
  437. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  438. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  439. // trace!(symbol_format);
  440. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  441. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  442. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  443. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  444. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  445. //
  446. // let params = BitgetSpotLogin {
  447. // api_key: account_info.bitget_access_key,
  448. // secret_key: account_info.bitget_secret_key,
  449. // passphrase_key: account_info.bitget_pass,
  450. // };
  451. //
  452. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  453. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  454. // } else {
  455. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  456. // };
  457. // exchange_wss.set_symbols(vec![symbol_format]);
  458. // exchange_wss.set_subscribe(subscriber_type.into());
  459. //
  460. // let mold_arc = Arc::new(mold.to_string());
  461. // //读取
  462. // tokio::spawn(async move {
  463. // loop {
  464. // let mold_clone = Arc::clone(&mold_arc);
  465. // if let Some(data) = read_rx.next().await {
  466. // trace!("原始数据 data:{:?}",data);
  467. // match mold_clone.as_str() {
  468. // "depth" => {
  469. // if data.data != "" {
  470. // let result = bitget_spot_handle::handle_special_depth(data);
  471. // trace!(?result)
  472. // }
  473. // }
  474. // "ticker" => {
  475. // if data.data != "" {
  476. // let result = bitget_spot_handle::handle_special_ticker(data);
  477. // trace!(?result)
  478. // }
  479. // }
  480. // "account" => {
  481. // if data.data != "" {
  482. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  483. // trace!(?result)
  484. // }
  485. // }
  486. // "orders" => {
  487. // if data.data != "" {
  488. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  489. // trace!(?result)
  490. // }
  491. // }
  492. // _ => {
  493. // error!("没有该命令!mode={}", mold_clone);
  494. // panic!("没有该命令!mode={}", mold_clone)
  495. // }
  496. // }
  497. // }
  498. // }
  499. // });
  500. // let t1 = tokio::spawn(async move {
  501. // //链接
  502. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  503. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  504. // });
  505. // try_join!(t1).unwrap();
  506. }
  507. ExchangeEnum::OkxSwap => {
  508. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  509. trace!(symbol_format);
  510. let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  511. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  512. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  513. let write_tx_am = Arc::new(Mutex::new(write_tx));
  514. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  515. let params = OkxSwapLogin {
  516. api_key: account_info.okx_access_key,
  517. secret_key: account_info.okx_secret_key,
  518. passphrase: account_info.okx_pass,
  519. };
  520. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  521. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  522. } else if ["account", "orders", "position"].contains(&mold) {
  523. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  524. } else {
  525. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  526. };
  527. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  528. exchange_wss.set_subscribe(subscriber_type.into());
  529. let mold_arc = Arc::new(mold.to_string());
  530. tokio::spawn(async move {
  531. let mold_clone = Arc::clone(&mold_arc);
  532. loop {
  533. if let Some(data) = read_rx.next().await {
  534. trace!("原始数据 data:{:?}",data);
  535. match mold_clone.as_str() {
  536. "depth" => {
  537. if data.data != "" {
  538. let result = okx_handle::handle_special_depth(data);
  539. trace!(?result)
  540. }
  541. }
  542. "ticker" => {
  543. if data.data != "" {
  544. let result = okx_handle::handle_special_ticker(data);
  545. trace!(?result)
  546. }
  547. }
  548. "account" => {
  549. if data.data != "" {
  550. let result = okx_handle::handle_account_info(data, symbol_format.clone());
  551. trace!(?result)
  552. }
  553. }
  554. "position" => {
  555. if data.data != "" {
  556. let result = okx_handle::handle_position(data, dec!(10));
  557. trace!(?result)
  558. }
  559. }
  560. "orders" => {
  561. if data.data != "" {
  562. let result = okx_handle::handle_order(data, dec!(10));
  563. trace!(?result)
  564. }
  565. }
  566. _ => {
  567. error!("没有该命令!mode={}", mold_clone);
  568. panic!("没有该命令!mode={}", mold_clone)
  569. }
  570. }
  571. }
  572. }
  573. });
  574. let t1 = tokio::spawn(async move {
  575. //链接
  576. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  577. exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  578. });
  579. try_join!(t1).unwrap();
  580. }
  581. ExchangeEnum::HtxSwap => {
  582. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  583. trace!(symbol_format);
  584. let name = format!("htx_swap@{}", symbol.to_string().to_lowercase());
  585. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  586. let write_tx_am = Arc::new(Mutex::new(write_tx));
  587. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  588. let params = HtxSwapLogin {
  589. api_key: account_info.htx_access_key,
  590. secret: account_info.htx_secret_key,
  591. };
  592. let htx_wss_type = match mold.to_string().clone().as_str() {
  593. "depth" => HtxSwapWsType::Public,
  594. _ => HtxSwapWsType::Private
  595. };
  596. let mut exchange_wss = HtxSwapWs::new_label(name, Option::from(params), htx_wss_type);
  597. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  598. exchange_wss.set_subscribe(subscriber_type.into());
  599. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  600. let mold_clone = mold.to_string().clone();
  601. let fun = move |data: ResponseData| {
  602. let symbol_format_c = symbol_format.clone();
  603. let mold_cc = mold_clone.clone();
  604. async move {
  605. trace!("原始数据 data:{:?}",data);
  606. match mold_cc.as_str() {
  607. "depth" => {
  608. if data.data != "" {
  609. let result = handle_info::format_depth(ExchangeEnum::HtxSwap, &data);
  610. trace!("-------------------------------");
  611. trace!(?result)
  612. }
  613. }
  614. "position" => {
  615. if data.data != "" {
  616. let result = htx_swap_handle::handle_position(&data, &dec!(10));
  617. trace!("-------------------------------");
  618. trace!(?result)
  619. }
  620. }
  621. "account" => {
  622. if data.data != "" {
  623. let result = htx_swap_handle::handle_account_info(&data, &symbol_format_c);
  624. trace!("-------------------------------");
  625. trace!(?result)
  626. }
  627. }
  628. "orders" => {
  629. println!("{:?}", data);
  630. if data.data != "" {
  631. let result = htx_swap_handle::handle_order(data, dec!(10));
  632. trace!("-------------------------------");
  633. trace!(?result)
  634. }
  635. }
  636. _ => {
  637. error!("没有该命令!mode={}", mold_cc);
  638. panic!("没有该命令!mode={}", mold_cc)
  639. }
  640. };
  641. }
  642. };
  643. exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  644. }
  645. _ => {
  646. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  647. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  648. }
  649. }
  650. }