exchange_test.rs 30 KB


  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use futures::StreamExt;
  6. use rust_decimal_macros::dec;
  7. use tokio::sync::mpsc::{channel, Receiver, Sender};
  8. use tokio::sync::Mutex;
  9. use tokio::try_join;
  10. use tracing::{error, trace};
  11. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  12. // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  13. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  14. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  15. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  16. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  17. use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  18. use exchanges::response_base::ResponseData;
  19. use standard::exchange::{Exchange, ExchangeEnum};
  20. // use standard::{binance_spot_handle, Order, Platform, utils};
  21. // use standard::{binance_handle, Order, Platform, utils};
  22. // use standard::{kucoin_handle, Order, Platform, utils};
  23. // use standard::{gate_handle, Order, Platform, utils};
  24. // use standard::{bitget_spot_handle, Order, Platform, utils};
  25. use standard::{okx_handle, Order, Platform, utils};
  26. // 创建实体
  27. #[allow(dead_code)]
  28. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  29. utils::proxy_handle();
  30. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  31. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  32. let account_info = global::account_info::get_account_info("../test_account.toml");
  33. match exchange {
  34. ExchangeEnum::BinanceSwap => {
  35. let mut params: BTreeMap<String, String> = BTreeMap::new();
  36. let access_key = account_info.binance_access_key;
  37. let secret_key = account_info.binance_secret_key;
  38. params.insert("access_key".to_string(), access_key);
  39. params.insert("secret_key".to_string(), secret_key);
  40. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  41. }
  42. ExchangeEnum::BinanceSpot => {
  43. let mut params: BTreeMap<String, String> = BTreeMap::new();
  44. let access_key = account_info.binance_access_key;
  45. let secret_key = account_info.binance_secret_key;
  46. params.insert("access_key".to_string(), access_key);
  47. params.insert("secret_key".to_string(), secret_key);
  48. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  49. }
  50. ExchangeEnum::GateSwap => {
  51. let mut params: BTreeMap<String, String> = BTreeMap::new();
  52. let access_key = account_info.gate_access_key;
  53. let secret_key = account_info.gate_secret_key;
  54. params.insert("access_key".to_string(), access_key);
  55. params.insert("secret_key".to_string(), secret_key);
  56. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  57. }
  58. ExchangeEnum::GateSpot => {
  59. let mut params: BTreeMap<String, String> = BTreeMap::new();
  60. let access_key = account_info.gate_access_key;
  61. let secret_key = account_info.gate_secret_key;
  62. params.insert("access_key".to_string(), access_key);
  63. params.insert("secret_key".to_string(), secret_key);
  64. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  65. }
  66. ExchangeEnum::KucoinSwap => {
  67. let mut params: BTreeMap<String, String> = BTreeMap::new();
  68. let access_key = account_info.kucoin_access_key;
  69. let secret_key = account_info.kucoin_secret_key;
  70. let pass_key = account_info.kucoin_pass;
  71. params.insert("access_key".to_string(), access_key);
  72. params.insert("secret_key".to_string(), secret_key);
  73. params.insert("pass_key".to_string(), pass_key);
  74. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  75. }
  76. ExchangeEnum::KucoinSpot => {
  77. let mut params: BTreeMap<String, String> = BTreeMap::new();
  78. let access_key = account_info.kucoin_access_key;
  79. let secret_key = account_info.kucoin_secret_key;
  80. let pass_key = account_info.kucoin_pass;
  81. params.insert("access_key".to_string(), access_key);
  82. params.insert("secret_key".to_string(), secret_key);
  83. params.insert("pass_key".to_string(), pass_key);
  84. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  85. }
  86. ExchangeEnum::OkxSwap => {
  87. let mut params: BTreeMap<String, String> = BTreeMap::new();
  88. let access_key = account_info.okx_access_key;
  89. let secret_key = account_info.okx_secret_key;
  90. let pass_key = account_info.okx_pass;
  91. params.insert("access_key".to_string(), access_key);
  92. params.insert("secret_key".to_string(), secret_key);
  93. params.insert("pass_key".to_string(), pass_key);
  94. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  95. }
  96. ExchangeEnum::BitgetSpot => {
  97. let mut params: BTreeMap<String, String> = BTreeMap::new();
  98. let access_key = account_info.bitget_access_key;
  99. let secret_key = account_info.bitget_secret_key;
  100. let pass_key = account_info.bitget_pass;
  101. params.insert("access_key".to_string(), access_key);
  102. params.insert("secret_key".to_string(), secret_key);
  103. params.insert("pass_key".to_string(), pass_key);
  104. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  105. }
  106. }
  107. }
  108. #[allow(dead_code)]
  109. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
  110. utils::proxy_handle();
  111. let account_info = global::account_info::get_account_info("../test_account.toml");
  112. match exchange {
  113. ExchangeEnum::BinanceSpot => {
  114. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  115. // trace!(symbol_format);
  116. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  117. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  118. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  119. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  120. // let bool_v1 = Arc::new(AtomicBool::new(true));
  121. //
  122. // let params = BinanceSpotLogin {
  123. // api_key: account_info.binance_access_key,
  124. // api_secret: account_info.binance_secret_key,
  125. // };
  126. // let mut exchange_wss;
  127. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  128. // exchange_wss.set_symbols(vec![symbol_format]);
  129. // exchange_wss.set_subscribe(subscriber_type.into());
  130. //
  131. //
  132. // let mold_arc = Arc::new(mold.to_string());
  133. // //读取
  134. // tokio::spawn(async move {
  135. // let mold_clone = Arc::clone(&mold_arc);
  136. // loop {
  137. // if let Some(data) = read_rx.next().await {
  138. // trace!("原始数据 data:{:?}",data);
  139. // match mold_clone.as_str() {
  140. // "depth" => {
  141. // if data.data != "" {
  142. // let result = binance_spot_handle::handle_special_depth(data);
  143. // trace!(?result)
  144. // }
  145. // }
  146. // "ticker" => {
  147. // if data.data != "" {
  148. // let result = binance_spot_handle::handle_special_ticker(data);
  149. // trace!(?result)
  150. // }
  151. // }
  152. // _ => {
  153. // error!("没有该命令!mode={}", mold_clone);
  154. // panic!("没有该命令!mode={}", mold_clone)
  155. // }
  156. // }
  157. // }
  158. // };
  159. // });
  160. //
  161. // let t1 = tokio::spawn(async move {
  162. // //链接
  163. // let bool_v3_clone = Arc::clone(&bool_v1);
  164. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  165. // });
  166. // try_join!(t1).unwrap();
  167. }
  168. ExchangeEnum::BinanceSwap => {
  169. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  170. // trace!(symbol_format);
  171. // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  172. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  173. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  174. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  175. // let bool_v1 = Arc::new(AtomicBool::new(true));
  176. //
  177. // let api_key = env::var("binance_access_key").unwrap_or("".to_string());
  178. // let api_secret = env::var("binance_secret_key").unwrap_or("".to_string());
  179. // let params = BinanceSwapLogin {
  180. // api_key: account_info.binance_access_key,
  181. // api_secret: account_info.binance_secret_key,
  182. // };
  183. // let mut exchange_wss;
  184. // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
  185. // exchange_wss.set_symbols(vec![symbol_format]);
  186. // exchange_wss.set_subscribe(subscriber_type.into());
  187. //
  188. //
  189. // let mold_arc = Arc::new(mold.to_string());
  190. // //读取
  191. // tokio::spawn(async move {
  192. // let mold_clone = Arc::clone(&mold_arc);
  193. // loop {
  194. // if let Some(data) = read_rx.next().await {
  195. // trace!("原始数据 data:{:?}",data);
  196. // match mold_clone.as_str() {
  197. // "depth" => {
  198. // if data.data != "" {
  199. // let result = binance_handle::handle_special_depth(data);
  200. // trace!(?result)
  201. // }
  202. // }
  203. // "ticker" => {
  204. // if data.data != "" {
  205. // let result = binance_handle::handle_special_ticker(data);
  206. // trace!(?result)
  207. // }
  208. // }
  209. // _ => {
  210. // error!("没有该命令!mode={}", mold_clone);
  211. // panic!("没有该命令!mode={}", mold_clone)
  212. // }
  213. // }
  214. // }
  215. // };
  216. // });
  217. //
  218. // let t1 = tokio::spawn(async move {
  219. // //链接
  220. // let bool_v3_clone = Arc::clone(&bool_v1);
  221. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  222. // });
  223. // try_join!(t1).unwrap();
  224. }
  225. ExchangeEnum::KucoinSwap => {
  226. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  227. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  228. //
  229. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  230. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  231. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  232. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  233. // let bool_v1 = Arc::new(AtomicBool::new(true));
  234. //
  235. // let params = KucoinSwapLogin {
  236. // access_key: account_info.kucoin_access_key,
  237. // secret_key: account_info.kucoin_secret_key,
  238. // pass_key: account_info.kucoin_pass,
  239. // };
  240. // let mut exchange_wss;
  241. // if ["depth", "ticker"].contains(&mold) {
  242. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  243. // } else {
  244. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  245. // }
  246. // exchange_wss.set_symbols(vec![symbol_format]);
  247. // exchange_wss.set_subscribe(subscriber_type.into());
  248. //
  249. // let mold_arc = Arc::new(mold.to_string());
  250. // tokio::spawn(async move {
  251. // let mold_clone = Arc::clone(&mold_arc);
  252. // loop {
  253. // if let Some(data) = read_rx.next().await {
  254. // trace!("原始数据 data:{:?}",data);
  255. // match mold_clone.as_str() {
  256. // "depth" => {
  257. // let result = kucoin_handle::handle_special_depth(data);
  258. // trace!(?result)
  259. // }
  260. // "ticker" => {
  261. // let result = kucoin_handle::handle_special_ticker(data);
  262. // trace!(?result)
  263. // }
  264. // "account" => {
  265. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  266. // trace!(?result)
  267. // }
  268. // "position" => {
  269. // let result = kucoin_handle::handle_position(data, dec!(1));
  270. // trace!(?result)
  271. // }
  272. // "orders" => {
  273. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  274. // trace!(?result)
  275. // }
  276. // _ => {
  277. // error!("没有该命令!mode={}", mold_clone);
  278. // panic!("没有该命令!mode={}", mold_clone)
  279. // }
  280. // }
  281. // }
  282. // }
  283. // });
  284. //
  285. // let t1 = tokio::spawn(async move {
  286. // //链接
  287. // let bool_v3_clone = Arc::clone(&bool_v1);
  288. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  289. // });
  290. // try_join!(t1).unwrap();
  291. }
  292. ExchangeEnum::KucoinSpot => {
  293. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  294. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  295. // trace!(symbol_format);
  296. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  297. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  298. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  299. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  300. // let bool_v1 = Arc::new(AtomicBool::new(true));
  301. //
  302. // let params = KucoinSpotLogin {
  303. // access_key: account_info.kucoin_access_key,
  304. // secret_key: account_info.kucoin_secret_key,
  305. // pass_key: account_info.kucoin_pass,
  306. // };
  307. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  308. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  309. // } else {
  310. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  311. // };
  312. // exchange_wss.set_symbols(vec![symbol_format]);
  313. // exchange_wss.set_subscribe(subscriber_type.into());
  314. //
  315. // let mold_arc = Arc::new(mold.to_string());
  316. // tokio::spawn(async move {
  317. // let mold_clone = Arc::clone(&mold_arc);
  318. // loop {
  319. // if let Some(data) = read_rx.next().await {
  320. // trace!("原始数据 data:{:?}",data);
  321. // match mold_clone.as_str() {
  322. // "depth" => {
  323. // if data.data != "" {
  324. // let result = kucoin_spot_handle::handle_special_depth(data);
  325. // trace!(?result)
  326. // }
  327. // }
  328. // "ticker" => {
  329. // if data.data != "" {
  330. // let result = kucoin_spot_handle::handle_special_ticker(data);
  331. // trace!(?result)
  332. // }
  333. // }
  334. // "account" => {
  335. // if data.data != "" {
  336. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  337. // trace!(?result)
  338. // }
  339. // }
  340. // "orders" => {
  341. // if data.data != "" {
  342. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  343. // trace!(?result)
  344. // }
  345. // }
  346. // _ => {
  347. // error!("没有该命令!mode={}", mold_clone);
  348. // panic!("没有该命令!mode={}", mold_clone)
  349. // }
  350. // }
  351. // }
  352. // }
  353. // });
  354. // let t1 = tokio::spawn(async move {
  355. // //链接
  356. // let bool_v3_clone = Arc::clone(&bool_v1);
  357. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  358. // });
  359. // try_join!(t1).unwrap();
  360. }
  361. ExchangeEnum::GateSwap => {
  362. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  363. // trace!(symbol_format);
  364. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  365. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  366. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  367. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  368. // let bool_v1 = Arc::new(AtomicBool::new(true));
  369. //
  370. // let params = GateSwapLogin {
  371. // api_key: account_info.gate_access_key,
  372. // secret: account_info.gate_secret_key,
  373. // };
  374. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  375. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  376. // exchange_wss.set_subscribe(subscriber_type.into());
  377. //
  378. // let mold_arc = Arc::new(mold.to_string());
  379. // tokio::spawn(async move {
  380. // let mold_clone = Arc::clone(&mold_arc);
  381. // loop {
  382. // if let Some(data) = read_rx.next().await {
  383. // trace!("原始数据 data:{:?}",data);
  384. // match mold_clone.as_str() {
  385. // "depth" => {
  386. // if data.data != "" {
  387. // let result = gate_handle::handle_special_depth(data);
  388. // trace!(?result)
  389. // }
  390. // }
  391. // "ticker" => {
  392. // if data.data != "" {
  393. // let result = gate_handle::handle_special_ticker(data);
  394. // trace!(?result)
  395. // }
  396. // }
  397. // "account" => {
  398. // if data.data != "" {
  399. // let result = gate_handle::handle_account_info(data, symbol_format.clone());
  400. // trace!(?result)
  401. // }
  402. // }
  403. // "orders" => {
  404. // if data.data != "" {
  405. // let result = gate_handle::handle_order(data, dec!(1));
  406. // trace!(?result)
  407. // }
  408. // }
  409. // _ => {
  410. // error!("没有该命令!mode={}", mold_clone);
  411. // panic!("没有该命令!mode={}", mold_clone)
  412. // }
  413. // }
  414. // }
  415. // }
  416. // });
  417. // let t1 = tokio::spawn(async move {
  418. // //链接
  419. // let bool_v3_clone = Arc::clone(&bool_v1);
  420. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  421. // });
  422. // try_join!(t1).unwrap();
  423. }
  424. ExchangeEnum::BitgetSpot => {
  425. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  426. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  427. // trace!(symbol_format);
  428. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  429. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  430. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  431. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  432. // let bool_v1 = Arc::new(AtomicBool::new(true));
  433. //
  434. // let params = BitgetSpotLogin {
  435. // api_key: account_info.bitget_access_key,
  436. // secret_key: account_info.bitget_secret_key,
  437. // passphrase_key: account_info.bitget_pass,
  438. // };
  439. //
  440. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  441. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  442. // } else {
  443. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  444. // };
  445. // exchange_wss.set_symbols(vec![symbol_format]);
  446. // exchange_wss.set_subscribe(subscriber_type.into());
  447. //
  448. // let mold_arc = Arc::new(mold.to_string());
  449. // //读取
  450. // tokio::spawn(async move {
  451. // loop {
  452. // let mold_clone = Arc::clone(&mold_arc);
  453. // if let Some(data) = read_rx.next().await {
  454. // trace!("原始数据 data:{:?}",data);
  455. // match mold_clone.as_str() {
  456. // "depth" => {
  457. // if data.data != "" {
  458. // let result = bitget_spot_handle::handle_special_depth(data);
  459. // trace!(?result)
  460. // }
  461. // }
  462. // "ticker" => {
  463. // if data.data != "" {
  464. // let result = bitget_spot_handle::handle_special_ticker(data);
  465. // trace!(?result)
  466. // }
  467. // }
  468. // "account" => {
  469. // if data.data != "" {
  470. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  471. // trace!(?result)
  472. // }
  473. // }
  474. // "orders" => {
  475. // if data.data != "" {
  476. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  477. // trace!(?result)
  478. // }
  479. // }
  480. // _ => {
  481. // error!("没有该命令!mode={}", mold_clone);
  482. // panic!("没有该命令!mode={}", mold_clone)
  483. // }
  484. // }
  485. // }
  486. // }
  487. // });
  488. // let t1 = tokio::spawn(async move {
  489. // //链接
  490. // let bool_v3_clone = Arc::clone(&bool_v1);
  491. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  492. // });
  493. // try_join!(t1).unwrap();
  494. }
  495. ExchangeEnum::OkxSwap => {
  496. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  497. trace!(symbol_format);
  498. let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  499. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  500. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  501. let write_tx_am = Arc::new(Mutex::new(write_tx));
  502. let bool_v1 = Arc::new(AtomicBool::new(true));
  503. let params = OkxSwapLogin {
  504. api_key: account_info.okx_access_key,
  505. secret_key: account_info.okx_secret_key,
  506. passphrase: account_info.okx_pass,
  507. };
  508. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  509. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  510. } else if ["account", "orders", "position"].contains(&mold) {
  511. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  512. } else {
  513. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  514. };
  515. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  516. exchange_wss.set_subscribe(subscriber_type.into());
  517. let mold_arc = Arc::new(mold.to_string());
  518. tokio::spawn(async move {
  519. let mold_clone = Arc::clone(&mold_arc);
  520. loop {
  521. if let Some(data) = read_rx.next().await {
  522. match mold_clone.as_str() {
  523. "depth" => {
  524. if data.data != "" {
  525. let result = okx_handle::handle_special_depth(data);
  526. trace!(?result)
  527. }
  528. }
  529. "ticker" => {
  530. if data.data != "" {
  531. let result = okx_handle::handle_special_ticker(data);
  532. trace!(?result)
  533. }
  534. }
  535. "account" => {
  536. if data.data != "" {
  537. let result = okx_handle::handle_account_info(data, symbol_format.clone());
  538. trace!(?result)
  539. }
  540. }
  541. "position" => {
  542. if data.data != "" {
  543. let result = okx_handle::handle_position(data, dec!(10));
  544. trace!(?result)
  545. }
  546. }
  547. "orders" => {
  548. if data.data != "" {
  549. let result = okx_handle::handle_order(data, dec!(10));
  550. trace!(?result)
  551. }
  552. }
  553. _ => {
  554. error!("没有该命令!mode={}", mold_clone);
  555. panic!("没有该命令!mode={}", mold_clone)
  556. }
  557. }
  558. }
  559. }
  560. });
  561. let t1 = tokio::spawn(async move {
  562. //链接
  563. let bool_v3_clone = Arc::clone(&bool_v1);
  564. exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  565. });
  566. try_join!(t1).unwrap();
  567. }
  568. _ => {
  569. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  570. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  571. }
  572. }
  573. }